Project utilities API
Start with Project Assets basics before diving into the API reference.
ProjectFlow
Base class for all project flows. Inherit from ProjectFlow instead of FlowSpec:
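from metaflow import step
from obproject import ProjectFlow
class MyFlow(ProjectFlow):
    @step
    def start(self):
        # Placeholder artifact: register_data() expects it to exist as self.data
        self.data = [1, 2, 3]
        self.prj.register_data("dataset", "data")
        self.next(self.end)

    @step
    def end(self):
        pass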
Configuration
ProjectFlow reads configuration from two files:
obproject.toml - Project identity and settings:
project = "fraud-detection"
[dev-assets]
branch = "main" # Read assets from main branch during local dev
[dependencies]
include_pyproject_toml = true # Auto-apply pyproject.toml deps (default: true)
pyproject.toml - Python dependencies applied via @pypi_base:
[project]
dependencies = [
    "scikit-learn>=1.3.0",
    "pandas>=2.0.0",
]
prj Property
self.prj returns a ProjectContext with access to all project utilities. It is initialized lazily on first access.
Attributes:
- prj.project - Project name from config
- prj.branch - Current write branch (from Metaflow @project)
- prj.read_branch - Branch for reading assets (may differ during local dev)
- prj.write_branch - Branch for writing assets
- prj.asset - Low-level Asset client
- prj.evals - Evaluation logger
Asset Registration
prj.register_data()
prj.register_data(name, artifact, annotations=None, tags=None, description=None)
Register a Metaflow artifact as a data asset.
| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name (e.g., "user_transactions") |
| artifact | str | Artifact name (must exist as self.<artifact>) |
| annotations | dict | Metadata key-value pairs (values converted to strings) |
| tags | dict | Tags for categorization |
| description | str | Human-readable description |
self.features = compute_features(data)
self.prj.register_data("fraud_features", "features",
                       annotations={"n_samples": len(self.features)})
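The parameter table notes that annotation values are converted to strings. The sketch below shows what that implies when reading them back, assuming a plain `str()` conversion. The helper `stringify_annotations` is hypothetical and not part of obproject.

```python
def stringify_annotations(annotations):
    """Hypothetical helper: mimic the documented string conversion of annotation values."""
    return {key: str(value) for key, value in (annotations or {}).items()}

stored = stringify_annotations({"n_samples": 1000, "accuracy": 0.95})
# Values are strings after conversion, so cast them back when reading
n_samples = int(stored["n_samples"])
```

This is why consumers typically wrap numeric annotations in `int()` or `float()` before using them.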
prj.register_external_data()
prj.register_external_data(name, blobs, kind, annotations=None, tags=None, description=None)
Register external data (S3, databases, etc.) as a data asset.
| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name |
| blobs | list | URIs/references (e.g., ["s3://bucket/file.csv"]) |
| kind | str | Data type (e.g., "s3", "database") |
| annotations | dict | Metadata |
| tags | dict | Tags |
| description | str | Description |
self.prj.register_external_data("raw_logs",
                                blobs=["s3://data-lake/logs/2025-01-01/"],
                                kind="s3",
                                annotations={"size_gb": 450})
prj.register_model()
prj.register_model(name, artifact, annotations=None, tags=None, description=None)
Register a Metaflow artifact as a model asset.
| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name (e.g., "fraud_classifier") |
| artifact | str | Artifact name containing the model |
| annotations | dict | Model metadata (accuracy, hyperparameters, etc.) |
| tags | dict | Tags (framework, algorithm, etc.) |
| description | str | Description |
self.model = RandomForestClassifier().fit(X, y)
self.prj.register_model("fraud_classifier", "model",
                        annotations={"accuracy": 0.95, "algorithm": "RandomForest"})
prj.register_external_model()
prj.register_external_model(name, blobs, kind, annotations=None, tags=None, description=None)
Register an external model (HuggingFace, checkpoints, etc.) as a model asset.
| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name |
| blobs | list | URIs/references |
| kind | str | Model type (e.g., "checkpoint", "huggingface") |
| annotations | dict | Metadata |
| tags | dict | Tags |
| description | str | Description |
self.prj.register_external_model("base_llm",
                                 blobs=["meta-llama/Llama-3.1-8B-Instruct"],
                                 kind="huggingface",
                                 annotations={"context_length": 8192})
Asset Consumption
prj.get_data()
prj.get_data(name, instance="latest")
Retrieve artifact data from a data asset registered with register_data().
| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name |
| instance | str | Version: "latest", "latest-N", or "vN" |
Returns: The artifact data
features = self.prj.get_data("fraud_features")
previous = self.prj.get_data("fraud_features", instance="latest-1")
Only works for artifact-based assets. For external assets or models, use prj.asset.consume_*_asset().
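To make the three instance forms concrete, here is a minimal sketch of how such a specifier could be interpreted. It assumes that "latest-N" means N versions before the latest (as the `latest-1` example suggests) and that "vN" names an explicit version. `parse_instance` is a hypothetical helper, not an obproject function, and the real resolution may happen server-side.

```python
import re

def parse_instance(spec):
    """Hypothetical parser for the documented instance forms.

    Returns ("latest", offset) for "latest" / "latest-N",
    or ("version", n) for "vN".
    """
    if spec == "latest":
        return ("latest", 0)
    match = re.fullmatch(r"latest-(\d+)", spec)
    if match:
        return ("latest", int(match.group(1)))
    match = re.fullmatch(r"v(\d+)", spec)
    if match:
        return ("version", int(match.group(1)))
    raise ValueError(f"Unsupported instance specifier: {spec!r}")

previous = parse_instance("latest-1")
pinned = parse_instance("v123")
```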
prj.asset.consume_data_asset()
prj.asset.consume_data_asset(name, instance="latest")
Low-level method returning the full asset reference.
Returns: Asset reference dict:
{
    "id": "v123",
    "created_by": {"entity_id": "FlowName/run_id/step/task"},
    "data_properties": {
        "data_kind": "artifact",
        "annotations": {"key": "value"},
        "blobs": []
    }
}
prj.asset.consume_model_asset()
prj.asset.consume_model_asset(name, instance="latest")
Low-level method for consuming model assets.
Returns: Asset reference dict with model_properties instead of data_properties.
ref = self.prj.asset.consume_model_asset("fraud_classifier")
accuracy = float(ref["model_properties"]["annotations"]["accuracy"])
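Direct indexing as above raises `KeyError` if an annotation is missing. The sketch below shows a more defensive read. `get_annotation` is a hypothetical helper, and `sample_ref` is a hand-written stand-in shaped like the documented reference dict rather than real service output.

```python
def get_annotation(ref, key, kind="model", default=None):
    """Hypothetical helper: read one annotation from an asset reference dict.

    kind selects "model_properties" or "data_properties".
    """
    properties = ref.get(f"{kind}_properties", {})
    return properties.get("annotations", {}).get(key, default)

# Stand-in reference dict; annotation values are strings, as documented
sample_ref = {
    "id": "v123",
    "model_properties": {"annotations": {"accuracy": "0.95"}},
}
accuracy = float(get_annotation(sample_ref, "accuracy"))
missing = get_annotation(sample_ref, "f1_score", default="n/a")
```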
prj.asset.list_data_assets()
prj.asset.list_data_assets(tags=None)
List data assets in current project/branch.
| Parameter | Type | Description |
|---|---|---|
| tags | dict | Filter by tags (client-side filtering) |
Returns: {"data": [...]}
prj.asset.list_model_assets()
prj.asset.list_model_assets(tags=None)
List model assets in current project/branch.
Returns: {"models": [...]}
Tag filtering is client-side only. All assets are fetched, then filtered locally.
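Because filtering happens after the full listing is fetched, it behaves roughly like the sketch below. The shape of each entry (a dict with "name" and "tags" keys) is an assumption for illustration, as is the helper `filter_by_tags`. Only the top-level {"models": [...]} wrapper comes from the reference above.

```python
def filter_by_tags(assets, tags=None):
    """Keep assets whose tags contain every requested key/value pair."""
    if not tags:
        return list(assets)
    return [
        asset for asset in assets
        if all(asset.get("tags", {}).get(k) == v for k, v in tags.items())
    ]

# Assumed entry shape; real listings may carry more fields
listing = {"models": [
    {"name": "fraud_classifier", "tags": {"framework": "sklearn"}},
    {"name": "base_llm", "tags": {"framework": "transformers"}},
]}
sklearn_models = filter_by_tags(listing["models"], {"framework": "sklearn"})
```

For projects with many assets, the practical consequence is that a tag filter does not reduce the amount of data fetched.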
Standalone Asset Usage
Use Asset directly outside flow context (deployments, notebooks, scripts):
from obproject.assets import Asset
asset = Asset(
    project="fraud-detection",
    branch="main",
    read_only=True,  # Required outside flow context
)
ref = asset.consume_model_asset("fraud_classifier")
| Parameter | Type | Description |
|---|---|---|
| project | str | Project name |
| branch | str | Branch name |
| read_only | bool | Set True outside flows (skips entity tracking) |
When read_only=True:
- Registration methods are no-ops
- Consume methods use GET (no lineage tracking) instead of PUT
See Deployment Integration for a full example.
Event Publishing
prj.publish_event()
prj.publish_event(name, payload=None)
Publish an event to trigger other flows.
| Parameter | Type | Description |
|---|---|---|
| name | str | Event name (e.g., "retrain_model") |
| payload | dict | JSON-serializable payload |
Events are namespaced as prj.{project}.{branch}.{name}.
self.prj.publish_event("model_trained", payload={"accuracy": 0.95})
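The namespacing rule can be written out as a one-line function. This sketch only formats the documented pattern. Whether the library normalizes project or branch names before building the event name is not stated here, and `full_event_name` is a hypothetical helper.

```python
def full_event_name(project, branch, name):
    """Build an event name in the documented prj.{project}.{branch}.{name} form."""
    return f"prj.{project}.{branch}.{name}"

event = full_event_name("fraud-detection", "main", "model_trained")
```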
prj.safe_publish_event()
prj.safe_publish_event(name, payload=None)
Same as publish_event() but failures don't raise exceptions.
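The "safe" variant follows a common pattern: wrap the call, catch the failure, and carry on. The sketch below illustrates that pattern with a stand-in publisher. Logging the error and returning a boolean are assumptions of this example and are not documented behavior of safe_publish_event().

```python
import logging

logger = logging.getLogger("safe_publish_sketch")

def safe_call(publish, name, payload=None):
    """Call publish(name, payload); log and swallow any exception.

    Returns True on success and False on failure.
    """
    try:
        publish(name, payload)
        return True
    except Exception:
        logger.exception("Publishing event %r failed", name)
        return False

def failing_publish(name, payload):
    """Stand-in publisher that always fails."""
    raise RuntimeError("event backend unavailable")

ok = safe_call(failing_publish, "model_trained", {"accuracy": 0.95})
```

This style suits notifications that should not fail a run that has otherwise succeeded.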
@project_trigger
Subscribe a flow to project events:
from metaflow import step
from obproject import ProjectFlow, project_trigger

@project_trigger(event="model_trained")
class EvaluationFlow(ProjectFlow):
    @step
    def start(self):
        # Triggered when the "model_trained" event is published
        self.next(self.end)

    @step
    def end(self):
        pass
The decorator resolves the full event name from project config.
Evaluation Logging
prj.evals.log()
prj.evals.log(message)
Log structured evaluation data with project/branch/run metadata.
| Parameter | Type | Description |
|---|---|---|
| message | dict or str | Evaluation data |
self.prj.evals.log({
    "model": "fraud_classifier",
    "accuracy": 0.95,
    "test_samples": 1000
})
Output includes a magic prefix for monitoring system ingestion.
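A prefixed log line lets a monitoring system pick evaluation records out of ordinary step output. The sketch below illustrates the general idea only: the prefix string, the record layout, and both helper functions are hypothetical, since the actual format is not specified in this reference.

```python
import json

MAGIC_PREFIX = "[[eval]]"  # hypothetical; the real prefix is not documented here

def format_eval_line(message, project, branch, run_id):
    """Serialize an evaluation record as one prefixed JSON line."""
    record = {"project": project, "branch": branch, "run_id": run_id, "message": message}
    return MAGIC_PREFIX + json.dumps(record, sort_keys=True)

def parse_eval_line(line):
    """Return the decoded record, or None for lines without the prefix."""
    if not line.startswith(MAGIC_PREFIX):
        return None
    return json.loads(line[len(MAGIC_PREFIX):])

line = format_eval_line({"accuracy": 0.95}, "fraud-detection", "main", "123")
record = parse_eval_line(line)
```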
Branch Resolution
ProjectContext determines read and write branches:
| Context | Write Branch | Read Branch |
|---|---|---|
| Deployed | Deployment branch | Same as write |
| Local dev with [dev-assets] | User branch | Config branch |
| Local dev without [dev-assets] | User branch | Same as write |
Example with [dev-assets] branch = "main":
- Reads from fraud-detection/main (production assets)
- Writes to fraud-detection/user.alice (isolated experiments)
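The table above can be expressed as a small decision function. This is a sketch of the documented rules, not the ProjectContext implementation. The function name and its parameters are assumptions made for this example.

```python
def resolve_branches(deployed, current_branch, dev_assets_branch=None):
    """Return (write_branch, read_branch) following the branch resolution table."""
    write_branch = current_branch
    # Deployed flows, and local runs without [dev-assets], read where they write
    if deployed or dev_assets_branch is None:
        return write_branch, write_branch
    # Local dev with [dev-assets]: read from the configured branch
    return write_branch, dev_assets_branch

local = resolve_branches(False, "user.alice", dev_assets_branch="main")
```

With `[dev-assets] branch = "main"`, a local run by alice writes to `user.alice` and reads from `main`, matching the example above.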
See Also
- Advanced Asset Patterns - Versioning, champion/challenger, deployment integration
- Project Assets basics - Introduction to assets
- Project Structure - Project file organization