Filter Flows on Condition
Question
How can I filter the runs of my flows based on some condition?
Solution
You can use the Metaflow Client API to access past runs of your flows and filter them in plain Python.
1. Run the Flow
This flow and the subsequent client API calls show how to:
- Define a flow that saves a randomly chosen value as a flow artifact.
- Keep only the successful runs of this flow.
- Filter runs of this flow by creation time.
- Filter runs of this flow by the value of a flow artifact.
filter_on_condition.py
```python
from metaflow import FlowSpec, step


class ConditionalFilterFlow(FlowSpec):

    @step
    def start(self):
        import random
        # saved as an artifact that can be read later through the Client API
        self.fancy_conditional = random.choice([1, 2, 3])
        print(self.fancy_conditional)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    ConditionalFilterFlow()
```
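Run the flow. Repeating the command a few times gives you several runs, with different values, to filter:

```bash
python filter_on_condition.py run
```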
2. Filter by Successful Runs
Here is an example that uses the Client API to access the results of the run you just did. Iterating over `Flow(flow_name)` yields the flow's `Run` objects. By default, only runs in your current namespace (usually your own runs) are included. If the flow has many runs, you may also want to cap how many are fetched.
```python
from metaflow import Flow

flow_name = 'ConditionalFilterFlow'
max_runs = 100  # limit the query in case the flow has many runs

successful_runs = []
for i, run in enumerate(Flow(flow_name)):
    if i >= max_runs:
        break  # stop after inspecting exactly max_runs runs
    if run.successful:
        successful_runs.append(run)

print(successful_runs)
```
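The loop above mixes fetching and filtering. The following is a minimal sketch that separates the two, assuming only that runs arrive as an iterable of objects with a `successful` attribute, as Metaflow `Run` objects have. The helper `filter_capped` and the `SimpleNamespace` stand-ins are hypothetical, chosen so the sketch runs without a Metaflow backend. `itertools.islice` stops pulling from the iterable after `max_items` elements, so exactly that many runs are inspected.

```python
from itertools import islice
from types import SimpleNamespace
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def filter_capped(items: Iterable[T], predicate: Callable[[T], bool], max_items: int) -> List[T]:
    """Return the items that satisfy predicate among the first max_items items.

    islice stops consuming the iterable after max_items elements, so no
    more than max_items runs are fetched from the underlying source.
    """
    return [item for item in islice(items, max_items) if predicate(item)]


# Hypothetical stand-ins for Metaflow Run objects (only .id and .successful).
fake_runs = [SimpleNamespace(id=str(n), successful=(n % 2 == 0)) for n in range(10)]

successful = filter_capped(fake_runs, lambda run: run.successful, max_items=5)
print([run.id for run in successful])  # ['0', '2', '4']
```

With Metaflow installed, the equivalent call would be `filter_capped(Flow('ConditionalFilterFlow'), lambda run: run.successful, 100)`, since a `Flow` object is iterable. If you only need the most recent successful run, `Flow(flow_name).latest_successful_run` returns it directly.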
3. Filter by Time
You can also filter on properties of a Metaflow `Run`, such as its creation time `created_at`. This snippet uses the `datetime` library to keep runs created between January 1st of this year and now:
```python
from datetime import datetime

from metaflow import Flow

flow_name = 'ConditionalFilterFlow'
max_runs = 100  # limit the query in case the flow has many runs

now = datetime.now()
start_of_year = datetime(now.year, 1, 1)

runs_this_year = []
for i, run in enumerate(Flow(flow_name)):
    if i >= max_runs:
        break
    # created_at is a datetime, so it can be compared with the bounds directly
    if start_of_year <= run.created_at <= now:
        runs_this_year.append(run)

print(runs_this_year)
```
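The date-window check can be isolated and tested on its own. The following is a minimal sketch, assuming each run exposes a `created_at` datetime. The helpers `start_of_year` and `created_between` and the `SimpleNamespace` stand-ins are hypothetical. A fixed "now" keeps the example reproducible.

```python
from datetime import datetime
from types import SimpleNamespace


def start_of_year(now: datetime) -> datetime:
    """Midnight on January 1st of the year that contains `now`."""
    return datetime(now.year, 1, 1)


def created_between(run, start: datetime, end: datetime) -> bool:
    """True if run.created_at falls inside the closed interval [start, end]."""
    return start <= run.created_at <= end


# Fixed "now" for reproducibility; use datetime.now() in practice.
now = datetime(2024, 6, 15, 12, 0)
window_start = start_of_year(now)

# Hypothetical stand-ins for Metaflow Run objects (only .id and .created_at).
fake_runs = [
    SimpleNamespace(id="a", created_at=datetime(2023, 12, 31, 23, 59)),
    SimpleNamespace(id="b", created_at=datetime(2024, 1, 1, 0, 0)),
    SimpleNamespace(id="c", created_at=datetime(2024, 6, 15, 11, 0)),
]

runs_this_year = [r for r in fake_runs if created_between(r, window_start, now)]
print([r.id for r in runs_this_year])  # ['b', 'c']
```

The inclusive lower bound keeps a run created exactly at midnight on January 1st, which a strict `>` comparison would drop. Python raises `TypeError` when comparing naive and timezone-aware datetimes. If your `created_at` values carry a timezone, make the window bounds timezone-aware too.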
4. Filter by Flow Artifact
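This snippet shows how to filter runs by the value of a flow artifact, here the `fancy_conditional` value saved by the `start` step. It also counts runs whose value differs from `filter_value` and runs where the artifact cannot be read.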
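```python
from metaflow import Flow

flow_name = 'ConditionalFilterFlow'
max_runs = 100  # limit the query in case the flow has many runs
filter_value = 1

runs_with_fancy_condition = []
num_runs_without_fancy_condition = 0
num_runs_missing_artifact = 0

for i, run in enumerate(Flow(flow_name)):
    if i >= max_runs:
        break
    try:
        # read the artifact saved by the start step of this run
        value = run['start'].task.data.fancy_conditional
    except (KeyError, AttributeError):
        # KeyError: the run has no start step;
        # AttributeError: the step has no task, or the artifact is missing
        num_runs_missing_artifact += 1
        continue
    if value == filter_value:
        runs_with_fancy_condition.append(run)
    else:
        num_runs_without_fancy_condition += 1

print("This flow has {} runs whose fancy_conditional value is not {}".format(
    num_runs_without_fancy_condition, filter_value))
print("This flow has {} runs that do not have the fancy_conditional artifact".format(
    num_runs_missing_artifact))

for run in runs_with_fancy_condition:
    assert run['start'].task.data.fancy_conditional == filter_value
```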
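Instead of keeping separate counters, you can tally every observed artifact value in one pass. The following is a minimal sketch, assuming the `run[step].task.data.<artifact>` access pattern used above. `artifact_value`, `MISSING`, and `FakeRun` are hypothetical names. `FakeRun` imitates only enough of a Metaflow `Run` for the sketch to run offline.

```python
from collections import Counter
from types import SimpleNamespace

MISSING = object()  # sentinel for runs whose artifact cannot be read


def artifact_value(run, step_name: str, artifact: str):
    """Return run[step_name].task.data.<artifact>, or MISSING if unreadable.

    KeyError covers a missing step; AttributeError covers a missing task
    or artifact.
    """
    try:
        return getattr(run[step_name].task.data, artifact)
    except (KeyError, AttributeError):
        return MISSING


class FakeRun:
    """Hypothetical stand-in supporting run['start'].task.data.<name>."""

    def __init__(self, run_id, steps):
        self.id = run_id
        self._steps = steps  # dict: step name -> dict of artifacts

    def __getitem__(self, step_name):
        artifacts = self._steps[step_name]  # raises KeyError if step is absent
        return SimpleNamespace(task=SimpleNamespace(data=SimpleNamespace(**artifacts)))


runs = [
    FakeRun("1", {"start": {"fancy_conditional": 1}}),
    FakeRun("2", {"start": {"fancy_conditional": 3}}),
    FakeRun("3", {"start": {}}),  # artifact missing -> AttributeError
    FakeRun("4", {}),             # step missing -> KeyError
    FakeRun("5", {"start": {"fancy_conditional": 1}}),
]

tally = Counter()
matching = []
for run in runs:
    value = artifact_value(run, "start", "fancy_conditional")
    tally["missing" if value is MISSING else value] += 1
    if value == 1:
        matching.append(run.id)

print(matching)     # ['1', '5']
print(dict(tally))  # {1: 2, 3: 1, 'missing': 2}
```

A sentinel object, rather than `None`, keeps a missing artifact distinct from an artifact whose stored value happens to be `None`.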