Chunk a dataframe to .parquet
Question
I have a large pandas dataframe in memory. How can I chunk it into Parquet files using Metaflow?
Solution
You can do this with Metaflow's foreach. The pattern is to combine Apache Arrow's zero-copy slicing with a foreach split, so that each parallel branch writes one chunk as a Parquet file.
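To see what "zero-copy" means in practice, here is a small, self-contained sketch (separate from the flow below): slicing an Arrow table returns a view over the parent table's buffers rather than a copy of the data.
import numpy as np
import pandas as pd
import pyarrow as pa

# Build a table from a dataframe, then take a slice of it.
table = pa.Table.from_pandas(pd.DataFrame({"x": np.arange(1_000_000)}))
before = pa.total_allocated_bytes()

# slice() returns a view over the parent table's buffers; no data is copied.
view = table.slice(offset=250_000, length=100_000)

print(view.num_rows)                        # 100000
print(pa.total_allocated_bytes() - before)  # 0: no new buffers were allocated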
1. Gather Data
Suppose you have curated a dataset:
import numpy as np
import pandas as pd
import string
from datetime import datetime

# Random 20-character strings built from lowercase letters.
letters = list(string.ascii_lowercase)
make_str = lambda n: ''.join(np.random.choice(letters, size=n))

# One row per minute from 2010-01-01 until today.
dates = pd.date_range(start=datetime(2010,1,1),
                      end=datetime.today(),
                      freq="min")
size = len(dates)
df = pd.DataFrame({
    'date': dates,
    'num1': np.random.rand(size),
    'num2': np.random.rand(size),
    'str1': [make_str(20) for _ in range(size)],
    'str2': [make_str(20) for _ in range(size)]
})
df.to_csv("./large_dataframe.csv", index=False)
df.head(3)
|   | date | num1 | num2 | str1 | str2 |
|---|---|---|---|---|---|
| 0 | 2010-01-01 00:00:00 | 0.424410 | 0.503014 | xyouzjaivrwtnqczcieb | fonxhwjxdpdvnfvtvcar |
| 1 | 2010-01-01 00:01:00 | 0.650159 | 0.184204 | dxrqtbmezgwobpqlpybt | ihahasnbtgptjfwnvlic |
| 2 | 2010-01-01 00:02:00 | 0.602216 | 0.647338 | kaatnygdfekoxmpnvbky | wffzxlyzjnopahttvdxe |
and your goal is to store this data efficiently in Parquet files.
2. Determine How to Chunk the Data
PyArrow version 5.0.0 is used to split the dataframe into chunks. The utility function below computes the chunk boundaries and is imported by the flow in the next step:
dataframe_utils.py
import pyarrow as pa
import pandas as pd
from datetime import datetime
from typing import List, Tuple

def get_chunks(df: pd.DataFrame = None,
               num_chunks: int = 4) -> Tuple[pa.Table, List]:

    # Add a 'year' column parsed from the 'date' strings read from CSV.
    get_year = lambda x: datetime.strptime(
        x.split()[0], "%Y-%m-%d").year
    df['year'] = df.date.apply(get_year)

    # Split the rows into num_chunks equal parts; the last chunk
    # absorbs any remainder.
    num_records = df.shape[0] // num_chunks
    lengths = [num_records] * num_chunks
    lengths[-1] += df.shape[0] - num_chunks*num_records
    offsets = [sum(lengths[:i]) for i in range(num_chunks)]
    names = ["chunk_%s" % i for i in range(num_chunks)]

    # Return the full Arrow table plus (name, offset, length) per chunk.
    return (pa.Table.from_pandas(df),
            list(zip(names, offsets, lengths)))
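As a quick sanity check (illustrative only, not part of the flow), you can call get_chunks on a tiny dataframe with made-up data and inspect the (name, offset, length) tuples it returns:
import pandas as pd
from dataframe_utils import get_chunks

small_df = pd.DataFrame({
    'date': ['2010-01-01 00:00:00'] * 10,
    'num1': range(10),
    'num2': range(10),
})
table, chunks = get_chunks(small_df, num_chunks=4)

# Each entry is (name, offset, length); the last chunk absorbs the remainder.
print(chunks)
# [('chunk_0', 0, 2), ('chunk_1', 2, 2), ('chunk_2', 4, 2), ('chunk_3', 6, 4)]
print(table.num_rows)  # 10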
3. Run Flow
The flow below loads this data into a pandas dataframe and applies the following steps:
- Use `pyarrow.Table.from_pandas` to load the data into Arrow memory.
- In parallel foreach branches:
  - Use `pyarrow.Table.slice` to make zero-copy views of chunks of the table.
  - Apply a transformation to the chunk; in this case, appending a column.
  - Move the chunk to your S3 bucket using `pyarrow.parquet.write_table`.
- Pick a chunk and verify the existence of the new transformed column.
If you have a dataframe in S3 that you want to read into memory, you can see an example specific to this topic here.
chunk_dataframe.py
from metaflow import FlowSpec, step
class ForEachChunkFlow(FlowSpec):
    
    bucket = "s3://outerbounds-how-tos"
    s3_path = "{}/dataframe-chunks/{}.parquet"
    df_path = "./large_dataframe.csv"
    
    @step
    def start(self):
        import pandas as pd
        from dataframe_utils import get_chunks

        # Load the CSV and compute the Arrow table plus chunk boundaries.
        my_big_df = pd.read_csv(self.df_path)
        self.table, self.chunks = get_chunks(my_big_df)

        # Fan out: one process_chunk task per (name, offset, length) tuple.
        self.next(self.process_chunk, foreach='chunks')
    
    @step
    def process_chunk(self):
        import pyarrow as pa
        import pyarrow.parquet as pq
        
        # get view of this chunk only
        chunk_id, offset, length = self.input
        chunk = self.table.slice(offset=offset, length=length)
    
        # do transformation on table
        col1 = chunk['num1'].to_numpy()
        col2 = chunk['num2'].to_numpy()
        values = pa.array(col1 * col2)
        chunk = chunk.append_column('new col', values)
    
        # write chunk as parquet file in S3 bucket
        self.my_path = self.s3_path.format(self.bucket, chunk_id)
        pq.write_table(table=chunk, where=self.my_path)
        self.next(self.join)
        
    @step
    def join(self, inputs):
        # Nothing to merge; each branch wrote its chunk directly to S3.
        self.next(self.end)

    @step
    def end(self):
        import pyarrow.parquet as pq

        # Read one chunk back and confirm the appended column made it to S3.
        test_id = 'chunk_1'
        path = self.s3_path.format(self.bucket, test_id)
        test_chunk = pq.read_table(source=path)
        assert 'new col' in test_chunk.column_names
    
if __name__ == "__main__":
    ForEachChunkFlow()
python chunk_dataframe.py run
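After the run completes, the chunk files can be read back together. The sketch below is one way to do this with pyarrow.dataset, assuming your environment has credentials for the bucket; the prefix matches what the flow writes to.
import pyarrow.dataset as ds

# Prefix the flow writes its chunks to (see bucket and s3_path above).
chunks_uri = "s3://outerbounds-how-tos/dataframe-chunks/"

# Treat all Parquet chunk files under the prefix as one logical dataset.
dataset = ds.dataset(chunks_uri, format="parquet")
df_roundtrip = dataset.to_table().to_pandas()
print('new col' in df_roundtrip.columns)  # True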