
Project utilities API

Prerequisites

Start with Project Assets basics before diving into the API reference.

ProjectFlow

Base class for all project flows. Inherit from ProjectFlow instead of FlowSpec:

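from metaflow import step
from obproject import ProjectFlow

class MyFlow(ProjectFlow):
    @step
    def start(self):
        self.data = [1, 2, 3]  # placeholder artifact to register
        # Register self.data as the "dataset" data asset
        self.prj.register_data("dataset", "data")
        self.next(self.end)

    @step
    def end(self):
        pass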

Configuration

ProjectFlow reads configuration from two files:

obproject.toml - Project identity and settings:

project = "fraud-detection"

[dev-assets]
branch = "main" # Read assets from main branch during local dev

[dependencies]
include_pyproject_toml = true # Auto-apply pyproject.toml deps (default: true)

pyproject.toml - Python dependencies applied via @pypi_base:

[project]
dependencies = [
    "scikit-learn>=1.3.0",
    "pandas>=2.0.0",
]

prj Property

self.prj returns a ProjectContext with access to all project utilities. The context is initialized lazily, on first access.

Attributes:

  • prj.project - Project name from config
  • prj.branch - Current write branch (from Metaflow @project)
  • prj.read_branch - Branch for reading assets (may differ during local dev)
  • prj.write_branch - Branch for writing assets
  • prj.asset - Low-level Asset client
  • prj.evals - Evaluation logger
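
Lazy initialization means nothing is built until self.prj is first read, and later reads reuse the same object. The source does not show how obproject implements this. The sketch below uses functools.cached_property as one common way to get that behavior, with ProjectContextSketch and FlowSketch as hypothetical stand-ins.

```python
from functools import cached_property


class ProjectContextSketch:
    """Stand-in for ProjectContext; counts how many times it is built."""

    instances = 0

    def __init__(self):
        ProjectContextSketch.instances += 1


class FlowSketch:
    """Stand-in for a flow exposing a lazily built `prj` attribute."""

    @cached_property
    def prj(self):
        # Runs on first access only; the result is cached on the instance.
        return ProjectContextSketch()


flow = FlowSketch()
built_before_access = ProjectContextSketch.instances
first = flow.prj
second = flow.prj
print(built_before_access, ProjectContextSketch.instances, first is second)
```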

Asset Registration

prj.register_data()

prj.register_data(name, artifact, annotations=None, tags=None, description=None)

Register a Metaflow artifact as a data asset.

| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name (e.g., "user_transactions") |
| artifact | str | Artifact name (must exist as self.<artifact>) |
| annotations | dict | Metadata key-value pairs (values converted to strings) |
| tags | dict | Tags for categorization |
| description | str | Human-readable description |

self.features = compute_features(data)
self.prj.register_data("fraud_features", "features",
    annotations={"n_samples": len(self.features)})
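
Because annotation values are converted to strings, numeric metadata comes back as text and has to be parsed by the consumer. The sketch below imitates that round trip in plain Python. stringify_annotations is a hypothetical helper, and using str() for the conversion is an assumption about how the library does it.

```python
def stringify_annotations(annotations: dict) -> dict:
    """Hypothetical helper mirroring 'values converted to strings'."""
    return {key: str(value) for key, value in annotations.items()}


# What a producer passes in at registration time.
stored = stringify_annotations({"n_samples": 1000, "accuracy": 0.95})

# What a consumer has to do to get numbers back.
n_samples = int(stored["n_samples"])
accuracy = float(stored["accuracy"])
print(stored, n_samples, accuracy)
```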

prj.register_external_data()

prj.register_external_data(name, blobs, kind, annotations=None, tags=None, description=None)

Register external data (S3, databases, etc.) as a data asset.

| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name |
| blobs | list | URIs/references (e.g., ["s3://bucket/file.csv"]) |
| kind | str | Data type (e.g., "s3", "database") |
| annotations | dict | Metadata |
| tags | dict | Tags |
| description | str | Description |

self.prj.register_external_data("raw_logs",
    blobs=["s3://data-lake/logs/2025-01-01/"],
    kind="s3",
    annotations={"size_gb": 450})

prj.register_model()

prj.register_model(name, artifact, annotations=None, tags=None, description=None)

Register a Metaflow artifact as a model asset.

| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name (e.g., "fraud_classifier") |
| artifact | str | Artifact name containing the model |
| annotations | dict | Model metadata (accuracy, hyperparameters, etc.) |
| tags | dict | Tags (framework, algorithm, etc.) |
| description | str | Description |

self.model = RandomForestClassifier().fit(X, y)
self.prj.register_model("fraud_classifier", "model",
    annotations={"accuracy": 0.95, "algorithm": "RandomForest"})

prj.register_external_model()

prj.register_external_model(name, blobs, kind, annotations=None, tags=None, description=None)

Register an external model (HuggingFace, checkpoints, etc.) as a model asset.

| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name |
| blobs | list | URIs/references |
| kind | str | Model type (e.g., "checkpoint", "huggingface") |
| annotations | dict | Metadata |
| tags | dict | Tags |
| description | str | Description |

self.prj.register_external_model("base_llm",
    blobs=["meta-llama/Llama-3.1-8B-Instruct"],
    kind="huggingface",
    annotations={"context_length": 8192})

Asset Consumption

prj.get_data()

prj.get_data(name, instance="latest")

Retrieve artifact data from a data asset registered with register_data().

| Parameter | Type | Description |
|---|---|---|
| name | str | Asset name |
| instance | str | Version: "latest", "latest-N", or "vN" |

Returns: The artifact data

features = self.prj.get_data("fraud_features")
previous = self.prj.get_data("fraud_features", instance="latest-1")
Note: get_data() only works for artifact-based assets. For external assets or models, use prj.asset.consume_*_asset().
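
The three instance forms can be read as follows: "latest" is the newest version, "latest-N" steps back N versions from it, and "vN" names a version directly. The resolver below is a hypothetical sketch of that reading over an in-memory list ordered oldest to newest. The real lookup happens in the asset service, and the meaning of "latest-N" is inferred from the example above rather than confirmed by the source.

```python
import re


def resolve_instance(spec: str, versions: list) -> str:
    """Hypothetical resolver; `versions` is ordered oldest to newest."""
    if not versions:
        raise LookupError("no versions registered")
    if spec == "latest":
        return versions[-1]
    match = re.fullmatch(r"latest-(\d+)", spec)
    if match:
        offset = int(match.group(1))
        if offset >= len(versions):
            raise LookupError(f"only {len(versions)} versions exist")
        # Step back `offset` versions from the newest one.
        return versions[-1 - offset]
    if re.fullmatch(r"v\d+", spec):
        if spec not in versions:
            raise LookupError(f"unknown version {spec!r}")
        return spec
    raise ValueError(f"unsupported instance specifier: {spec!r}")


versions = ["v1", "v2", "v3"]
for spec in ("latest", "latest-1", "v1"):
    print(spec, "->", resolve_instance(spec, versions))
```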

prj.asset.consume_data_asset()

prj.asset.consume_data_asset(name, instance="latest")

Low-level method returning the full asset reference.

Returns: Asset reference dict:

{
    "id": "v123",
    "created_by": {"entity_id": "FlowName/run_id/step/task"},
    "data_properties": {
        "data_kind": "artifact",
        "annotations": {"key": "value"},
        "blobs": []
    }
}
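
The created_by.entity_id field identifies the task that produced the asset. A minimal sketch of splitting it into its parts follows, assuming the ID always has the four slash-separated components shown above. parse_producer, the Producer tuple, and the sample values are made up for illustration.

```python
from typing import NamedTuple


class Producer(NamedTuple):
    flow: str
    run_id: str
    step: str
    task: str


def parse_producer(ref: dict) -> Producer:
    """Hypothetical helper: split 'FlowName/run_id/step/task' into fields."""
    flow, run_id, step, task = ref["created_by"]["entity_id"].split("/")
    return Producer(flow, run_id, step, task)


# Sample reference shaped like the dict above; the values are invented.
ref = {
    "id": "v123",
    "created_by": {"entity_id": "TrainFlow/1042/train/7"},
    "data_properties": {
        "data_kind": "artifact",
        "annotations": {"n_samples": "1000"},
        "blobs": [],
    },
}

producer = parse_producer(ref)
print(producer)
```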

prj.asset.consume_model_asset()

prj.asset.consume_model_asset(name, instance="latest")

Low-level method for consuming model assets.

Returns: Asset reference dict with model_properties instead of data_properties.

ref = self.prj.asset.consume_model_asset("fraud_classifier")
accuracy = float(ref["model_properties"]["annotations"]["accuracy"])

prj.asset.list_data_assets()

prj.asset.list_data_assets(tags=None)

List data assets in current project/branch.

| Parameter | Type | Description |
|---|---|---|
| tags | dict | Filter by tags (client-side filtering) |

Returns: {"data": [...]}

prj.asset.list_model_assets()

prj.asset.list_model_assets(tags=None)

List model assets in current project/branch.

Returns: {"models": [...]}

Warning: Tag filtering in both list methods is client-side only. All assets are fetched, then filtered locally.
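
In practice, client-side filtering means the cost of a list call grows with the total number of assets, not with the number that match. The sketch below shows the general idea on an in-memory list. filter_by_tags is hypothetical, and the assumption that each entry carries a "tags" dict is not confirmed by the source.

```python
from typing import Optional


def filter_by_tags(assets: list, tags: Optional[dict]) -> list:
    """Hypothetical filter: keep assets whose tags include every requested pair."""
    if not tags:
        return list(assets)
    return [
        asset
        for asset in assets
        if all(asset.get("tags", {}).get(k) == v for k, v in tags.items())
    ]


# Assumed entry shape; the real response schema may differ.
all_assets = [
    {"name": "fraud_features", "tags": {"team": "risk", "stage": "prod"}},
    {"name": "raw_logs", "tags": {"team": "platform"}},
    {"name": "scratch", "tags": {}},
]

matching = filter_by_tags(all_assets, {"team": "risk"})
print([asset["name"] for asset in matching])
```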


Standalone Asset Usage

Use Asset directly outside a flow context (deployments, notebooks, scripts):

from obproject.assets import Asset

asset = Asset(
    project="fraud-detection",
    branch="main",
    read_only=True,  # Required outside flow context
)

ref = asset.consume_model_asset("fraud_classifier")

| Parameter | Type | Description |
|---|---|---|
| project | str | Project name |
| branch | str | Branch name |
| read_only | bool | Set True outside flows (skips entity tracking) |

When read_only=True:

  • Registration methods are no-ops
  • Consume methods use GET (no lineage tracking) instead of PUT

See Deployment Integration for full example.


Event Publishing

prj.publish_event()

prj.publish_event(name, payload=None)

Publish an event to trigger other flows.

| Parameter | Type | Description |
|---|---|---|
| name | str | Event name (e.g., "retrain_model") |
| payload | dict | JSON-serializable payload |

Events are namespaced as prj.{project}.{branch}.{name}.

self.prj.publish_event("model_trained", payload={"accuracy": 0.95})
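
The namespacing rule is a plain string template. The helper below is a hypothetical illustration of it, not part of the obproject API.

```python
def full_event_name(project: str, branch: str, name: str) -> str:
    """Hypothetical helper applying the prj.{project}.{branch}.{name} rule."""
    return f"prj.{project}.{branch}.{name}"


event = full_event_name("fraud-detection", "main", "model_trained")
print(event)
```

Since the branch is part of the name, the same event published from a different branch produces a different fully qualified name.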

prj.safe_publish_event()

prj.safe_publish_event(name, payload=None)

Same as publish_event() but failures don't raise exceptions.
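
The general pattern behind a "safe" publish is a wrapper that catches the failure and carries on, which suits steps where a missed notification should not fail the run. The sketch below shows that pattern with a generic callable. The boolean return value and the logging are assumptions for illustration; the source only states that failures do not raise.

```python
import logging

logger = logging.getLogger("events")


def safe_publish(publish, name, payload=None) -> bool:
    """Hypothetical wrapper: call `publish` and swallow any failure."""
    try:
        publish(name, payload)
        return True
    except Exception:
        # Record the failure instead of propagating it to the caller.
        logger.warning("could not publish event %r", name, exc_info=True)
        return False


def failing_publish(name, payload):
    """Simulates an unreachable event backend."""
    raise ConnectionError("event backend unreachable")


published = []
ok = safe_publish(lambda n, p: published.append((n, p)),
                  "model_trained", {"accuracy": 0.95})
failed = safe_publish(failing_publish, "model_trained")
print(ok, failed, published)
```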

@project_trigger

Subscribe a flow to project events:

from metaflow import step
from obproject import ProjectFlow, project_trigger

@project_trigger(event="model_trained")
class EvaluationFlow(ProjectFlow):
    @step
    def start(self):
        # Triggered when the "model_trained" event is published
        self.next(self.end)

    @step
    def end(self):
        pass

The decorator resolves the full event name from project config.


Evaluation Logging

prj.evals.log()

prj.evals.log(message)

Log structured evaluation data with project/branch/run metadata.

| Parameter | Type | Description |
|---|---|---|
| message | dict or str | Evaluation data |

self.prj.evals.log({
    "model": "fraud_classifier",
    "accuracy": 0.95,
    "test_samples": 1000
})

Each log line is written with a special ("magic") prefix so that the monitoring system can recognize and ingest it.


Branch Resolution

ProjectContext determines read and write branches:

| Context | Write Branch | Read Branch |
|---|---|---|
| Deployed | Deployment branch | Same as write |
| Local dev with [dev-assets] | User branch | Config branch |
| Local dev without [dev-assets] | User branch | Same as write |

Example with [dev-assets] branch = "main":

  • Reads from fraud-detection/main (production assets)
  • Writes to fraud-detection/user.alice (isolated experiments)
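
The table above reduces to a single rule: the read branch differs from the write branch only during local development with [dev-assets] configured. A minimal sketch of that rule follows. resolve_branches and its parameters are hypothetical and do not reflect ProjectContext's real interface.

```python
from typing import Optional, Tuple


def resolve_branches(
    write_branch: str,
    deployed: bool,
    dev_assets_branch: Optional[str],
) -> Tuple[str, str]:
    """Hypothetical helper returning (write_branch, read_branch)."""
    if deployed or dev_assets_branch is None:
        # Deployed runs, and local runs without [dev-assets], read what they write.
        return write_branch, write_branch
    # Local dev with [dev-assets]: read from the configured branch.
    return write_branch, dev_assets_branch


print(resolve_branches("user.alice", deployed=False, dev_assets_branch="main"))
print(resolve_branches("user.alice", deployed=False, dev_assets_branch=None))
print(resolve_branches("main", deployed=True, dev_assets_branch="main"))
```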
