Run SQL query with AWS Athena
Question
How can I access data in S3 with a SQL query from my Metaflow flow?
Solution
You can run queries from a Metaflow task the same way you normally interact with AWS from a Python script. In addition to storing data in AWS S3, two helpful services for this are AWS Glue and AWS Athena. AWS Glue is a managed extract, transform, and load (ETL) service, and AWS Athena is a serverless SQL service that lets you run queries against Glue databases. The examples below use the awswrangler library to work with both.
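For example, if your AWS credentials live in a named profile rather than in the default environment, you can point awswrangler at that profile through a boto3 session. This is a minimal sketch; the profile and region names below are placeholders for your own configuration.
import boto3
import awswrangler as wr

# placeholder profile and region: substitute your own AWS configuration
session = boto3.Session(profile_name="my-profile", region_name="us-east-1")

# most awswrangler calls accept an explicit boto3 session
print(wr.catalog.databases(boto3_session=session))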
1. Add Parquet files to AWS Glue DB
create_glue_db.py
import pandas as pd
import awswrangler as wr
def create_db(database_name, bucket_uri, table_name):
    dataset = pd.DataFrame({
        "id": [1, 2],
        "feature_1": ["foo", "bar"],
        "feature_2": ["fizz", "buzz"]}
    )
    try:
        # create an AWS Glue database to catalog the S3 data
        wr.catalog.create_database(name=database_name)
    except wr.exceptions.AlreadyExists:
        # if the database already exists, skip this step
        print(f"{database_name} exists!")
    # store the data in S3 as .parquet files;
    # AWS Glue works with many other data formats as well
    _ = wr.s3.to_parquet(df=dataset, 
                         path=f"{bucket_uri}/dataset/",
                         dataset=True, 
                         database=database_name,
                         table=table_name)
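After calling create_db, you can confirm that the table was registered in the Glue Data Catalog. A quick check, assuming the default names used by the flow below (test_db and test_table):
import awswrangler as wr

# list the tables registered in the Glue database
print(wr.catalog.tables(database="test_db"))

# inspect the schema of a single table
print(wr.catalog.table(database="test_db", table="test_table"))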
2. Run Flow
This flow shows how to:
- Access Parquet data with a SQL query using AWS Athena.
- Transform a dataset.
- Write a pandas DataFrame to AWS S3 as .parquet files.
sql_query_athena.py
from metaflow import FlowSpec, step, Parameter
import awswrangler as wr
from create_glue_db import create_db
class AWSQueryFlow(FlowSpec):
    
    bucket_uri = Parameter(
                    "bucket_uri", 
                    default="s3://outerbounds-how-tos"
                 )
    db_name = Parameter("database_name", 
                        default="test_db")
    table_name = Parameter("table_name", 
                           default="test_table")
    @step
    def start(self):
        create_db(self.db_name, self.bucket_uri, 
                  self.table_name)
        self.next(self.query)
    @step
    def query(self):
        QUERY = f"SELECT * FROM {self.table_name}"
        result = wr.athena.read_sql_query(
            QUERY, 
            database=self.db_name
        )
        self.dataset = result
        self.next(self.transform)
        
    @step
    def transform(self):
        # combine the two feature columns into a new feature_12 column
        concat = lambda x: x["feature_1"] + x["feature_2"]
        self.dataset["feature_12"] = self.dataset.apply(
            concat,
            axis=1
        )
        self.next(self.write)
        
    @step
    def write(self):
        path = f"{self.bucket_uri}/dataset/"
        _ = wr.s3.to_parquet(df=self.dataset, 
                             mode="overwrite",
                             path=path,
                             dataset=True, 
                             database=self.db_name,
                             table=self.table_name)
        self.next(self.end)
        
    @step
    def end(self):
        print("Database is updated!")
if __name__ == '__main__':
    AWSQueryFlow()
python sql_query_athena.py run
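Metaflow exposes each Parameter as a command-line option on run, so you can point the flow at your own bucket, Glue database, and table without editing the code. The values below are placeholders:
python sql_query_athena.py run \
    --bucket_uri s3://my-bucket \
    --database_name my_db \
    --table_name my_table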
3. Access Artifacts Outside of Flow
The following can be run in any script or notebook to access the DataFrame that was stored as a flow artifact in self.dataset.
from metaflow import Flow
run_data = Flow('AWSQueryFlow').latest_run.data
run_data.dataset
|   | id | feature_1 | feature_2 | feature_12 |
|---|----|-----------|-----------|------------|
| 0 | 1  | foo       | fizz      | foofizz    |
| 1 | 2  | bar       | buzz      | barbuzz    |
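To confirm the same contents directly in Athena, independent of Metaflow artifacts, you can also run the query outside the flow (assuming the default database and table names used above):
import awswrangler as wr

# read the updated table straight from Athena
df = wr.athena.read_sql_query("SELECT * FROM test_table", database="test_db")
print(df)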