Skip to main content

Nested Foreach Flows

Question

How can I create a nested foreach with a Metaflow DAG?

Solution

You can nest linear, branching, and foreach steps, or any of their combinations between, a foreach and its corresponding join step. Here is a Slack conversation you can read to learn the fundamentals of nested foreach. The rest of this post demonstrates a machine learning use case.

1Write a Flow with a Nested Foreach

For example, flow trains two hyperparameter configurations for each of two model types. In this case the models are sklearn.linear_model.LogisticRegression and sklearn.ensemble.RandomForestClassifier. However, Metaflow is agnostic to the modeling frameworks you use so you can extend this template to many more models and hyperparameter combinations.

The flow contains the following steps:

  • start configures the first for each split using the self.model_param_combination variable. The variable name passed to foreach must be an iterable.
  • foreach model and list of parameter combinations, the tune step calls another foreach over the parameter combinations unique to this model.
  • foreach parameter combination foreach model, the train_and_score step fits a classifier and tracks cross validation accuracies.
  • join_train collects results foreach train_and_score step of one branch of the tune step.
  • join_tune collects results foreach tune step.

nested_foreach.py
from metaflow import FlowSpec, step

class NestedForeachFlow(FlowSpec):

@step
def start(self):
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
iris = load_iris()
self.X = iris['data']
self.y = iris['target']
self.model_param_combination = [
(
LogisticRegression,
[{"C": 0.5}, {"C": 1.0}]
),
(
RandomForestClassifier,
[{"max_depth": 2}, {"max_depth": 6}]
)
]
self.next(self.tune, foreach="model_param_combination")

@step
def tune(self):
self.model, self.param_dict = self.input
self.next(self.train_and_score, foreach="param_dict")

@step
def train_and_score(self):
from sklearn.model_selection import cross_val_score
self.params = self.input
self.clf = self.model(**self.params)
self.scores = cross_val_score(
self.clf, self.X, self.y, cv=5)
self.next(self.join_train)

@step
def join_train(self, inputs):
import json
import numpy as np
self.scores = {
'model': [],
'params': [],
'mean accuracy': [],
'std accuracy': [],
}
for i in inputs:
self.scores['model'].append(i.clf.__class__)
self.scores['params'].append(json.dumps(i.params))
self.scores['mean accuracy'].append(np.mean(i.scores))
self.scores['std accuracy'].append(np.std(i.scores))
self.next(self.join_tune)

@step
def join_tune(self, inputs):
self.all_scores = {}
for model in inputs:
if self.all_scores == {}:
self.all_scores = model.scores
else:
for k,v in model.scores.items():
self.all_scores[k] += v
self.next(self.end)

@step
def end(self):
pass

if __name__ == "__main__":
NestedForeachFlow()

2Run the Flow

python nested_foreach.py run
     Workflow starting (run-id 1663634177889364):
[1663634177889364/start/1 (pid 40412)] Task is starting.
[1663634177889364/start/1 (pid 40412)] Foreach yields 2 child steps.
[1663634177889364/start/1 (pid 40412)] Task finished successfully.
[1663634177889364/tune/2 (pid 40416)] Task is starting.
[1663634177889364/tune/3 (pid 40417)] Task is starting.
[1663634177889364/tune/3 (pid 40417)] Foreach yields 2 child steps.
[1663634177889364/tune/3 (pid 40417)] Task finished successfully.
[1663634177889364/train_and_score/4 (pid 40424)] Task is starting.
[1663634177889364/train_and_score/5 (pid 40425)] Task is starting.
[1663634177889364/tune/2 (pid 40416)] Foreach yields 2 child steps.
[1663634177889364/tune/2 (pid 40416)] Task finished successfully.
[1663634177889364/train_and_score/6 (pid 40426)] Task is starting.
[1663634177889364/train_and_score/7 (pid 40427)] Task is starting.
[1663634177889364/train_and_score/6 (pid 40426)] Task finished successfully.
[1663634177889364/train_and_score/7 (pid 40427)] Task finished successfully.
[1663634177889364/join_train/8 (pid 40440)] Task is starting.
[1663634177889364/train_and_score/4 (pid 40424)] Task finished successfully.
[1663634177889364/train_and_score/5 (pid 40425)] Task finished successfully.
[1663634177889364/join_train/9 (pid 40441)] Task is starting.
[1663634177889364/join_train/8 (pid 40440)] Task finished successfully.
[1663634177889364/join_train/9 (pid 40441)] Task finished successfully.
[1663634177889364/join_tune/10 (pid 40448)] Task is starting.
[1663634177889364/join_tune/10 (pid 40448)] Task finished successfully.
[1663634177889364/end/11 (pid 40452)] Task is starting.
[1663634177889364/end/11 (pid 40452)] Task finished successfully.
Done!

3Analyze Results

After running the flow you can use this code in any Python script or notebook to access the results of each model training run:

from metaflow import Flow
import pandas as pd
results = pd.DataFrame(Flow('NestedForeachFlow').latest_run.data.all_scores)
results
model params mean accuracy std accuracy
0 <class 'sklearn.linear_model._logistic.Logisti... {"C": 0.5} 0.966667 0.021082
1 <class 'sklearn.linear_model._logistic.Logisti... {"C": 1.0} 0.973333 0.024944
2 <class 'sklearn.ensemble._forest.RandomForestC... {"max_depth": 2} 0.960000 0.024944
3 <class 'sklearn.ensemble._forest.RandomForestC... {"max_depth": 6} 0.960000 0.024944

Further Reading