Download Metaflow Task Code Package
Question
How to download and unpack the code package that Metaflow versions in the cloud?
Solution
On every run where you use a remote datastore, Metaflow snapshots your code package and pushes it to the datastore. This snapshot is useful for versioning, and it is also used to bootstrap the task runtime environment.
This page shows how to use the Metaflow Client API to download the code package from any task onto your workstation.
1. Find the Relevant Task Pathspec
from metaflow import Task
# You can find this info in the CLI logs.
# If you run the Metaflow GUI, it is also visible there.
# Alternatively, if you use Outerbounds, find it in the dashboard.
flow_name = 'TrainiumBERTFinetune'
run_id = '216825'
step_name = 'tune_bert'
task_id = 'control-216825-cache_dataset-1332173'
# The pathspec is built as <flow name>/<run id>/<step name>/<task id>.
task_pathspec = f'{flow_name}/{run_id}/{step_name}/{task_id}'
# use the Metaflow Client API
task = Task(task_pathspec)
Once you have identified and loaded the `Task` object using Metaflow's Client API, you can download and unpack the code that was run on the remote machine.
2. Download and Unpack the Code Package
# Here the task id doubles as the name of the local directory to extract into.
EXTRACTION_PATH = task_id # set EXTRACTION_PATH to your desired location
task.code.tarball.extractall(path=EXTRACTION_PATH)
# What is the structure of the remote code package?
# Note: this will be different for each task.
# NOTE(review): the next line is not plain Python; it looks like an
# IPython/Jupyter shell alias with {variable} expansion -- confirm it is
# meant to be run in a notebook cell.
ls {EXTRACTION_PATH}
3. Bonus: Compare Code Package Versions
Suppose you want to compare two versions of the codebase used to run a task. This section provides a function to help you get started.
def compare_code_packages(
    pathspec1,
    pathspec2,
    tmp_dir='/tmp/code-package-diffs',
    html_dir='code-package-file-diffs'
):
    """Diff the code packages of two Metaflow tasks.

    Both code packages are extracted into temporary directories under
    ``tmp_dir``. Files whose relative path starts with ``metaflow`` are
    ignored. A match / mismatch / error report for the files present in
    both packages, and the files unique to each package, are printed.
    For every mismatching file an HTML side-by-side diff is written
    under ``html_dir``.

    Args:
        pathspec1: Pathspec of the first ("old") task; passed to ``Task``.
        pathspec2: Pathspec of the second ("new") task; passed to ``Task``.
        tmp_dir: Directory under which the packages are extracted. The
            extracted copies are removed before returning, even on error.
        html_dir: Directory that receives the generated HTML diffs.

    Returns:
        List of paths of the HTML diff files written, ordered by the
        sorted relative path of the mismatching files.
    """
    import difflib
    import os
    import pprint
    import sys
    from filecmp import cmpfiles
    from tempfile import TemporaryDirectory

    from metaflow import Task

    def print_common_file_cmp(dir1, dir2, common, shallow=False):
        # shallow=False makes cmpfiles compare contents, not just os.stat().
        match, mismatch, errors = cmpfiles(
            dir1, dir2, common, shallow=shallow)
        print('Matching files: ')
        pprint.pprint(match)
        print('\nMismatching files: ')
        pprint.pprint(mismatch)
        print('\nErrors: ')
        pprint.pprint(errors)
        return mismatch

    def read_lines(path):
        # errors='replace' keeps one undecodable file from aborting the
        # whole comparison.
        with open(path, 'r', errors='replace') as f:
            return f.readlines()

    def create_file_diff(old_file, new_file, output_file):
        # Write an HTML diff to output_file, or a unified diff to stdout
        # when output_file is falsy.
        old_lines = read_lines(old_file)
        new_lines = read_lines(new_file)
        if output_file:
            delta = difflib.HtmlDiff().make_file(
                old_lines, new_lines, old_file, new_file
            )
            with open(output_file, "w") as f:
                print('Writing: ', output_file)
                f.write(delta)
        else:
            # old_file / new_file are path strings, so they are used
            # directly as the diff header labels.
            delta = difflib.unified_diff(
                old_lines, new_lines, old_file, new_file)
            sys.stdout.writelines(delta)

    def list_files_recursive(directory):
        # Paths are returned relative to `directory`.
        return [
            os.path.relpath(os.path.join(root, name), start=directory)
            for root, _, names in os.walk(directory)
            for name in names
        ]

    def user_files(directory):
        # metaflow code package is versioned already
        return {
            rel_path for rel_path in list_files_recursive(directory)
            if not rel_path.startswith('metaflow')
        }

    def staging_args(pathspec):
        # A pathspec contains '/', so only its last component can serve
        # as the temp-dir prefix; the leading components become real
        # parent directories.
        parent = os.path.join(tmp_dir, os.path.dirname(pathspec))
        os.makedirs(parent, exist_ok=True)
        return {'dir': parent, 'prefix': os.path.basename(pathspec)}

    out_files = []
    # The context managers remove both extracted trees even if the
    # download or the diffing raises.
    with TemporaryDirectory(**staging_args(pathspec1)) as dir1, \
            TemporaryDirectory(**staging_args(pathspec2)) as dir2:
        # download code packages
        Task(pathspec1).code.tarball.extractall(path=dir1)
        Task(pathspec2).code.tarball.extractall(path=dir2)

        files1 = user_files(dir1)
        files2 = user_files(dir2)
        # Sorted so the report and the returned list are deterministic.
        file_intersection = sorted(files1 & files2)
        unique_files_task1 = sorted(files1 - files2)
        unique_files_task2 = sorted(files2 - files1)

        # find files with a diff
        mismatches = print_common_file_cmp(
            dir1=dir1,
            dir2=dir2,
            common=file_intersection
        )
        print('\nUnique files:')
        print(f'\n{pathspec1}')
        pprint.pprint(unique_files_task1)
        print(f'\n{pathspec2}')
        pprint.pprint(unique_files_task2)

        # create html of diff foreach mismatch
        print('\n')
        for mismatch_file in mismatches:
            output_html_file = os.path.join(
                html_dir, mismatch_file.replace('.', '_') + '.html')
            output_parent = os.path.dirname(output_html_file)
            if output_parent:
                os.makedirs(output_parent, exist_ok=True)
            create_file_diff(
                os.path.join(dir1, mismatch_file),
                os.path.join(dir2, mismatch_file),
                output_html_file,
            )
            out_files.append(output_html_file)
    return out_files
