Load Parquet Data from S3 to Arrow Table
Question
I have a Parquet dataset stored in AWS S3 and want to access it in a Metaflow flow. How can I read one or several Parquet files at once from a flow and use them in an Arrow table?
Solution
You can access a Parquet dataset on S3 from a Metaflow flow using the metaflow.S3 client, which lets you download and upload large datasets to and from S3 with high throughput.
1. Access Parquet Data in S3
It is recommended to use metaflow.S3 as a context manager. Keep in mind that metaflow.S3 downloads objects to temporary files that only exist for the duration of the context, so any data you need after the scope closes must be read into memory before the context exits; that is why the example below loads the Parquet file into an Arrow table inside the with block.
To access a single file you can use metaflow.S3.get. Oftentimes Parquet datasets consist of many files, which is a good use case for Metaflow's s3.get_many function.
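As a minimal sketch of the single-file case (the bucket and key below are placeholders, not part of the dataset used in this example):
from metaflow import S3
import pyarrow.parquet as pq

# Placeholder bucket and key; substitute your own Parquet object.
with S3(s3root='s3://my-bucket/my-dataset/') as s3:
    obj = s3.get('part-0.parquet')   # downloads to a temporary file
    table = pq.read_table(obj.path)  # read before the context closes

print(table.num_rows)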
This example uses Parquet data from Ookla's open dataset hosted in the Registry of Open Data on AWS.
2. Run the Flow
This flow shows how to:
- Download multiple Parquet files using Metaflow's s3.get_many function.
- Read the first downloaded chunk into a PyArrow table.
from metaflow import FlowSpec, step, S3

BASE_URL = 's3://ookla-open-data/parquet/performance/type=fixed/'
YEARS = ['2019', '2020', '2021', '2022']

# One first-quarter file per year, relative to BASE_URL.
S3_PATHS = [
    f'year={y}/quarter=1/{y}-01-01_performance_fixed_tiles.parquet'
    for y in YEARS
]

class ParquetArrowFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.load_parquet)

    @step
    def load_parquet(self):
        import pyarrow.parquet as pq
        # The downloaded files are temporary, so read the data
        # into an Arrow table before the S3 context closes.
        with S3(s3root=BASE_URL) as s3:
            tmp_data_path = s3.get_many(S3_PATHS)
            first_path = tmp_data_path[0].path
            self.table = pq.read_table(first_path)
        self.next(self.end)

    @step
    def end(self):
        print(f'Table for first year has shape {self.table.shape}.')

if __name__ == '__main__':
    ParquetArrowFlow()
python load_parquet_to_arrow.py run
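The flow above only reads the first chunk. If you want all four years in a single table, one option (a sketch, assuming the yearly files share a schema) is to read every downloaded chunk and concatenate the tables before the context closes:
import pyarrow as pa
import pyarrow.parquet as pq
from metaflow import S3

# Reuses BASE_URL and S3_PATHS as defined in the flow above.
with S3(s3root=BASE_URL) as s3:
    # Read each chunk into memory before the temporary files are removed.
    tables = [pq.read_table(obj.path) for obj in s3.get_many(S3_PATHS)]

# Concatenation requires all chunks to share a schema.
combined = pa.concat_tables(tables)
print(combined.num_rows)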
3. Access Artifacts Outside of the Flow
The following can be run in any script or notebook to access the table that was stored as a flow artifact in self.table. You can also run quick tests to assert that the artifacts have the expected properties.
from metaflow import Flow

run = Flow('ParquetArrowFlow').latest_run
table = run.data.table

# Quick sanity checks on the artifact.
assert run.successful
assert table.shape == (4877036, 7)

# Select a subset of columns by index.
table.select([1, 2, 3, 4, 5])
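From here you can hand the Arrow table to other tools; for example, converting the selected columns to a pandas DataFrame (assuming pandas is installed):
# Convert the column subset to pandas for further analysis.
df = table.select([1, 2, 3, 4, 5]).to_pandas()
print(df.head())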
Further Reading
- Working with cloud data in Metaflow
- Scaling Metaflow flows