Nested Foreach Flows
Question
How can I create a nested foreach with a Metaflow DAG?
Solution
You can nest linear, branching, and foreach steps, or any combination of them, between a foreach and its corresponding join step. There is a Slack conversation you can read to learn the fundamentals of nested foreach. The rest of this post demonstrates a machine learning use case.
1. Write a Flow with a Nested Foreach
For example, this flow trains two hyperparameter configurations for each of two model types. In this case the models are sklearn.linear_model.LogisticRegression and sklearn.ensemble.RandomForestClassifier. However, Metaflow is agnostic to the modeling frameworks you use, so you can extend this template to many more models and hyperparameter combinations.
The flow contains the following steps:
- start configures the first foreach split using the self.model_param_combination variable. The foreach argument is the name of an attribute, and that attribute must be an iterable.
- For each model and its list of parameter combinations, the tune step calls another foreach over the parameter combinations unique to this model.
- For each parameter combination of each model, the train_and_score step fits a classifier and tracks cross-validation accuracies.
- join_train collects results from each train_and_score step of one branch of the tune step.
- join_tune collects results from each tune step.
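The fan-out described by these steps can be sketched in plain Python, without Metaflow. This is only an illustration of which tasks the two foreach levels produce: the helper name expand_tasks is hypothetical, and the models are represented by strings rather than sklearn classes.

```python
# Plain-Python illustration of the nested fan-out; no Metaflow required.
# Model names are strings here purely for illustration (an assumption).
model_param_combination = [
    ("LogisticRegression", [{"C": 0.5}, {"C": 1.0}]),
    ("RandomForestClassifier", [{"max_depth": 2}, {"max_depth": 6}]),
]


def expand_tasks(combinations):
    """Return one (model, params) pair per train_and_score task."""
    tasks = []
    # Outer foreach: one tune task per (model, parameter list) entry.
    for model, param_dicts in combinations:
        # Inner foreach: one train_and_score task per parameter dict.
        for params in param_dicts:
            tasks.append((model, params))
    return tasks


tasks = expand_tasks(model_param_combination)
for model, params in tasks:
    print(model, params)
```

With two models and two parameter dictionaries each, the inner level yields four train_and_score tasks in total. In the actual flow, Metaflow runs these as separate tasks rather than as loop iterations.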
from metaflow import FlowSpec, step


class NestedForeachFlow(FlowSpec):

    @step
    def start(self):
        from sklearn.datasets import load_iris
        from sklearn.linear_model import LogisticRegression
        from sklearn.ensemble import RandomForestClassifier
        iris = load_iris()
        self.X = iris['data']
        self.y = iris['target']
        # One entry per model: (model class, list of parameter dicts).
        self.model_param_combination = [
            (
                LogisticRegression,
                [{"C": 0.5}, {"C": 1.0}]
            ),
            (
                RandomForestClassifier,
                [{"max_depth": 2}, {"max_depth": 6}]
            )
        ]
        # Outer foreach: one tune task per model.
        self.next(self.tune, foreach="model_param_combination")

    @step
    def tune(self):
        self.model, self.param_dict = self.input
        # Inner foreach: one train_and_score task per parameter dict.
        self.next(self.train_and_score, foreach="param_dict")

    @step
    def train_and_score(self):
        from sklearn.model_selection import cross_val_score
        self.params = self.input
        self.clf = self.model(**self.params)
        self.scores = cross_val_score(
            self.clf, self.X, self.y, cv=5)
        self.next(self.join_train)

    @step
    def join_train(self, inputs):
        import json
        import numpy as np
        # Collect the results of every parameter combination for one model.
        self.scores = {
            'model': [],
            'params': [],
            'mean accuracy': [],
            'std accuracy': [],
        }
        for i in inputs:
            self.scores['model'].append(i.clf.__class__)
            self.scores['params'].append(json.dumps(i.params))
            self.scores['mean accuracy'].append(np.mean(i.scores))
            self.scores['std accuracy'].append(np.std(i.scores))
        self.next(self.join_tune)

    @step
    def join_tune(self, inputs):
        # Concatenate the per-model score dictionaries column by column.
        self.all_scores = {}
        for model in inputs:
            if self.all_scores == {}:
                self.all_scores = model.scores
            else:
                for k, v in model.scores.items():
                    self.all_scores[k] += v
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    NestedForeachFlow()
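The merge performed in join_tune can be tried outside Metaflow. The following is a minimal sketch, assuming SimpleNamespace objects as stand-ins for the inputs that a join step receives. The merge_scores helper and the shortened model names are hypothetical and not part of Metaflow or the flow above.

```python
from types import SimpleNamespace


def merge_scores(inputs):
    """Concatenate the dict-of-lists `scores` of each input, key by key."""
    merged = {}
    for branch in inputs:
        for key, values in branch.scores.items():
            # Extend a fresh list so that no input's own lists are mutated.
            merged.setdefault(key, []).extend(values)
    return merged


# Stand-ins for the two branches that reach join_tune (illustrative values).
lr_branch = SimpleNamespace(
    scores={"model": ["LR", "LR"], "params": ['{"C": 0.5}', '{"C": 1.0}']}
)
rf_branch = SimpleNamespace(
    scores={"model": ["RF", "RF"],
            "params": ['{"max_depth": 2}', '{"max_depth": 6}']}
)

all_scores = merge_scores([lr_branch, rf_branch])
print(all_scores["model"])
```

Unlike join_tune, which reuses the first branch's dictionary and extends its lists in place, this sketch builds new lists. The result is the same, but the inputs are left untouched.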
2. Run the Flow
python nested_foreach.py run
3. Analyze Results
After running the flow you can use this code in any Python script or notebook to access the results of each model training run:
from metaflow import Flow
import pandas as pd
results = pd.DataFrame(Flow('NestedForeachFlow').latest_run.data.all_scores)
results
| | model | params | mean accuracy | std accuracy |
|---|---|---|---|---|
| 0 | <class 'sklearn.linear_model._logistic.Logisti... | {"C": 0.5} | 0.966667 | 0.021082 |
| 1 | <class 'sklearn.linear_model._logistic.Logisti... | {"C": 1.0} | 0.973333 | 0.024944 |
| 2 | <class 'sklearn.ensemble._forest.RandomForestC... | {"max_depth": 2} | 0.960000 | 0.024944 |
| 3 | <class 'sklearn.ensemble._forest.RandomForestC... | {"max_depth": 6} | 0.960000 | 0.024944 |
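A common next step is to pick the configuration with the highest mean accuracy. The sketch below does this in plain Python on a dictionary shaped like all_scores. The values are copied from the table above, the model names are abbreviated to strings, and the best_row helper is a hypothetical name.

```python
# Values copied from the results table; model class names are abbreviated.
all_scores = {
    "model": [
        "LogisticRegression",
        "LogisticRegression",
        "RandomForestClassifier",
        "RandomForestClassifier",
    ],
    "params": ['{"C": 0.5}', '{"C": 1.0}',
               '{"max_depth": 2}', '{"max_depth": 6}'],
    "mean accuracy": [0.966667, 0.973333, 0.960000, 0.960000],
    "std accuracy": [0.021082, 0.024944, 0.024944, 0.024944],
}


def best_row(scores, metric="mean accuracy"):
    """Return the row (as a dict) with the largest value of `metric`."""
    n_rows = len(scores[metric])
    best_index = max(range(n_rows), key=lambda i: scores[metric][i])
    return {column: values[best_index] for column, values in scores.items()}


best = best_row(all_scores)
print(best["model"], best["params"], best["mean accuracy"])
```

On these numbers the selected row is LogisticRegression with {"C": 1.0}. With the pandas DataFrame from the previous snippet, the same selection could be written with idxmax on the mean accuracy column.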