Nested Foreach Flows
Question
How can I create a nested foreach with a Metaflow DAG?
Solution
You can nest linear, branching, and foreach steps, or any combination of them, between a foreach and its corresponding join step. Here is a Slack conversation you can read to learn the fundamentals of nested foreach. The rest of this post demonstrates a machine learning use case.
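As a rough mental model, a nested foreach behaves like two nested loops, with the inner join closing before the outer join. The sketch below is plain Python, not Metaflow code, and the item names and the "multiply by 10" work are hypothetical placeholders. It also runs sequentially, whereas Metaflow runs foreach tasks as separate tasks.

```python
# Plain-Python analogy for a nested foreach; this is not Metaflow code.
# The keys and values below are hypothetical placeholders.
outer_items = {"a": [1, 2], "b": [3, 4]}

outer_results = []
for name, inner_items in outer_items.items():     # outer foreach
    inner_results = []
    for value in inner_items:                      # inner foreach
        inner_results.append((name, value * 10))   # work done per inner task
    # Inner join: runs once per outer item and sees only that item's tasks.
    outer_results.append(inner_results)

# Outer join: runs once and sees one collected result per outer item.
flat = [pair for group in outer_results for pair in group]
print(flat)
```

The point of the analogy is the ordering of the joins: each inner join sees only the tasks of its own outer branch, and the outer join sees one result per branch.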
1. Write a Flow with a Nested Foreach
For example, this flow trains two hyperparameter configurations for each of two model types: sklearn.linear_model.LogisticRegression and sklearn.ensemble.RandomForestClassifier. Metaflow is agnostic to the modeling framework you use, so you can extend this template to many more models and hyperparameter combinations.
The flow contains the following steps:
- start configures the first foreach split using the self.model_param_combination variable. The foreach argument is the name of an attribute, and the value of that attribute must be iterable.
- For each model and its list of parameter combinations, the tune step starts another foreach over the parameter combinations unique to that model.
- For each parameter combination of each model, the train_and_score step fits a classifier and tracks cross-validation accuracies.
- join_train collects the results of each train_and_score step within one branch of the tune step.
- join_tune collects the results for each tune step, one per model.

from metaflow import FlowSpec, step


class NestedForeachFlow(FlowSpec):

    @step
    def start(self):
        from sklearn.datasets import load_iris
        from sklearn.linear_model import LogisticRegression
        from sklearn.ensemble import RandomForestClassifier
        iris = load_iris()
        self.X = iris['data']
        self.y = iris['target']
        # Each entry pairs a model class with its parameter combinations.
        self.model_param_combination = [
            (
                LogisticRegression,
                [{"C": 0.5}, {"C": 1.0}]
            ),
            (
                RandomForestClassifier,
                [{"max_depth": 2}, {"max_depth": 6}]
            )
        ]
        # Outer foreach: one tune task per model.
        self.next(self.tune, foreach="model_param_combination")

    @step
    def tune(self):
        self.model, self.param_dict = self.input
        # Inner foreach: one train_and_score task per parameter combination.
        self.next(self.train_and_score, foreach="param_dict")

    @step
    def train_and_score(self):
        from sklearn.model_selection import cross_val_score
        self.params = self.input
        self.clf = self.model(**self.params)
        self.scores = cross_val_score(
            self.clf, self.X, self.y, cv=5)
        self.next(self.join_train)

    @step
    def join_train(self, inputs):
        # Inner join: collect the scores for one model.
        import json
        import numpy as np
        self.scores = {
            'model': [],
            'params': [],
            'mean accuracy': [],
            'std accuracy': [],
        }
        for i in inputs:
            self.scores['model'].append(i.clf.__class__)
            self.scores['params'].append(json.dumps(i.params))
            self.scores['mean accuracy'].append(np.mean(i.scores))
            self.scores['std accuracy'].append(np.std(i.scores))
        self.next(self.join_tune)

    @step
    def join_tune(self, inputs):
        # Outer join: concatenate the per-model score tables.
        self.all_scores = {}
        for model in inputs:
            if self.all_scores == {}:
                self.all_scores = model.scores
            else:
                for k, v in model.scores.items():
                    self.all_scores[k] += v
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    NestedForeachFlow()
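In join_tune, the first branch's scores dictionary is assigned to self.all_scores and then extended in place with +=. That works for this flow. If you would rather build the merged table from fresh lists, one possible alternative is sketched below in plain Python. The helper name merge_score_tables and the sample values are hypothetical and are not part of Metaflow or of the flow above.

```python
def merge_score_tables(tables):
    """Concatenate column-oriented score dicts into a new dict of new lists."""
    merged = {}
    for table in tables:
        for column, values in table.items():
            # setdefault creates a fresh list the first time a column is seen,
            # so none of the input lists is modified.
            merged.setdefault(column, []).extend(values)
    return merged


# Hypothetical stand-ins for the `scores` artifact of two join_train tasks.
branch_a = {"model": ["A", "A"], "mean accuracy": [0.9, 0.8]}
branch_b = {"model": ["B", "B"], "mean accuracy": [0.7, 0.6]}

merged = merge_score_tables([branch_a, branch_b])
print(merged["model"])
print(len(branch_a["model"]))  # the inputs keep their original length
```

Inside join_tune, a helper like this could be called as merge_score_tables(inp.scores for inp in inputs), assuming every branch produces the same columns.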
2. Run the Flow
python nested_foreach.py run
3. Analyze Results
After running the flow, you can use this code in any Python script or notebook to access the results of each model training run:
from metaflow import Flow
import pandas as pd
results = pd.DataFrame(Flow('NestedForeachFlow').latest_run.data.all_scores)
results
| | model | params | mean accuracy | std accuracy |
|---|---|---|---|---|
| 0 | <class 'sklearn.linear_model._logistic.Logisti... | {"C": 0.5} | 0.966667 | 0.021082 |
| 1 | <class 'sklearn.linear_model._logistic.Logisti... | {"C": 1.0} | 0.973333 | 0.024944 |
| 2 | <class 'sklearn.ensemble._forest.RandomForestC... | {"max_depth": 2} | 0.960000 | 0.024944 |
| 3 | <class 'sklearn.ensemble._forest.RandomForestC... | {"max_depth": 6} | 0.960000 | 0.024944 |
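To pick the best configuration from a table like this, you can look up the row with the highest mean accuracy. The sketch below uses a plain dictionary with the same column layout as all_scores, filled with the values from the table above. The model column is shortened to plain strings here, which is a simplification for illustration; in the flow it holds the class objects.

```python
# Column-oriented results, copied from the table above.
# Model names are shortened to plain strings for this sketch.
all_scores = {
    "model": [
        "LogisticRegression",
        "LogisticRegression",
        "RandomForestClassifier",
        "RandomForestClassifier",
    ],
    "params": ['{"C": 0.5}', '{"C": 1.0}', '{"max_depth": 2}', '{"max_depth": 6}'],
    "mean accuracy": [0.966667, 0.973333, 0.960000, 0.960000],
    "std accuracy": [0.021082, 0.024944, 0.024944, 0.024944],
}

# Index of the row with the highest mean accuracy.
means = all_scores["mean accuracy"]
best = max(range(len(means)), key=means.__getitem__)

# Gather that row's value from every column.
best_row = {column: values[best] for column, values in all_scores.items()}
print(best_row)
```

With the results DataFrame from the snippet above, results.sort_values("mean accuracy", ascending=False) gives the same ranking.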