Deploy with SageMaker
Question
How can I train a model in my flow and deploy it with SageMaker?
Solution
SageMaker lets you host a model on an EC2 instance and gives you an endpoint you can make requests to.
First, there are some configuration details you will need to address before deploying a model from your flow.
1. Set SageMaker IAM Role
To use SageMaker as shown on this page, you will need an IAM role configured for SageMaker execution. AWS has managed policies you can use for this, such as AmazonSageMakerFullAccess or the DataScientist role.
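If you do not already have such a role, a minimal sketch of creating one with boto3 follows; the role name is a hypothetical placeholder, and your account may call for a tighter policy than AmazonSageMakerFullAccess.

import json
import boto3

iam = boto3.client("iam")

# Trust policy that lets the SageMaker service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "sagemaker.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

# "my-sagemaker-execution-role" is a hypothetical name; choose your own.
iam.create_role(
    RoleName="my-sagemaker-execution-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.attach_role_policy(
    RoleName="my-sagemaker-execution-role",
    PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
)

# The role ARN is what you will set as ROLE in the next step.
print(iam.get_role(RoleName="my-sagemaker-execution-role")["Role"]["Arn"])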
2. Configuration
Next we will configure the necessary environment variables. In this example, a .env file is used to manage environment variables for:
- An IAM role defined for SageMaker execution.
- An S3 prefix URI where SageMaker can upload your custom code.
ROLE=<YOUR IAM ROLE>
CODE_LOCATION=<S3 URI TO PUSH YOUR CUSTOM CODE TO>
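As a quick sanity check, you can confirm the variables load (this assumes the file is saved as my.env, the name the deployment script below expects):

import os
from dotenv import load_dotenv

load_dotenv('my.env')
assert os.getenv('ROLE') and os.getenv('CODE_LOCATION'), "my.env is missing values"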
3. Define SageMaker Entry Point
Here is the file defining the entry point that will be passed to the SageMaker deployment. This file includes the model_fn function, which loads the model when a prediction is requested. Other functions related to model serving are omitted because the SageMaker scikit-learn model server provides default implementations for them.
import joblib
import os

def model_fn(model_dir):
    # SageMaker extracts model.tar.gz into model_dir; the path below must
    # match how the deployment script saves the model inside the archive.
    return joblib.load(os.path.join(model_dir, "model/model.joblib"))
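Should you need custom inference behavior, the server also lets you override functions such as input_fn, predict_fn, and output_fn. As a hedged sketch (not required for this example), a predict_fn override roughly equivalent to the default would look like:

def predict_fn(input_data, model):
    # Called after the server deserializes the request;
    # the default behavior is roughly model.predict(input_data).
    return model.predict(input_data)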
4. Define Deployment Logic
The following script contains a light wrapper around the SageMaker SDK that will be called from the flow:
import os
import joblib
import shutil
import tarfile
from sklearn.base import BaseEstimator
from sagemaker.sklearn import SKLearnModel
from metaflow import S3
from dotenv import load_dotenv

load_dotenv('my.env')

def to_sagemaker(
    model: BaseEstimator = None,
    sagemaker_model_name: str = "model",
    model_save_name: str = "model",
    endpoint_name: str = "sklearn-endpoint",
    instance_type: str = "ml.c5.2xlarge",
    entry_point: str = "sagemaker_entry_point.py",
    sklearn_version: str = "1.0-1",
    role: str = os.getenv('ROLE'),
    code_location: str = os.getenv('CODE_LOCATION'),
    run=None,
):

    # Save the model to a local folder.
    # The folder name must match the path used in sagemaker_entry_point.py.
    os.makedirs(model_save_name, exist_ok=True)
    out_path = "{}/{}.joblib".format(model_save_name, model_save_name)
    joblib.dump(model, out_path)

    # Package the model folder as a tar.gz archive, as SageMaker expects.
    local_tar_name = "{}.tar.gz".format(model_save_name)
    with tarfile.open(local_tar_name, mode="w:gz") as _tar:
        _tar.add(model_save_name, recursive=True)

    # Upload the archive to S3, versioned with the Metaflow run.
    with S3(run=run) as s3:
        with open(local_tar_name, "rb") as in_file:
            data = in_file.read()
            model_s3_path = s3.put(local_tar_name, data)

    # Remove the local model folder and tar archive.
    shutil.rmtree(model_save_name)
    os.remove(local_tar_name)

    print("Creating and deploying SageMaker model...")
    sklearn_model = SKLearnModel(
        name=sagemaker_model_name,
        model_data=model_s3_path,
        role=role,
        entry_point=entry_point,
        framework_version=sklearn_version,
        code_location=code_location
    )

    # deploy blocks until the endpoint is created.
    sklearn_model.deploy(
        instance_type=instance_type,
        initial_instance_count=1,
        endpoint_name=endpoint_name
    )

    return model_s3_path
5. Run Flow
Here is a flow that shows how to:
- Train a scikit-learn model.
- Save the model on S3.
- Deploy the model with SageMaker.
- Verify the deployed model produces the same predictions as the local model.
- (Optionally) Clean up the SageMaker deployment resources.
from metaflow import FlowSpec, step
import os

class DeployToSagemakerFlow(FlowSpec):

    @step
    def start(self):
        from sklearn import datasets
        from sklearn.model_selection import train_test_split
        self.iris = datasets.load_iris()
        X, y = self.iris['data'], self.iris['target']
        self.labels = self.iris['target_names']
        split = train_test_split(X, y, test_size=0.2)
        self.X_train, self.X_test = split[0], split[1]
        self.y_train, self.y_test = split[2], split[3]
        self.next(self.train_rf_model)

    @step
    def train_rf_model(self):
        from sklearn.ensemble import RandomForestClassifier
        self.clf = RandomForestClassifier(random_state=0)
        self.clf.fit(self.X_train, self.y_train)
        # Local predictions are used to test the `deploy` step.
        self.local_y_pred = self.clf.predict(self.X_test)
        self.next(self.deploy)

    @step
    def deploy(self):
        import time
        # Importing deployer also loads my.env (see the script above).
        import deployer
        t = int(round(time.time() * 1000))
        self.sagemaker_model_name = 'rf-model-{}'.format(t)
        self.model_save_name = 'model'
        self.endpoint_name = 'rf-endpoint-{}'.format(t)
        self.instance_type = 'ml.c5.2xlarge'
        self.entry_point = 'sagemaker_entry_point.py'
        self.sklearn_sage_version = '1.0-1'
        self.model_s3_path = deployer.to_sagemaker(
            model=self.clf,
            sagemaker_model_name=self.sagemaker_model_name,
            model_save_name=self.model_save_name,
            endpoint_name=self.endpoint_name,
            instance_type=self.instance_type,
            entry_point=self.entry_point,
            sklearn_version=self.sklearn_sage_version,
            role=os.getenv('ROLE'),
            code_location=os.getenv('CODE_LOCATION'),
            run=self
        )
        self.next(self.end)

    @step
    def end(self):
        print("\nModel name is: {}".format(self.sagemaker_model_name))
        print("Endpoint name is: {}\n".format(self.endpoint_name))

if __name__ == "__main__":
    DeployToSagemakerFlow()
python deploy_to_sagemaker.py run
You can see your endpoint in the SageMaker section of the AWS console by navigating to Inference > Endpoints. It will take a few minutes for the model and endpoint to be created.
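If you prefer to wait programmatically rather than watch the console, a minimal sketch with boto3 (assuming the same region you deploy to) is:

import boto3
from metaflow import Flow

run_data = Flow('DeployToSagemakerFlow').latest_run.data
sm_client = boto3.client('sagemaker', region_name='us-east-2')

# Blocks until the endpoint reaches the InService state (or raises on failure).
waiter = sm_client.get_waiter('endpoint_in_service')
waiter.wait(EndpointName=run_data.endpoint_name)
print(sm_client.describe_endpoint(
    EndpointName=run_data.endpoint_name)['EndpointStatus'])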
6. Make a Prediction
Now you can make a prediction with your deployed model! You can run the following code in a notebook cell or Python script after executing the flow. The snippet uses data that Metaflow has stored to:
- Load X_test data from the start step.
- Send the data to the endpoint to request predictions.
- Parse the response.
- Ensure the deployed model predictions are the same as those of the local model version.
from metaflow import Flow
import boto3
import pandas as pd
import numpy as np
import re

# Use the region where your endpoint was deployed.
sagemaker_runtime = boto3.client("sagemaker-runtime", region_name='us-east-2')

# Load the Metaflow run data.
run_data = Flow('DeployToSagemakerFlow').latest_run.data

# Request predictions from the deployed model.
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=run_data.endpoint_name,
    Body=pd.DataFrame(run_data.X_test).to_csv(header=False, index=False),
    ContentType="text/csv",
)

# Parse the response body into a list of integer class predictions.
predictions = [
    int(re.sub("[^0-9]", "", s))
    for s in response["Body"].read().decode("utf-8").split(',')
]

# Ensure the deployed model behaves the same as the local model.
assert np.all(run_data.local_y_pred == predictions)
accuracy = sum(run_data.y_test == predictions) / len(predictions)
print("Accuracy: {}".format(round(accuracy, 3)))
7. Clean Up SageMaker Resources
When you are finished with the endpoint, delete it and the model so you stop incurring charges:

from sagemaker import Session
from metaflow import Flow

# Look up the model and endpoint names from the latest run.
run_data = Flow('DeployToSagemakerFlow').latest_run.data
model_name = run_data.sagemaker_model_name
endpoint_name = run_data.endpoint_name

sagemaker_session = Session()
sm_client = sagemaker_session.boto_session.client("sagemaker")

# Delete the endpoint first, then the model it serves.
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_model(ModelName=model_name)
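Note that deploy also created an endpoint configuration. With the SDK defaults used here it shares the endpoint's name (an assumption worth verifying in your account), so you can likely remove it as well:

# Assumes the endpoint config was auto-named after the endpoint (the SDK default).
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_name)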