Advanced asset patterns

Prerequisites

Start with Project Assets basics first. For complete API documentation, see Project utilities API.

Asset Versioning

Every register_data() or register_model() call creates a new versioned instance. Access specific versions for comparison, rollback, or A/B testing:

# Inside a ProjectFlow step:
# Latest (default)
current = self.prj.asset.consume_model_asset("fraud_model")

# Previous versions
previous = self.prj.asset.consume_model_asset("fraud_model", instance="latest-1")
baseline = self.prj.asset.consume_model_asset("fraud_model", instance="v42")

Version formats: "latest" (default), "latest-N" (N versions back), "vN" (specific version)

Flow context required

The self.prj object is only available inside ProjectFlow steps. For deployments or scripts outside flows, see Deployment Integration.

Metadata Tracking

Store metrics in annotations for cross-version comparison:

self.prj.register_model("fraud_model", "model",
annotations={
"accuracy": 0.94,
"f1_score": 0.91,
"training_samples": len(self.data)
})
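
To compare metrics across versions later, read the annotations back from specific instances, for example inside another ProjectFlow step:

latest = self.prj.asset.consume_model_asset("fraud_model")
previous = self.prj.asset.consume_model_asset("fraud_model", instance="latest-1")

latest_f1 = float(latest["model_properties"]["annotations"]["f1_score"])
previous_f1 = float(previous["model_properties"]["annotations"]["f1_score"])
improved = latest_f1 >= previous_f1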

See model-registry-project for a complete example.


Model Registry Patterns

Champion/Challenger Pattern

Track champion models using Metaflow run tags, not Asset API tags. This enables server-side queries to find the current champion.

from metaflow import Flow, Parameter, Run, step

class EvaluateModel(ProjectFlow):
    min_improvement = Parameter("min_improvement", default=0.02)

    @step
    def start(self):
        # Load candidate (latest model)
        candidate = self.prj.asset.consume_model_asset("fraud_model", instance="latest")
        self.candidate_f1 = float(candidate["model_properties"]["annotations"]["f1_score"])
        self.candidate_training_run = candidate["model_properties"]["annotations"]["training_run_id"]

        # Find current champion via Metaflow run tags
        self.champion_f1 = 0
        try:
            for run in Flow("TrainModelFlow").runs("champion"):
                self.champion_f1 = float(run.data.f1_score)
                break  # Only one champion
        except Exception:
            pass  # No champion yet

        self.promote = (self.candidate_f1 - self.champion_f1) >= self.min_improvement
        self.next(self.end)

    @step
    def end(self):
        if self.promote:
            # Promote: tag the training run as champion
            self._set_champion(self.candidate_training_run)

    def _set_champion(self, training_run_id):
        """Set a training run as champion by tagging it."""
        # Remove champion tag from previous champion
        for run in Flow("TrainModelFlow").runs("champion"):
            run.remove_tag("champion")
            break
        # Tag new champion
        Run(f"TrainModelFlow/{training_run_id}").add_tag("champion")

Why Metaflow tags instead of Asset tags?

Asset API tags don't support server-side queries. Metaflow's Flow.runs("tag") provides efficient server-side filtering to find tagged runs.
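
For example, any notebook or downstream flow can find the current champion with a single tag-filtered query via the Metaflow Client API:

from metaflow import Flow

# Server-side filter: only runs tagged "champion" are returned
champion = next(iter(Flow("TrainModelFlow").runs("champion")), None)
if champion is not None:
    print(champion.pathspec, champion.data.f1_score)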

See complete implementation →

Multi-Branch Model Promotion

Use branches for environment isolation (dev → staging → prod):

from obproject.assets import Asset

class PromoteModel(ProjectFlow):
    source_branch = Parameter("source", default="dev")
    target_branch = Parameter("target", default="staging")

    @step
    def start(self):
        # Read from source branch (read_only=True for consuming only)
        source = Asset(project=self.prj.project, branch=self.source_branch, read_only=True)
        model_ref = source.consume_model_asset("fraud_model")

        # Register in target branch
        target = Asset(project=self.prj.project, branch=self.target_branch)
        target.register_model_asset(
            "fraud_model",
            kind="promoted",
            annotations={
                "source_branch": self.source_branch,
                "source_version": model_ref["id"]
            })

        self.next(self.end)
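
After promotion, the target branch exposes the same logical asset name. A quick sanity check, reusing the read-only Asset handle pattern above and assuming annotations surface under model_properties as in the earlier examples:

# Confirm the promoted model is visible on the target branch
check = Asset(project=self.prj.project, branch=self.target_branch, read_only=True)
promoted = check.consume_model_asset("fraud_model")
assert promoted["model_properties"]["annotations"]["source_branch"] == self.source_branch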

Automated Quality Gates

Automate promotion decisions based on metrics:

class QualityGate(ProjectFlow):
    min_f1 = Parameter("min_f1", default=0.8)

    @step
    def validate(self):
        ref = self.prj.asset.consume_model_asset("fraud_model")
        f1 = float(ref["model_properties"]["annotations"]["f1_score"])

        if f1 < self.min_f1:
            self.prj.publish_event("quality_gate_failed")
            raise ValueError(f"F1 {f1:.3f} below threshold {self.min_f1}")

        self.prj.publish_event("promote_model")
        self.next(self.end)
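
On the consuming side, a downstream flow can react to these events. A minimal sketch, assuming events published with publish_event can be consumed through Metaflow's @trigger decorator on a deployed flow (the listener flow name is hypothetical):

from metaflow import FlowSpec, step, trigger

@trigger(event="promote_model")
class PromotionListener(FlowSpec):
    @step
    def start(self):
        # A deployed version of this flow starts whenever "promote_model" is published
        self.next(self.end)

    @step
    def end(self):
        pass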

Data Asset Versioning

Feature Store Pattern

Pin feature versions for reproducibility:

class TrainModel(ProjectFlow):
    feature_version = Parameter("features", default="latest")

    @step
    def start(self):
        # For artifact-based data assets, use get_data()
        self.features = self.prj.get_data(
            "fraud_features",
            instance=self.feature_version
        )

        self.model = train(self.features)

        # Track lineage in annotations
        self.prj.register_model("fraud_model", "model",
            annotations={"feature_version": self.feature_version})

        self.next(self.end)
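
Because the feature version is recorded in the model's annotations, a later flow can reproduce training against exactly the same features:

# Inside another ProjectFlow step: recover the pinned feature version
ref = self.prj.asset.consume_model_asset("fraud_model")
pinned_version = ref["model_properties"]["annotations"]["feature_version"]

# Re-load exactly the features that model was trained on
features = self.prj.get_data("fraud_features", instance=pinned_version)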

Dataset Drift Detection

Compare feature distributions across asset versions to detect drift:

class DriftMonitor(ProjectFlow):
    @step
    def start(self):
        # Use get_data() for artifact-based data assets
        self.current = self.prj.get_data("fraud_features")
        self.previous = self.prj.get_data("fraud_features", instance="latest-1")

        self.drift_score = calculate_ks_statistic(self.current, self.previous)
        self.next(self.report)

    @step
    def report(self):
        # Store the report under a name that doesn't clash with the step name
        self.drift_report = {"drift_score": self.drift_score}

        self.prj.register_data("drift_reports", "report",
            annotations={
                "drift_score": self.drift_score,
                "alert_level": "high" if self.drift_score > 0.1 else "normal"
            })

        if self.drift_score > 0.1:
            self.prj.publish_event("retrain_required")

        self.next(self.end)
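
calculate_ks_statistic is not defined above. A minimal sketch, assuming both feature versions are pandas DataFrames sharing a numeric column and that scipy is installed (the column name is hypothetical):

from scipy.stats import ks_2samp

def calculate_ks_statistic(current, previous, column="transaction_amount"):
    """Two-sample Kolmogorov-Smirnov statistic for one numeric feature."""
    statistic, _pvalue = ks_2samp(current[column], previous[column])
    return float(statistic)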

Asset Naming Conventions

Name assets by source, not environment. Environment context comes from branches.

# ✅ Source-based names
self.prj.register_data("user_transactions", ...) # Source: transactions DB
self.prj.register_model("fraud_classifier", ...) # Role in system

# ❌ Environment adjectives (use branches instead)
self.prj.register_data("production_data", ...) # Branch = environment
self.prj.register_data("staging_features", ...) # Branch = environment

Why? Keeping the same asset name across dev, staging, and prod branches enables clean promotion workflows and lineage tracking.

Using Annotations for Metadata

Store metadata in annotations for later inspection:

self.prj.register_data("user_features", "features",
annotations={
"source": "postgres",
"refresh": "daily",
"tier": "gold",
"n_rows": 100000
})
Tag filtering limitations

While prj.asset.list_data_assets(tags={...}) exists, filtering is performed client-side after fetching all assets. For queryable lifecycle state (like "champion"), use Metaflow run tags instead.
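
If you do filter by Asset tags, expect the full list to be fetched first and filtered in the client. A minimal sketch inside a ProjectFlow step (the tag key and value are hypothetical):

# Fetches all data assets, then filters client-side on the given tags
gold_assets = self.prj.asset.list_data_assets(tags={"tier": "gold"})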


Deployment Integration

Serve models from the asset registry without redeployment. Use read_only=True when creating an Asset outside a Metaflow flow context:

from fastapi import FastAPI
from obproject.assets import Asset
import os

app = FastAPI()
asset = Asset(
    project=os.environ["OB_PROJECT"],
    branch=os.environ["OB_BRANCH"],
    read_only=True  # Required outside flow context
)

@app.on_event("startup")
async def load_model():
    global model, model_version
    ref = asset.consume_model_asset("fraud_model")
    model_version = ref["id"]  # Version ID like "v123"
    # Load actual model from artifact reference
    # model = load_from_ref(ref)

@app.post("/model/reload")
async def reload():
    """Hot-reload latest model."""
    await load_model()
    return {"version": model_version}

@app.get("/model/info")
async def info():
    ref = asset.consume_model_asset("fraud_model")
    return {
        "version": ref["id"],
        "metrics": ref["model_properties"]["annotations"]
    }

Checkpoints and Model Assets

Use @checkpoint for fault tolerance during training. Use register_model() for the final deployable artifact:

from metaflow import checkpoint

class Training(ProjectFlow):
    @checkpoint
    @step
    def train(self):
        # Checkpoints handle preemption/crashes during training
        for epoch in range(100):
            train_epoch()
            # Automatic checkpointing

        # Register final model as asset for deployment
        self.prj.register_model("fraud_model", "model",
            annotations={"epochs": 100, "accuracy": 0.94})

Distinction:

  • Checkpoints: Operational (resume from failure during training)
  • Assets: Lifecycle (versioning, promotion, lineage, deployment)

See Also