pathspec1 = 'DBTFlow/216737/jaffle_models/1331782'
pathspec2 = 'DBTFlow/216798/jaffle_models/1332054'
# compare_code_packages returns the paths of the HTML diffs it wrote.
generated_html_diffs = compare_code_packages(pathspec1, pathspec2)
from IPython.display import display, HTML
# Pick the diff to render by its position in the returned list; this raises
# IndexError when fewer than three diffs were generated, so adjust the index
# to the file you want to inspect.
path = generated_html_diffs[2]
with open(path, 'r') as html:
    contents = html.read()
display(HTML(contents))
/tmp/code-package-diffs/DBTFlow/216737/jaffle_models/133178241e4_rfi/test_yield.py | /tmp/code-package-diffs/DBTFlow/216798/jaffle_models/1332054laidzm33/test_yield.py | ||||
---|---|---|---|---|---|
f | 1 | import subprocess | f | 1 | import subprocess |
2 | 2 | ||||
3 | 3 | ||||
4 | def main(): | 4 | def main(): | ||
5 | print("before yield") | 5 | print("before yield") | ||
6 | for s in no(): | 6 | for s in no(): | ||
7 | print(s) | 7 | print(s) | ||
n | 8 | subprocess.call(["echo", "TEST"]) | n | 8 | for line in subp_ret(): |
9 | print(line) | ||||
9 | for s in yes(): | 10 | for s in yes(): | ||
10 | print(s) | 11 | print(s) | ||
11 | 12 | ||||
12 | print("after yield") | 13 | print("after yield") | ||
13 | 14 | ||||
14 | def yes(): | 15 | def yes(): | ||
15 | yield "ahh" | 16 | yield "ahh" | ||
16 | yield "no!" | 17 | yield "no!" | ||
17 | yield "does this work?" | 18 | yield "does this work?" | ||
18 | 19 | ||||
19 | def no(): | 20 | def no(): | ||
20 | yield "no" | 21 | yield "no" | ||
21 | yield "say it aint so" | 22 | yield "say it aint so" | ||
22 | 23 | ||||
t | t | 24 | def subp_ret(): | ||
25 | return subp() | ||||
26 | |||||
27 | |||||
28 | def subp(): | ||||
29 | try: | ||||
30 | cmd = [] | ||||
31 | process = subprocess.Popen( | ||||
32 | ["./echo.sh", "5"], | ||||
33 | stdout=subprocess.PIPE, | ||||
34 | ) | ||||
35 | while True: | ||||
36 | process.poll() | ||||
37 | if process.returncode is None: | ||||
38 | # process is still running | ||||
39 | line = process.stdout.readline() | ||||
40 | if not line: | ||||
41 | # end of stdout, but process has not ended yet. | ||||
42 | continue | ||||
43 | yield line.decode() | ||||
44 | elif process.returncode == 0: | ||||
45 | break | ||||
46 | elif process.returncode != 0: | ||||
47 | raise Exception("Derped") | ||||
48 | finally: | ||||
49 | pass | ||||
50 | |||||
23 | if __name__=="__main__": | 51 | if __name__=="__main__": | ||
24 | main() | 52 | main() |
Legends | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
|