Run SQL Query with AWS Athena
Question
How can I access data in S3 with a SQL query from my Metaflow flow?
Solution
You can run queries from a Metaflow task the same way you would interact with AWS from any Python script. In addition to storing data in AWS S3, two helpful tools for this are AWS Glue and AWS Athena. AWS Glue is a managed extract, transform, and load (ETL) service. AWS Athena is a serverless SQL service that lets you run queries against Glue databases. The example below uses the awswrangler package (the AWS SDK for pandas) to interact with both services.
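Before running the steps below, you can optionally confirm that your AWS credentials and region are configured by listing the Glue databases visible to awswrangler. This is only a sanity check and is not part of the flow:
import awswrangler as wr
# list the AWS Glue databases the current credentials can see;
# an empty result simply means no databases exist yet
print(wr.catalog.databases())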
1. Add Parquet files to AWS Glue DB
create_glue_db.py
import pandas as pd
import awswrangler as wr

def create_db(database_name, bucket_uri, table_name):
    dataset = pd.DataFrame({
        "id": [1, 2],
        "feature_1": ["foo", "bar"],
        "feature_2": ["fizz", "buzz"]
    })
    try:
        # create an AWS Glue database to query S3 data
        wr.catalog.create_database(name=database_name)
    except wr.exceptions.AlreadyExists:
        # if the database already exists, skip this step
        print(f"{database_name} exists!")
    # store the data in your AWS data lake
    # here we use .parquet files;
    # AWS Glue works with many other data formats
    _ = wr.s3.to_parquet(df=dataset,
                         path=f"{bucket_uri}/dataset/",
                         dataset=True,
                         database=database_name,
                         table=table_name)
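If you want to try this helper outside of the flow, a minimal sketch looks like the following; the bucket URI is a placeholder, so substitute a bucket you own:
from create_glue_db import create_db
import awswrangler as wr
# placeholder values; use your own bucket, database, and table names
create_db("test_db", "s3://your-bucket", "test_table")
# confirm the table was registered in the Glue catalog
print(wr.catalog.tables(database="test_db"))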
2. Run Flow
This flow shows how to:
- Access Parquet data with a SQL query using AWS Athena.
- Transform a data set.
- Write a pandas DataFrame to AWS S3 as .parquet files.
sql_query_athena.py
from metaflow import FlowSpec, step, Parameter
import awswrangler as wr
from create_glue_db import create_db

class AWSQueryFlow(FlowSpec):

    bucket_uri = Parameter(
        "bucket_uri",
        default="s3://outerbounds-how-tos"
    )
    db_name = Parameter("database_name",
                        default="test_db")
    table_name = Parameter("table_name",
                           default="test_table")

    @step
    def start(self):
        create_db(self.db_name, self.bucket_uri,
                  self.table_name)
        self.next(self.query)

    @step
    def query(self):
        QUERY = f"SELECT * FROM {self.table_name}"
        result = wr.athena.read_sql_query(
            QUERY,
            database=self.db_name
        )
        self.dataset = result
        self.next(self.transform)

    @step
    def transform(self):
        concat = lambda x: x["feature_1"] + x["feature_2"]
        self.dataset["feature_12"] = self.dataset.apply(
            concat,
            axis=1
        )
        self.next(self.write)

    @step
    def write(self):
        path = f"{self.bucket_uri}/dataset/"
        _ = wr.s3.to_parquet(df=self.dataset,
                             mode="overwrite",
                             path=path,
                             dataset=True,
                             database=self.db_name,
                             table=self.table_name)
        self.next(self.end)

    @step
    def end(self):
        print("Database is updated!")

if __name__ == '__main__':
    AWSQueryFlow()
python sql_query_athena.py run
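Because the flow defines its bucket, database, and table as Metaflow Parameters, each one becomes a command-line option of the same name. For example, to point the flow at your own resources (the bucket below is a placeholder):
python sql_query_athena.py run --bucket_uri s3://your-bucket --database_name my_db --table_name my_table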
3. Access Artifacts Outside of Flow
The following can be run in any script or notebook to access the contents of the DataFrame that was stored as a flow artifact with self.dataset.
from metaflow import Flow
run_data = Flow('AWSQueryFlow').latest_run.data
run_data.dataset
|   | id | feature_1 | feature_2 | feature_12 |
|---|----|-----------|-----------|------------|
| 0 | 1  | foo       | fizz      | foofizz    |
| 1 | 2  | bar       | buzz      | barbuzz    |
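If you need a particular run rather than the latest one, you can address it by run ID; the ID below is a hypothetical placeholder:
from metaflow import Flow, Run
# list run IDs with Flow('AWSQueryFlow').runs(); '1234' is a placeholder
run_data = Run('AWSQueryFlow/1234').data
run_data.dataset
Since the write step overwrites the Glue table, you can also verify the result by querying Athena directly instead of reading the Metaflow artifact. A minimal sketch, assuming the default parameter values above were used:
import awswrangler as wr
# query the updated Glue table; assumes the default
# database_name and table_name parameters
df = wr.athena.read_sql_query(
    "SELECT * FROM test_table",
    database="test_db"
)
print(df.head())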