Scale Model Training and Tuning to GPU
Question
How do I scale model training and hyperparameter tuning to GPUs with Metaflow?
Solution
To scale horizontally, you can structure Metaflow flows with branching and looping patterns. To scale vertically, you can access more processor and memory resources for a task in your flow by using Metaflow's @batch and @kubernetes decorators.
1. Define PyTorch Dependencies
This example will show how to tune a PyTorch model on GPUs. The script contains functions to
- Load data.
- Instantiate a neural net.
- Train and evaluate a neural net.
The original code for the example comes from the PyTorch documentation.
torch_steps.py
import torch
import torchvision
import torch.optim as optim
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
class Net(nn.Module):
    """Small convolutional classifier that emits 10 scores per image.

    Two 5x5 convolution + 2x2 max-pool stages feed three fully connected
    layers. ``fc1`` expects 16 * 5 * 5 input features, which is what the
    convolutional stack produces for 3-channel 32x32 inputs.
    """

    def __init__(self):
        super().__init__()
        # Layer names and creation order are kept stable so that state-dict
        # keys and seeded initialisation do not change.
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return the raw (un-normalised) class scores for a batch ``x``."""
        # Convolutional feature extractor: conv -> ReLU -> shared max-pool.
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        # Flatten every dimension except the batch dimension.
        x = torch.flatten(x, 1)
        # Hidden fully connected layers with ReLU activations.
        for hidden in (self.fc1, self.fc2):
            x = F.relu(hidden(x))
        # The output layer has no activation; it returns the scores directly.
        return self.fc3(x)
def load_data(batch_size=4, root='./data', num_workers=2):
    """Build the CIFAR-10 training and test data loaders.

    The dataset is downloaded into ``root`` if it is not already there.
    Images are converted to tensors and each of the three channels is
    normalised with mean 0.5 and standard deviation 0.5.

    Args:
        batch_size: Number of samples per batch for both loaders.
        root: Directory in which the dataset is stored / downloaded.
        num_workers: Number of worker processes used by each loader.

    Returns:
        A ``(trainloader, testloader, classes)`` tuple, where ``classes``
        is the tuple of the ten class names.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    def make_loader(train):
        # The training split is shuffled; the test split keeps its order.
        dataset = torchvision.datasets.CIFAR10(
            root=root,
            train=train,
            download=True,
            transform=transform,
        )
        return torch.utils.data.DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=train,
            num_workers=num_workers,
        )

    trainloader = make_loader(True)
    testloader = make_loader(False)
    classes = ('plane', 'car', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck')
    return trainloader, testloader, classes
def train_model(trainloader, lr, epochs=1):
    """Train a fresh ``Net`` with SGD and return it.

    Args:
        trainloader: Iterable of batches; element 0 of each batch is used
            as the inputs and element 1 as the labels.
        lr: Learning rate for the SGD optimiser (momentum is fixed at 0.9).
        epochs: Number of full passes over ``trainloader``.

    Returns:
        The trained network. It is left on the device it was trained on:
        ``cuda:0`` when CUDA is available, otherwise the CPU.
    """
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    net = Net()
    net.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9)

    for _ in range(epochs):
        for batch in trainloader:
            inputs, labels = batch[0].to(device), batch[1].to(device)
            # Standard step: clear old gradients, forward, backward, update.
            optimizer.zero_grad()
            loss = criterion(net(inputs), labels)
            loss.backward()
            optimizer.step()
    return net
def run_inference_and_tests(net, testloader):
    """Return the classification accuracy of ``net`` as a whole percentage.

    Args:
        net: Model to evaluate; called on each batch of images.
        testloader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        ``100 * correct // total`` -- the accuracy in percent, floored to
        an integer.

    Raises:
        ZeroDivisionError: If ``testloader`` yields no samples.
    """
    # train_model() leaves the network on cuda:0 when a GPU is available.
    # Move every batch to the device the model's parameters live on, so the
    # forward pass does not fail with a device mismatch. A model without
    # parameters has no device of its own, so batches are left where they are.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else None

    correct = 0
    total = 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for images, labels in testloader:
            if device is not None:
                images = images.to(device)
                labels = labels.to(device)
            outputs = net(images)
            # The predicted class is the index of the highest score.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct // total
2. Run Flow
This flow leverages the functions in torch_steps.py to:
- Create dataloaders for training and testing.
- Train multiple models in parallel on GPU instances using Metaflow's @batch decorator in the train step.
  - In this case, each model is assigned a different learning rate. This is kept simple for demo purposes; in practice you will want to explore more dimensions of hyperparameter space when you have the resources.
- Evaluate each model.
- Join the results of evaluation to select the best model.
- Print the highest accuracy model.
  - You can improve the model by changing hyperparameters in torch_steps.py or by expanding the hyperparameter tuning task with more branches in the flow.
The example uses Metaflow's @conda_base decorator (the flow-level form of @conda) to create a similar environment across local and remote compute.
scale_tuning_gpu.py
from metaflow import (FlowSpec, step, Parameter,
batch, JSONType, conda_base)
import json
import torch_steps
@conda_base(libraries={"pytorch":"1.11.0",
                       "torchvision":"0.12.0"},
            python="3.8")
class GPUFlow(FlowSpec):
    """Train one model per learning rate and keep the most accurate one.

    Steps: ``start`` builds the data loaders and fans out over
    ``learning_rates``; ``train`` fits one model per branch; ``evaluate_model``
    scores it; ``join`` selects the best branch; ``end`` finishes the run.
    """

    # JSON-encoded list of learning rates; one training branch is run per entry.
    learning_rates = Parameter(
        'learning-rates',
        default=json.dumps([0.01,0.001]),
        type=JSONType
    )

    @step
    def start(self):
        """Create the data loaders and fan out over the learning rates."""
        loaded = torch_steps.load_data()
        self.trainloader, self.testloader, self.classes = (
            loaded[0], loaded[1], loaded[2]
        )
        self.next(self.train, foreach='learning_rates')

    @batch(gpu=1)
    @step
    def train(self):
        """Train a model with this branch's learning rate (``self.input``)."""
        self.model = torch_steps.train_model(self.trainloader, lr=self.input)
        self.next(self.evaluate_model)

    @step
    def evaluate_model(self):
        """Store this branch's test accuracy in ``self.accuracy``."""
        self.accuracy = torch_steps.run_inference_and_tests(
            self.model, self.testloader
        )
        self.next(self.join)

    @step
    def join(self, inputs):
        """Pick the branch with the strictly highest accuracy."""
        top_model, best_score = None, -1
        for branch in inputs:
            # Only a strictly better score replaces the current best, so the
            # first branch wins a tie.
            if not branch.accuracy > best_score:
                continue
            best_score = branch.accuracy
            top_model = branch.model
        self.best_model = top_model
        self.best_score = best_score
        print(f"Best model accuracy was {best_score}%.")
        self.next(self.end)

    @step
    def end(self):
        """Mark the end of the flow."""
        print("Done")
# Script entry point: only instantiate the flow when this file is run
# directly (e.g. via the `python scale_tuning_gpu.py ... run` command).
if __name__ == "__main__":
    GPUFlow()
python scale_tuning_gpu.py --environment=conda run