Advanced asset patterns
Start with Project Assets basics. For complete API documentation, see Project utilities API.
Asset Versioning
Every register_data() or register_model() call creates a new versioned instance. Access specific versions for comparison, rollback, or A/B testing:
# Inside a ProjectFlow step:
# Latest (default)
current = self.prj.asset.consume_model_asset("fraud_model")
# Previous versions
previous = self.prj.asset.consume_model_asset("fraud_model", instance="latest-1")
baseline = self.prj.asset.consume_model_asset("fraud_model", instance="v42")
Version formats: "latest" (default), "latest-N" (N versions back), "vN" (specific version)
The self.prj object is only available inside ProjectFlow steps. For deployments or scripts outside flows, see Deployment Integration.
Metadata Tracking
Store metrics in annotations for cross-version comparison:
self.prj.register_model("fraud_model", "model",
annotations={
"accuracy": 0.94,
"f1_score": 0.91,
"training_samples": len(self.data)
})
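Reading those annotations back makes cross-version comparison straightforward. A minimal sketch inside a ProjectFlow step, assuming both versions carry an f1_score annotation:
latest = self.prj.asset.consume_model_asset("fraud_model")
previous = self.prj.asset.consume_model_asset("fraud_model", instance="latest-1")

latest_f1 = float(latest["model_properties"]["annotations"]["f1_score"])
previous_f1 = float(previous["model_properties"]["annotations"]["f1_score"])
print(f"F1 change vs previous version: {latest_f1 - previous_f1:+.3f}")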
See model-registry-project for a complete example.
Model Registry Patterns
Champion/Challenger Pattern
Track champion models using Metaflow run tags, not Asset API tags. This enables server-side queries to find the current champion.
from metaflow import Flow, Run, Parameter, step

class EvaluateModel(ProjectFlow):
    min_improvement = Parameter("min_improvement", default=0.02)

    @step
    def start(self):
        # Load candidate (latest model)
        candidate = self.prj.asset.consume_model_asset("fraud_model", instance="latest")
        self.candidate_f1 = float(candidate["model_properties"]["annotations"]["f1_score"])
        self.candidate_training_run = candidate["model_properties"]["annotations"]["training_run_id"]

        # Find current champion via Metaflow run tags
        self.champion_f1 = 0
        try:
            for run in Flow("TrainModelFlow").runs("champion"):
                self.champion_f1 = float(run.data.f1_score)
                break  # Only one champion
        except Exception:
            pass  # No champion yet

        self.promote = (self.candidate_f1 - self.champion_f1) >= self.min_improvement
        self.next(self.end)

    @step
    def end(self):
        if self.promote:
            # Promote: tag the training run as champion
            self._set_champion(self.candidate_training_run)

    def _set_champion(self, training_run_id):
        """Set a training run as champion by tagging it."""
        # Remove champion tag from previous champion
        for run in Flow("TrainModelFlow").runs("champion"):
            run.remove_tag("champion")
            break
        # Tag new champion
        Run(f"TrainModelFlow/{training_run_id}").add_tag("champion")
Asset API tags don't support server-side queries. Metaflow's Flow.runs("tag") provides efficient server-side filtering to find tagged runs.
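The same query works outside a flow, for example in a notebook or script that needs the current champion. A minimal sketch, assuming the training flow stored an f1_score artifact as above:
from metaflow import Flow, namespace

namespace(None)  # search across all users/namespaces, not just your own
champion_run = next(iter(Flow("TrainModelFlow").runs("champion")), None)
if champion_run:
    print(champion_run.pathspec, champion_run.data.f1_score)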
Multi-Branch Model Promotion
Use branches for environment isolation (dev → staging → prod):
from obproject.assets import Asset

class PromoteModel(ProjectFlow):
    source_branch = Parameter("source", default="dev")
    target_branch = Parameter("target", default="staging")

    @step
    def start(self):
        # Read from source branch (read_only=True for consuming only)
        source = Asset(project=self.prj.project, branch=self.source_branch, read_only=True)
        model_ref = source.consume_model_asset("fraud_model")

        # Register in target branch
        target = Asset(project=self.prj.project, branch=self.target_branch)
        target.register_model_asset(
            "fraud_model",
            kind="promoted",
            annotations={
                "source_branch": self.source_branch,
                "source_version": model_ref["id"]
            })
        self.next(self.end)
Automated Quality Gates
Automate promotion decisions based on metrics:
class QualityGate(ProjectFlow):
    min_f1 = Parameter("min_f1", default=0.8)

    @step
    def validate(self):
        ref = self.prj.asset.consume_model_asset("fraud_model")
        f1 = float(ref["model_properties"]["annotations"]["f1_score"])

        if f1 < self.min_f1:
            self.prj.publish_event("quality_gate_failed")
            raise ValueError(f"F1 {f1:.3f} below threshold {self.min_f1}")

        self.prj.publish_event("promote_model")
        self.next(self.end)
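These events can drive downstream automation. A minimal sketch of a flow that reacts to the promote_model event, assuming prj.publish_event emits a Metaflow event with that name (the flow name here is hypothetical):
from metaflow import trigger, step

@trigger(event="promote_model")
class PromoteOnGatePass(ProjectFlow):

    @step
    def start(self):
        # Runs whenever the quality gate publishes "promote_model"
        candidate = self.prj.asset.consume_model_asset("fraud_model")
        self.promoted_version = candidate["id"]
        self.next(self.end)

    @step
    def end(self):
        pass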
Data Asset Versioning
Feature Store Pattern
Pin feature versions for reproducibility:
class TrainModel(ProjectFlow):
    feature_version = Parameter("features", default="latest")

    @step
    def start(self):
        # For artifact-based data assets, use get_data()
        self.features = self.prj.get_data(
            "fraud_features",
            instance=self.feature_version
        )
        self.model = train(self.features)

        # Track lineage in annotations
        self.prj.register_model("fraud_model", "model",
            annotations={"feature_version": self.feature_version})
        self.next(self.end)
Dataset Drift Detection
Compare feature distributions across asset versions to detect drift:
class DriftMonitor(ProjectFlow):

    @step
    def start(self):
        # Use get_data() for artifact-based data assets
        self.current = self.prj.get_data("fraud_features")
        self.previous = self.prj.get_data("fraud_features", instance="latest-1")
        self.drift_score = calculate_ks_statistic(self.current, self.previous)
        self.next(self.report)

    @step
    def report(self):
        # Named drift_report (not report) to avoid clashing with this step's name
        self.drift_report = {"drift_score": self.drift_score}
        self.prj.register_data("drift_reports", "report",
            annotations={
                "drift_score": self.drift_score,
                "alert_level": "high" if self.drift_score > 0.1 else "normal"
            })
        if self.drift_score > 0.1:
            self.prj.publish_event("retrain_required")
        self.next(self.end)
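calculate_ks_statistic is not defined above; one possible implementation of the two-sample Kolmogorov-Smirnov statistic with scipy, assuming both feature versions are 1-D numeric arrays (a hypothetical helper, not part of the Asset API):
import numpy as np
from scipy.stats import ks_2samp

def calculate_ks_statistic(current, previous):
    """Two-sample KS statistic between two versions of a feature."""
    current = np.asarray(current, dtype=float).ravel()
    previous = np.asarray(previous, dtype=float).ravel()
    statistic, _pvalue = ks_2samp(current, previous)
    return float(statistic)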
Asset Naming Conventions
Name assets by source, not environment. Environment context comes from branches.
# ✅ Source-based names
self.prj.register_data("user_transactions", ...) # Source: transactions DB
self.prj.register_model("fraud_classifier", ...) # Role in system
# ❌ Environment adjectives (use branches instead)
self.prj.register_data("production_data", ...) # Branch = environment
self.prj.register_data("staging_features", ...) # Branch = environment
Why? Same asset name across dev, staging, prod branches enables clean promotion workflows and lineage tracking.
Using Annotations for Metadata
Store metadata in annotations for later inspection:
self.prj.register_data("user_features", "features",
annotations={
"source": "postgres",
"refresh": "daily",
"tier": "gold",
"n_rows": 100000
})
While prj.asset.list_data_assets(tags={...}) exists, filtering is performed client-side after fetching all assets. For queryable lifecycle state (like "champion"), use Metaflow run tags instead.
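A minimal illustration of that trade-off (the tag key and value are hypothetical; the call shape follows the one quoted above):
# Returns matching data assets, but the filter runs in the client
# after every asset in the project/branch has been fetched.
candidates = self.prj.asset.list_data_assets(tags={"stage": "candidate"})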
Deployment Integration
Serve models from the asset registry without redeployment. Use read_only=True when creating an Asset outside a Metaflow flow context:
from fastapi import FastAPI
from obproject.assets import Asset
import os

app = FastAPI()

asset = Asset(
    project=os.environ["OB_PROJECT"],
    branch=os.environ["OB_BRANCH"],
    read_only=True  # Required outside flow context
)

model = None
model_version = None

@app.on_event("startup")
async def load_model():
    global model, model_version
    ref = asset.consume_model_asset("fraud_model")
    model_version = ref["id"]  # Version ID like "v123"
    # Load actual model from artifact reference
    # model = load_from_ref(ref)

@app.post("/model/reload")
async def reload():
    """Hot-reload latest model."""
    await load_model()
    return {"version": model_version}

@app.get("/model/info")
async def info():
    ref = asset.consume_model_asset("fraud_model")
    return {
        "version": ref["id"],
        "metrics": ref["model_properties"]["annotations"]
    }
Checkpoints and Model Assets
Use @checkpoint for fault tolerance during training. Use register_model() for the final deployable artifact:
from metaflow import checkpoint

class Training(ProjectFlow):

    @checkpoint
    @step
    def train(self):
        # Checkpoints handle preemption/crashes during training
        for epoch in range(100):
            train_epoch()
            # Automatic checkpointing

        # Register final model as asset for deployment
        self.prj.register_model("fraud_model", "model",
            annotations={"epochs": 100, "accuracy": 0.94})
Distinction:
- Checkpoints: Operational (resume from failure during training)
- Assets: Lifecycle (versioning, promotion, lineage, deployment)
See Also
- Project utilities API - Complete API reference for all prj methods
- Project Assets basics - Introduction to data and model assets
- Project Structure - Organizing your project files