Chunk a Dataframe to Parquet
Question
I have a large pandas dataframe in memory. How can I chunk it into Parquet files using Metaflow?
Solution
You can do this with Metaflow's foreach. One pattern is to combine Apache Arrow's zero-copy slicing with a foreach split, so that each branch writes one chunk of the table to a Parquet file.
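Before the full example, here is a minimal standalone sketch (not part of the flow below) of what zero-copy slicing looks like; pyarrow.Table.slice returns a view that shares buffers with the original table instead of copying data:

# Standalone sketch: pyarrow.Table.slice produces zero-copy views.
import pandas as pd
import pyarrow as pa

table = pa.Table.from_pandas(pd.DataFrame({"x": range(10)}))
view = table.slice(offset=2, length=3)   # shares buffers with `table`, no data copied
print(view.to_pydict())                  # {'x': [2, 3, 4]}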
1. Gather data
Suppose you have curated a dataset:
import numpy as np
import pandas as pd
import string
from datetime import datetime

letters = list(string.ascii_lowercase)
make_str = lambda n: ''.join(np.random.choice(letters, size=n))

# one row per minute from the start of 2010 until now
dates = pd.date_range(start=datetime(2010, 1, 1),
                      end=datetime.today(),
                      freq="min")
size = len(dates)

# random numeric and string columns keyed by timestamp
df = pd.DataFrame({
    'date': dates,
    'num1': np.random.rand(size),
    'num2': np.random.rand(size),
    'str1': [make_str(20) for _ in range(size)],
    'str2': [make_str(20) for _ in range(size)]
})
df.to_csv("./large_dataframe.csv")
df.head(3)
| | date | num1 | num2 | str1 | str2 |
|---|---|---|---|---|---|
| 0 | 2010-01-01 00:00:00 | 0.424410 | 0.503014 | xyouzjaivrwtnqczcieb | fonxhwjxdpdvnfvtvcar |
| 1 | 2010-01-01 00:01:00 | 0.650159 | 0.184204 | dxrqtbmezgwobpqlpybt | ihahasnbtgptjfwnvlic |
| 2 | 2010-01-01 00:02:00 | 0.602216 | 0.647338 | kaatnygdfekoxmpnvbky | wffzxlyzjnopahttvdxe |
and your goal is to store this data efficiently in Parquet files.
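For comparison, the whole dataframe could also be written to a single Parquet file straight from pandas (a one-line sketch, assuming pyarrow is installed as the engine); the rest of this page splits the same data into several smaller files instead:

# Single-file baseline (sketch only): write the entire dataframe at once.
df.to_parquet("./large_dataframe.parquet", engine="pyarrow")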
2. Determine How to Chunk the Data
PyArrow (version 5.0.0 at the time of writing) is used to split the dataframe into chunks. The utility function below, which the flow imports, shows how:
dataframe_utils.py
import pyarrow as pa
import pandas as pd
from datetime import datetime
from typing import List, Tuple

def get_chunks(df: pd.DataFrame = None,
               num_chunks: int = 4) -> Tuple[pa.Table, List]:
    # derive a 'year' column from the date strings
    get_year = lambda x: datetime.strptime(
        x.split()[0], "%Y-%m-%d").year
    df['year'] = df.date.apply(get_year)

    # split the rows into num_chunks roughly equal pieces;
    # the last chunk absorbs any remainder
    num_records = df.shape[0] // num_chunks
    lengths = [num_records] * num_chunks
    lengths[-1] += df.shape[0] - num_chunks * num_records
    offsets = [sum(lengths[:i]) for i in range(num_chunks)]
    names = ["chunk_%s" % i for i in range(num_chunks)]

    # return the full Arrow table plus (name, offset, length) triples
    return (pa.Table.from_pandas(df),
            list(zip(names, offsets, lengths)))
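As a quick sanity check (a hypothetical snippet, not part of the flow), you can print the chunk boundaries that get_chunks computes for the CSV written in step 1:

# Inspect the (name, offset, length) triples returned by get_chunks.
import pandas as pd
from dataframe_utils import get_chunks

df = pd.read_csv("./large_dataframe.csv")
table, chunks = get_chunks(df, num_chunks=4)
print(table.num_rows)              # total number of rows in the Arrow table
for name, offset, length in chunks:
    print(name, offset, length)    # e.g. chunk_0 starts at offset 0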
3. Run Flow
This flow loads the CSV into a pandas dataframe and applies the following steps:
- Use `pyarrow.Table.from_pandas` to load the data into Arrow memory.
- In parallel foreach branches:
  - Use `pyarrow.Table.slice` to make zero-copy views of chunks of the table.
  - Apply a transformation to the chunk; appending a column in this case.
  - Write the chunk to your S3 bucket using `pyarrow.parquet.write_table`.
- Pick a chunk and verify the existence of the new transformed column.
If you have a dataframe in S3 that you want to read into memory, you can see an example specific to this topic here.
chunk_dataframe.py
from metaflow import FlowSpec, step

class ForEachChunkFlow(FlowSpec):

    bucket = "s3://outerbounds-how-tos"
    s3_path = "{}/dataframe-chunks/{}.parquet"
    df_path = "./large_dataframe.csv"

    @step
    def start(self):
        import pandas as pd
        from dataframe_utils import get_chunks
        my_big_df = pd.read_csv(self.df_path)
        self.table, self.chunks = get_chunks(my_big_df)
        self.next(self.process_chunk, foreach='chunks')

    @step
    def process_chunk(self):
        import pyarrow as pa
        import pyarrow.parquet as pq
        # get view of this chunk only
        chunk_id, offset, length = self.input
        chunk = self.table.slice(offset=offset, length=length)
        # do transformation on table
        col1 = chunk['num1'].to_numpy()
        col2 = chunk['num2'].to_numpy()
        values = pa.array(col1 * col2)
        chunk = chunk.append_column('new col', values)
        # write chunk as parquet file in S3 bucket
        self.my_path = self.s3_path.format(self.bucket, chunk_id)
        pq.write_table(table=chunk, where=self.my_path)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        import pyarrow.parquet as pq
        test_id = 'chunk_1'
        path = self.s3_path.format(self.bucket, test_id)
        test_chunk = pq.read_table(source=path)
        assert 'new col' in test_chunk.column_names

if __name__ == "__main__":
    ForEachChunkFlow()
python chunk_dataframe.py run
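Once the run completes, you can pull one of the chunk files back into pandas to confirm the appended column made it to S3. This is a sketch; it assumes read access to the same bucket and that PyArrow's S3 filesystem support is configured in your environment:

# Read one chunk back and check for the column added in process_chunk.
import pyarrow.parquet as pq

path = "s3://outerbounds-how-tos/dataframe-chunks/chunk_0.parquet"
chunk_df = pq.read_table(source=path).to_pandas()
print('new col' in chunk_df.columns)   # True if the flow's transformation was applied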