Project utilities API
Start with Project Assets basics before diving into the API reference.
ProjectFlow
Base class for all project flows. Inherit from ProjectFlow instead of FlowSpec:
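```python
from metaflow import step
from obproject import ProjectFlow


class MyFlow(ProjectFlow):
    @step
    def start(self):
        self.data = [1, 2, 3]  # any artifact; register_data() refers to it by name
        self.prj.register_data("dataset", "data")
        self.next(self.end)

    @step
    def end(self):
        pass
```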
Configuration
ProjectFlow reads configuration from two files:
obproject.toml - Project identity and settings:
project = "fraud-detection"
[dev-assets]
branch = "main" # Read assets from main branch during local dev
[dependencies]
include_pyproject_toml = true # Auto-apply pyproject.toml deps (default: true)
| Section | Key | Default | Description |
|---|---|---|---|
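| (top-level) | cicd_user | "${project//_/-}-cicd" | Overrides the machine-user name that CI uses when calling outerbounds service-principal-configure. The default is the project name with every underscore replaced by a hyphen, plus a -cicd suffix. See CI/CD integration for the convention. |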
[dev-assets] | branch | - | Branch to read assets from during local development |
[dependencies] | include_pyproject_toml | true | Auto-apply @pypi_base from pyproject.toml |
pyproject.toml - Python dependencies applied via @pypi_base:
[project]
dependencies = [
"scikit-learn>=1.3.0",
"pandas>=2.0.0",
]
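The following minimal sketch makes the defaults concrete. It fills in the documented defaults for an already-parsed obproject.toml, for example the dict returned by Python's tomllib.load. The function resolve_project_settings is hypothetical and not part of obproject. Only the keys and default values come from the configuration table above.

```python
from typing import Any


def resolve_project_settings(config: dict[str, Any]) -> dict[str, Any]:
    """Apply the documented obproject.toml defaults to a parsed config dict.

    Hypothetical helper for illustration; obproject's own loader may differ.
    """
    project: str = config["project"]
    dev_assets: dict[str, Any] = config.get("dev-assets", {})
    dependencies: dict[str, Any] = config.get("dependencies", {})
    return {
        "project": project,
        # Mirrors the shell default "${project//_/-}-cicd":
        # every underscore becomes a hyphen, then "-cicd" is appended.
        "cicd_user": config.get("cicd_user", project.replace("_", "-") + "-cicd"),
        # [dev-assets] branch has no default; None means the key was not set.
        "dev_assets_branch": dev_assets.get("branch"),
        "include_pyproject_toml": dependencies.get("include_pyproject_toml", True),
    }
```

For a project named fraud_detection with no cicd_user override, this yields a CI machine user of fraud-detection-cicd.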
prj Property
self.prj returns a ProjectContext with access to all project utilities. Initialized lazily on first access.
Attributes:
- prj.project - Project name from config
- prj.branch - Current write branch (from Metaflow @project)
- prj.read_branch - Branch for reading assets (may differ during local dev)
- prj.write_branch - Branch for writing assets
- prj.asset - Low-level Asset client
- prj.evals - Evaluation logger
Asset Registration
Registration writes to two stores: the asset catalog (instances + lineage) and the project's flowproject metadata (the declaration list shown in the UI Overview). Each register_* call adds the asset name to the metadata if it isn't already declared. This metadata update has no effect if the project hasn't been deployed yet (no metadata exists). To later remove an asset from both stores, see Asset Deletion.
prj.register_data()
prj.register_data(name, artifact, annotations=None, tags=None, description=None)
Register a Metaflow artifact as a data asset.
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name (e.g., "user_transactions") |
artifact | str | Artifact name (must exist as self.<artifact>) |
annotations | dict | Metadata key-value pairs (values converted to strings) |
tags | dict | Tags for categorization |
description | str | Human-readable description |
self.features = compute_features(data)
self.prj.register_data("fraud_features", "features",
annotations={"n_samples": len(self.features)})
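The annotations table notes that values are converted to strings. The sketch below shows that conversion, assuming a plain str() on each value; the library's exact formatting rules are not documented here. It also shows why readers later cast values back, as the float(...) call in the consume_model_asset() example does. stringify_annotations is a hypothetical helper.

```python
from typing import Any, Optional


def stringify_annotations(annotations: Optional[dict[str, Any]]) -> dict[str, str]:
    """Convert annotation values to strings, as register_* is documented to do.

    Assumption: a plain str() per value; obproject may format values differently.
    """
    if annotations is None:
        return {}
    return {str(key): str(value) for key, value in annotations.items()}


stored = stringify_annotations({"n_samples": 1000, "accuracy": 0.95})
# Values come back as strings, so consumers must cast them explicitly.
accuracy = float(stored["accuracy"])
```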
prj.register_external_data()
prj.register_external_data(name, blobs, kind, annotations=None, tags=None, description=None)
Register external data (S3, databases, etc.) as a data asset.
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name |
blobs | list | URIs/references (e.g., ["s3://bucket/file.csv"]) |
kind | str | Data type (e.g., "s3", "database") |
annotations | dict | Metadata |
tags | dict | Tags |
description | str | Description |
self.prj.register_external_data("raw_logs",
blobs=["s3://data-lake/logs/2025-01-01/"],
kind="s3",
annotations={"size_gb": 450})
prj.register_model()
prj.register_model(name, artifact, annotations=None, tags=None, description=None)
Register a Metaflow artifact as a model asset.
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name (e.g., "fraud_classifier") |
artifact | str | Artifact name containing the model |
annotations | dict | Model metadata (accuracy, hyperparameters, etc.) |
tags | dict | Tags (framework, algorithm, etc.) |
description | str | Description |
self.model = RandomForestClassifier().fit(X, y)
self.prj.register_model("fraud_classifier", "model",
annotations={"accuracy": 0.95, "algorithm": "RandomForest"})
prj.register_external_model()
prj.register_external_model(name, blobs, kind, annotations=None, tags=None, description=None)
Register an external model (HuggingFace, checkpoints, etc.) as a model asset.
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name |
blobs | list | URIs/references |
kind | str | Model type (e.g., "checkpoint", "huggingface") |
annotations | dict | Metadata |
tags | dict | Tags |
description | str | Description |
self.prj.register_external_model("base_llm",
blobs=["meta-llama/Llama-3.1-8B-Instruct"],
kind="huggingface",
annotations={"context_length": 8192})
Asset Consumption
prj.get_data()
prj.get_data(name, instance="latest")
Retrieve artifact data from a data asset registered with register_data().
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name |
instance | str | Version: "latest", "latest-N", or "vN" |
Returns: The artifact data
features = self.prj.get_data("fraud_features")
previous = self.prj.get_data("fraud_features", instance="latest-1")
Only works for artifact-based assets. For external data, use prj.asset.consume_data_asset().
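The instance argument accepts a small grammar. The sketch below parses it into a structured selector. parse_instance and InstanceSelector are hypothetical names. The "@alias" form is included because the promotion examples on this page pass instance="@production". How obproject parses these strings internally is not documented here.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class InstanceSelector:
    kind: str        # "latest", "version", or "alias"
    value: str = ""  # version id ("v123") or alias name; empty for "latest"
    offset: int = 0  # steps back from the newest instance ("latest-N")


def parse_instance(spec: str) -> InstanceSelector:
    """Parse "latest", "latest-N", "vN", or "@alias" into a selector."""
    if spec == "latest":
        return InstanceSelector("latest")
    match = re.fullmatch(r"latest-(\d+)", spec)
    if match:
        return InstanceSelector("latest", offset=int(match.group(1)))
    if re.fullmatch(r"v\d+", spec):
        return InstanceSelector("version", value=spec)
    if spec.startswith("@") and len(spec) > 1:
        return InstanceSelector("alias", value=spec[1:])
    raise ValueError(f"unrecognized instance spec: {spec!r}")
```

Rejecting unknown strings early keeps a typo such as "latests" from silently resolving to some default instance.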
prj.get_model()
prj.get_model(name, instance="latest")
Retrieve artifact data from a model asset registered with register_model().
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name |
instance | str | Version: "latest", "latest-N", or "vN" |
Returns: The model artifact data
model = self.prj.get_model("fraud_classifier")
previous_model = self.prj.get_model("fraud_classifier", instance="latest-1")
Only works for artifact-based models. For external models (checkpoints, HuggingFace, etc.), use prj.asset.consume_model_asset() and load from the returned blobs.
prj.asset.consume_data_asset()
prj.asset.consume_data_asset(name, instance="latest")
Low-level method returning the full asset reference.
Returns: Asset reference dict:
{
"id": "v123",
"created_by": {"entity_id": "FlowName/run_id/step/task"},
"data_properties": {
"data_kind": "artifact",
"annotations": {"key": "value"},
"blobs": []
}
}
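Because the reference is a plain dict, lineage and annotations can be read directly. The sketch below assumes the shape shown above. It also assumes that entity_id always has the four-part FlowName/run_id/step/task form. summarize_ref is a hypothetical helper that accepts both data references (data_properties) and model references (model_properties).

```python
from typing import Any


def summarize_ref(ref: dict[str, Any]) -> dict[str, Any]:
    """Extract version, producing task, and annotations from an asset reference."""
    props = ref.get("data_properties") or ref.get("model_properties") or {}
    # Assumption: entity_id is "FlowName/run_id/step/task".
    flow, run_id, step, task = ref["created_by"]["entity_id"].split("/")
    return {
        "version": ref["id"],
        "flow": flow,
        "run_id": run_id,
        "step": step,
        "task": task,
        # Annotation values are stored as strings; cast them at the call site.
        "annotations": dict(props.get("annotations", {})),
        "blobs": list(props.get("blobs", [])),
    }
```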
prj.asset.consume_model_asset()
prj.asset.consume_model_asset(name, instance="latest")
Low-level method for consuming model assets.
Returns: Asset reference dict with model_properties instead of data_properties.
ref = self.prj.asset.consume_model_asset("fraud_classifier")
accuracy = float(ref["model_properties"]["annotations"]["accuracy"])
prj.asset.list_data_assets()
prj.asset.list_data_assets(tags=None)
List data assets in the current project/branch.
| Parameter | Type | Description |
|---|---|---|
tags | dict | Filter by tags (client-side filtering) |
Returns: {"data": [...]}
prj.asset.list_model_assets()
prj.asset.list_model_assets(tags=None)
List model assets in the current project/branch.
Returns: {"models": [...]}
Tag filtering is client-side only. All assets are fetched, then filtered locally.
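Client-side filtering amounts to fetching every asset and keeping those whose tags match the request. The sketch below assumes each listed asset is a dict with a "tags" mapping, and that an asset must match every requested key-value pair. Both are assumptions: the entry shape returned by list_data_assets() is not documented on this page. filter_by_tags is a hypothetical helper.

```python
from typing import Any, Optional


def filter_by_tags(
    assets: list[dict[str, Any]], tags: Optional[dict[str, str]] = None
) -> list[dict[str, Any]]:
    """Keep assets whose tags include every requested key/value pair.

    Assumptions: entries carry a "tags" dict; AND semantics across keys.
    """
    if not tags:
        return list(assets)
    return [
        asset
        for asset in assets
        if all(asset.get("tags", {}).get(key) == value for key, value in tags.items())
    ]
```

Because the filter runs locally, a large catalog is still fetched in full before any tags are applied.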
Asset Deletion
prj.asset.delete_data_asset()
prj.asset.delete_data_asset(name)
Hard delete a data asset and every instance attached to its name. Use this to prune orphan IDs left in the catalog after a rename. The call also removes the asset from the project's flowproject metadata so the UI Overview reflects the change. Idempotent: re-deleting a name that's already gone is a no-op.
| Parameter | Type | Description |
|---|---|---|
name | str | Asset name/id |
Returns: DeleteResult(catalog_deleted: bool, metadata_updated: bool) — catalog_deleted is False when the name was already absent from the catalog; metadata_updated is False when the name wasn't in the spec (or no flowproject metadata exists yet).
Raises RuntimeError on read-only clients or when the metadata sync fails after a successful catalog DELETE (the error message names both halves). The catalog has no soft-delete state — deletion removes every instance, alias, and lineage edge for the asset name. To remove an entire branch's metadata at once, use outerbounds flowproject teardown-branch.
prj.asset.delete_model_asset()
prj.asset.delete_model_asset(name)
Same as delete_data_asset() but for model assets.
Standalone Asset Usage
Use Asset directly outside flow context (deployments, notebooks, scripts):
from obproject.assets import Asset
asset = Asset(
project="fraud-detection",
branch="main",
read_only=True # for read-only access
)
ref = asset.consume_model_asset("fraud_classifier")
| Parameter | Type | Description |
|---|---|---|
project | str | Project name |
branch | str | Branch name |
entity_ref | dict | Pass for writes outside a flow; e.g. {"entity_kind": "user", "entity_id": "cleanup-script"}. The default resolves from current.pathspec, which is None outside a Metaflow run and is rejected by the backend. |
read_only | bool | Set True for read-only access (skips entity tracking) |
When read_only=True:
- Registration methods are no-ops
- Consume methods use GET (no lineage tracking) instead of PUT
- Delete methods raise RuntimeError
For writes from a standalone script (register, consume, delete), pass an explicit entity_ref — current.pathspec isn't available outside a flow.
Cleanup scripts
Writable standalone clients are the right tool for admin or CI cleanup. The script below constructs an Asset with an explicit user-entity ref, then loops over the names to delete:
```python
from obproject.assets import Asset

asset = Asset(
    project="my-project",
    branch="main",
    entity_ref={"entity_kind": "user", "entity_id": "cleanup-script"},
)

for name in ("obsolete_dataset", "old_features"):
    result = asset.delete_data_asset(name)
    print(f"{name}: catalog={result.catalog_deleted} metadata={result.metadata_updated}")
```
Each call removes the asset from both the catalog and the flowproject metadata. DeleteResult lets the script report meaningful progress: "already absent" (neither store had the name), "metadata only" (the catalog entry was already gone), or "deleted" (removed from both the catalog and the metadata). Reruns are safe: names that are already gone return DeleteResult(False, False).
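One way to turn the two booleans into the progress labels described above is sketched below. DeleteResult is redefined locally as a NamedTuple only so the example runs on its own; its field names match the documented return value. The describe helper is hypothetical. So is the "catalog only" label for the fourth combination, which covers a name that was in the catalog but not in the metadata.

```python
from typing import NamedTuple


class DeleteResult(NamedTuple):
    # Local stand-in with the documented field names.
    catalog_deleted: bool
    metadata_updated: bool


def describe(result: DeleteResult) -> str:
    """Map a DeleteResult to a human-readable status label (hypothetical)."""
    if result.catalog_deleted and result.metadata_updated:
        return "deleted (catalog+metadata)"
    if result.catalog_deleted:
        # The name was in the catalog but not declared in flowproject metadata.
        return "catalog only (not in metadata)"
    if result.metadata_updated:
        return "metadata only (catalog already absent)"
    return "already absent"
```

Inside the cleanup loop, the print call could then become print(f"{name}: {describe(result)}").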
Scheduling & Triggering
Metaflow flows can be started by a time-based schedule or by an event published from another flow. The decorators below are project-aware wrappers around Metaflow's native @schedule and @trigger - they automatically scope to the correct project and branch so each deployed branch operates independently.
@project_schedule
Apply different schedules depending on which branch the flow is deployed to. If the branch doesn't match any pattern, no schedule is applied (the decorator is a no-op).
This wraps Metaflow's @schedule with branch-aware routing: production can run on a tight cron while staging runs daily and feature branches get no schedule at all.
```python
from metaflow import step
from obproject import ProjectFlow, project_schedule


@project_schedule({
    "main": {"cron": "0 8 * * 1-5", "timezone": "America/New_York"},
    "develop": {"daily": True},
    "release/*": {"hourly": True},
})
class MyFlow(ProjectFlow):
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass
```
| Parameter | Type | Description |
|---|---|---|
schedule_map | dict | Maps branch glob patterns to schedule specs |
Each schedule spec is a dict with keys matching Metaflow's @schedule parameters:
| Key | Type | Description |
|---|---|---|
cron | str | Cron expression (e.g., "0 8 * * 1-5") |
daily | bool | Run daily (default if empty spec {} is given) |
weekly | bool | Run weekly |
hourly | bool | Run hourly |
timezone | str | IANA timezone (e.g., "America/New_York") |
Behavior:
- Patterns are matched using fnmatch glob syntax (e.g., release/* matches release/v2)
- First matching pattern wins (dict insertion order), so place more specific patterns first
- If no pattern matches the deployed branch, no schedule is created
- Cannot be combined with an explicit @schedule decorator on the same flow
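The routing rules above fit in a few lines with Python's fnmatch module. resolve_schedule is a hypothetical helper that mirrors the documented behavior: first match wins, unmatched branches get no schedule, and an empty spec means daily. It is a sketch, not the decorator's actual implementation.

```python
from fnmatch import fnmatch
from typing import Any, Optional


def resolve_schedule(
    schedule_map: dict[str, dict[str, Any]], branch: str
) -> Optional[dict[str, Any]]:
    """Return the schedule spec for `branch`, or None if no pattern matches."""
    for pattern, spec in schedule_map.items():  # dicts preserve insertion order
        if fnmatch(branch, pattern):
            # An empty spec {} falls back to a daily schedule.
            return dict(spec) if spec else {"daily": True}
    return None
```

Note that fnmatch's * also matches "/", which is why release/* covers release/v2. The same property means a broad pattern like "*" placed first would shadow every later entry.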
@project_trigger
Subscribe a flow to project events published by other flows via prj.publish_event():
```python
from metaflow import step
from obproject import ProjectFlow, project_trigger


@project_trigger(event="model_trained")
class EvaluationFlow(ProjectFlow):
    @step
    def start(self):
        # Triggered when a "model_trained" event is published on the same project/branch
        self.next(self.end)

    @step
    def end(self):
        pass
```
The decorator resolves the full event name (prj.{project}.{branch}.{event}) from project config, so triggers are automatically scoped to the same branch.
Event Publishing
prj.publish_event()
prj.publish_event(name, payload=None)
Publish an event to trigger flows decorated with @project_trigger.
| Parameter | Type | Description |
|---|---|---|
name | str | Event name (must match the event= in a @project_trigger) |
payload | dict | JSON-serializable payload |
Events are namespaced as prj.{project}.{branch}.{name}, so events published on one branch only trigger flows deployed on the same branch.
self.prj.publish_event("model_trained", payload={"accuracy": 0.95})
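The namespacing rule is plain string formatting. The sketch below uses a hypothetical project_event_name helper to show why the same event name published from two branches reaches two different sets of subscribers. This page does not say whether obproject normalizes the project or branch strings before formatting, so they are used as given.

```python
def project_event_name(project: str, branch: str, name: str) -> str:
    """Build the fully qualified event name prj.{project}.{branch}.{name}."""
    return f"prj.{project}.{branch}.{name}"


main_event = project_event_name("fraud_detection", "main", "model_trained")
feature_event = project_event_name("fraud_detection", "feature_v2", "model_trained")
# Distinct names: a trigger deployed on one branch never fires for the other.
```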
prj.safe_publish_event()
prj.safe_publish_event(name, payload=None)
Same as publish_event() but failures don't raise exceptions.
ProjectEvent
from obproject.project_events import ProjectEvent
ProjectEvent(name, project=None, branch=None).publish(payload=None)
ProjectEvent(name, project=None, branch=None).safe_publish(payload=None)
Publish events from outside a flow - notebooks, deployed apps, scripts. Inside a @step, prefer prj.publish_event() above.
| Parameter | Type | Description |
|---|---|---|
name | str | Event name (must match the event= in a @project_trigger) |
project | str | Project name. If None, reads OB_PROJECT env or current.project_name. |
branch | str | Branch name. If None, reads OB_BRANCH env or extracts from current.branch_name. |
payload | dict | JSON-serializable payload |
ProjectEvent("model_trained", project="my_prj", branch="main").safe_publish(
payload={"accuracy": 0.95}
)
Events resolve to prj.{project}.{branch}.{name}, the same namespace consumed by @project_trigger. The equivalent low-level Metaflow call is ArgoEvent("prj.my_prj.main.model_trained").safe_publish(payload=...).
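The fallback order for project and branch can be sketched as a small pure function. It reads OB_PROJECT and OB_BRANCH from a mapping (defaulting to os.environ). It takes the Metaflow-derived values as optional arguments, since current.project_name and current.branch_name only exist inside a run. The helper and its run_project/run_branch arguments are hypothetical. The sketch does not reproduce how ProjectEvent extracts a branch from current.branch_name.

```python
import os
from typing import Mapping, Optional


def resolve_event_target(
    name: str,
    project: Optional[str] = None,
    branch: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    run_project: Optional[str] = None,
    run_branch: Optional[str] = None,
) -> str:
    """Resolve project/branch using ProjectEvent's documented fallbacks, then format.

    Order: explicit argument, then OB_PROJECT / OB_BRANCH, then run context.
    """
    env = os.environ if env is None else env
    project = project or env.get("OB_PROJECT") or run_project
    branch = branch or env.get("OB_BRANCH") or run_branch
    if not project or not branch:
        raise ValueError("project and branch could not be resolved")
    return f"prj.{project}.{branch}.{name}"
```

Passing project and branch explicitly, as in the example above, sidesteps the fallbacks entirely. That is usually the safest choice in deployed apps where the environment may not be set.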
Asset Promotion
promote_assets()
from obproject.assets import promote_assets
promote_assets(project, source, target, alias="candidate")
Promote assets from one branch to another by copying metadata pointers (the underlying data is not duplicated). Each promoted instance gets an alias on the target branch for stable referencing.
| Parameter | Type | Default | Description |
|---|---|---|---|
project | str | - | Project name |
source | str | - | Source branch name |
target | str | - | Target branch name |
kinds | list | ["data", "models"] | Asset types to promote |
asset | str | None | Specific asset name, or all if omitted |
instance | str | "latest" | Instance to promote ("latest", ID, or "@alias") |
alias | str | "candidate" | Alias to set on the promoted instance. Must be in the allowed list. Set to None to skip. |
with_aliases | bool | False | Copy existing aliases from source branch |
Returns: {"promoted": [...], "errors": [...]}
Promotion aliases
Promoted instances are tagged with aliases that represent lifecycle stages:
| Alias | Meaning | Typical setter |
|---|---|---|
@candidate | Promoted from a branch, ready for evaluation | promote_assets() (default) |
@validated | Passed quality gates | Evaluation flow |
@production | Actively consumed by downstream flows/apps | Approval step |
# Feature branch merges - model arrives on main as @candidate
promote_assets('my_project', source='feature-v2', target='main')
# Evaluation flow passes - re-alias to @validated
promote_assets('my_project', source='main', target='main',
asset='classifier', instance='@candidate',
alias='validated')
# Manual approval - promote to @production
promote_assets('my_project', source='main', target='main',
asset='classifier', instance='@validated',
alias='production')
Downstream consumers can then read a specific stage:
model = self.prj.get_model("classifier", instance="@production")
To customize the allowed aliases, add to obproject.toml:
[promotion]
aliases = ["candidate", "validated", "production"] # default
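The alias passed to promote_assets() must be in the allowed list, so a deployment script may want to check it up front. The sketch below reads the list from an already-parsed obproject.toml dict and falls back to the documented default. check_alias is a hypothetical helper. Stripping a leading "@", so that both "validated" and "@validated" are accepted, is an assumption.

```python
from typing import Any, Optional

DEFAULT_ALIASES = ["candidate", "validated", "production"]


def check_alias(alias: Optional[str], config: dict[str, Any]) -> Optional[str]:
    """Validate a promotion alias against [promotion].aliases (hypothetical helper).

    Returns the bare alias name, or None when alias is None (skip aliasing).
    """
    if alias is None:
        return None
    allowed = config.get("promotion", {}).get("aliases", DEFAULT_ALIASES)
    bare = alias.removeprefix("@")
    if bare not in allowed:
        raise ValueError(f"alias {alias!r} not in allowed aliases {allowed}")
    return bare
```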
Promote on merge (CI pattern)
Add a promote job to your GitHub Actions workflow that runs before teardown when a PR is merged:
```yaml
promote:
  if: >
    github.event_name == 'pull_request' &&
    github.event.action == 'closed' &&
    github.event.pull_request.merged == true
  runs-on: ubuntu-latest
  steps:
    # ... setup steps ...
    - name: Promote assets to main
      run: |
        BRANCH=${{ github.head_ref }}
        PROJECT=$(yq .project obproject.toml)
        python -c "
        from obproject.assets import promote_assets
        result = promote_assets('$PROJECT', source='$BRANCH', target='main')
        for p in result['promoted']:
            print(f\"Promoted {p['kind']}/{p['name']} with @{p.get('alias', 'candidate')}\")
        "

teardown:
  needs: promote
  # ... existing teardown job ...
```
This ensures assets are promoted to main with @candidate before the feature branch is torn down.
[dev-assets] and promotion pipelines: [dev-assets] branch = "main" redirects all asset reads to main, which is ideal for consumer flows (dashboards, reports). But in a promotion pipeline where a flow trains a model and then evaluates it on the same branch, reads need to come from the branch that just wrote the asset. Either omit [dev-assets] in promotion projects, or use a try/except fallback to read from the write branch when the asset doesn't exist on main yet.
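The try/except fallback mentioned above could look like the sketch below. read_with_fallback is a hypothetical helper that takes a getter callable, so it can wrap whatever client call reads an asset from a given branch. This page does not document which exception is raised for a missing asset. The caught exception types are therefore a parameter; set it to whatever your client actually raises.

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def read_with_fallback(
    read_from: Callable[[str, str], T],
    name: str,
    read_branch: str,
    write_branch: str,
    missing: Tuple[Type[BaseException], ...] = (LookupError,),
) -> T:
    """Read `name` from read_branch, falling back to write_branch if it is missing.

    `read_from(branch, name)` is caller-supplied (hypothetical); `missing` must
    list the exception type(s) your client raises for an absent asset.
    """
    try:
        return read_from(read_branch, name)
    except missing:
        if write_branch == read_branch:
            raise
        return read_from(write_branch, name)
```

Catching only the "missing asset" exceptions, rather than everything, keeps real failures such as auth or network errors from being masked by the fallback.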
Evaluation Logging
prj.evals.log()
prj.evals.log(message)
Log structured evaluation data with project/branch/run metadata.
| Parameter | Type | Description |
|---|---|---|
message | dict or str | Evaluation data |
self.prj.evals.log({
"model": "fraud_classifier",
"accuracy": 0.95,
"test_samples": 1000
})
The logged output includes a fixed ("magic") prefix that the monitoring system uses to recognize and ingest evaluation records.
obproject-deploy
The obproject-deploy CLI deploys flows, apps, and assets from a project directory. It is distributed via pip install ob-project-utils.
CLI flags
obproject-deploy [--project NAME] [--all] [--skip-apps] [--skip-flows] [--skip-assets]
| Flag | Description |
|---|---|
--project NAME | Deploy only the specified project from obproject_multi.toml |
--all | Deploy all projects in obproject_multi.toml (default if no --project) |
--skip-apps | Skip all app/endpoint deployments |
--skip-flows | Skip all flow deployments |
--skip-assets | Skip all asset registration |
obproject_deploy.toml
Place an obproject_deploy.toml file in any deployments/<app>/ or flows/<flow>/ directory to control which branches deploy that component:
[deploy]
branches = ["main", "release/*"]
| Key | Type | Default | Description |
|---|---|---|---|
branches | list[str] | Deploy on all branches | Glob patterns for allowed branches |
Behavior:
- When no obproject_deploy.toml exists, the component deploys on all branches (backward compatible)
- On non-main branches, an info message suggests adding the file
- When the current branch doesn't match any pattern, the component is skipped:
⏭️ Skipping app 'my-dashboard' (branch 'feature_foo' not in ['main', 'release/*'])
Add obproject_deploy.toml with branches = ["main"] to each app in deployments/ to prevent app proliferation on feature branches. See Project lifecycle for the full guide.
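The branch gate reduces to a glob check with a permissive default. In this sketch, the branches list comes from an already-parsed obproject_deploy.toml, or is None when the file is absent. should_deploy and skip_message are hypothetical names. Matching with fnmatch is an assumption carried over from the scheduling decorator, since this page does not name the glob implementation used by obproject-deploy.

```python
from fnmatch import fnmatch
from typing import Optional, Sequence


def should_deploy(branch: str, branches: Optional[Sequence[str]]) -> bool:
    """Decide whether a component deploys on `branch`.

    branches=None models a missing obproject_deploy.toml: deploy everywhere.
    """
    if branches is None:
        return True
    return any(fnmatch(branch, pattern) for pattern in branches)


def skip_message(kind: str, component: str, branch: str, branches: Sequence[str]) -> str:
    # Reproduces the format of the skip line shown above.
    return f"⏭️ Skipping {kind} '{component}' (branch '{branch}' not in {list(branches)})"
```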
Deployment auto-tags ([deploy.tags] in obproject.toml)
Starting with ob-project-utils==0.2.35, every flow deployed by obproject-deploy is automatically tagged with lineage information sourced from the CI environment. Tags are attached to the Argo workflow template and propagate to every run produced by that template.
| Tag key | Source | When emitted |
|---|---|---|
commit-hash:<sha> | Source SHA (from CI env or git rev-parse HEAD) | Always, when a SHA is resolvable |
merge-commit-hash:<sha> | CI-synthesized merge commit (e.g. GitHub PR merge) | Only when distinct from source |
obproject-deploy-gh-action-run:<id> | GITHUB_RUN_ID | GitHub Actions deploys |
obproject-deploy-circleci-run:<id> | CIRCLE_BUILD_NUM | CircleCI deploys |
obproject-deploy-azure-pipeline-run:<id> | BUILD_BUILDID | Azure DevOps deploys |
obproject-deploy-gitlab-pipeline-run:<id> | CI_PIPELINE_ID | GitLab CI deploys |
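The tag set in the table can be derived from the environment roughly as follows. lineage_tags is a hypothetical helper written from the table alone. It checks the listed CI variables in a fixed order, and it takes already-resolved SHAs as arguments rather than shelling out to git rev-parse HEAD. The real resolution logic in obproject-deploy may differ.

```python
from typing import Mapping, Optional

# (environment variable, tag prefix) pairs taken from the table above.
CI_RUN_TAGS = [
    ("GITHUB_RUN_ID", "obproject-deploy-gh-action-run"),
    ("CIRCLE_BUILD_NUM", "obproject-deploy-circleci-run"),
    ("BUILD_BUILDID", "obproject-deploy-azure-pipeline-run"),
    ("CI_PIPELINE_ID", "obproject-deploy-gitlab-pipeline-run"),
]


def lineage_tags(
    env: Mapping[str, str],
    source_sha: Optional[str],
    merge_sha: Optional[str] = None,
) -> list[str]:
    """Build deployment lineage tags from CI environment variables and commit SHAs."""
    tags: list[str] = []
    if source_sha:
        tags.append(f"commit-hash:{source_sha}")
    # The merge commit is only tagged when it differs from the source commit.
    if merge_sha and merge_sha != source_sha:
        tags.append(f"merge-commit-hash:{merge_sha}")
    for var, prefix in CI_RUN_TAGS:
        run_id = env.get(var)
        if run_id:
            tags.append(f"{prefix}:{run_id}")
    return tags
```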
The behavior is on by default. To disable, add to obproject.toml:
[deploy.tags]
auto = false
| Section | Key | Default | Description |
|---|---|---|---|
[deploy.tags] | auto | true | Inject lineage tags into deployed workflow templates |
Verify tags landed on a deployed run:
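```python
from metaflow import Flow, namespace

namespace(None)  # search all namespaces so production-token runs are visible
# Flow() takes the flow class name; narrow to the deployed project/branch by tag
run = next(Flow('<FlowName>').runs('project:<project>', 'project_branch:<branch>'))
print(run.tags)  # expect commit-hash:<sha> and a provider-named run-id tag
```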
outerbounds flowproject
The outerbounds flowproject subcommands manage deployed project resources - workflow templates, assets, apps, and metadata. These are the same primitives that obproject-deploy creates during CI/CD.
These commands require a configured Metaflow profile with access to the Outerbounds API. They read credentials from your ~/.metaflowconfig directory.
Common options
All outerbounds flowproject subcommands accept:
| Option | Default | Description |
|---|---|---|
-d, --config-dir | ~/.metaflowconfig | Path to Metaflow configuration directory |
-p, --profile | $METAFLOW_PROFILE | Named Metaflow profile to use |
Identifying a project branch
Several commands require --id in the format project/branch:
outerbounds flowproject list-templates --id my_project/main
outerbounds flowproject teardown-branch --id my_project/feature-v2
Branch names are normalized to match how obproject-deploy stores them: - and / characters are replaced with _, and the result is lowercased. So --id my_project/feature-v2 resolves to branch feature_v2.
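When scripting against these commands, you can reproduce the normalization rule to predict which branch an --id resolves to. normalize_branch and parse_id are hypothetical helpers. Two behaviors are assumptions, not documented: splitting the --id on the first / (so a branch such as release/v2 keeps its slash until normalization), and leaving the project part unchanged.

```python
def normalize_branch(branch: str) -> str:
    """Replace '-' and '/' with '_' and lowercase, as described above."""
    return branch.replace("-", "_").replace("/", "_").lower()


def parse_id(project_branch: str) -> tuple[str, str]:
    """Split a project/branch --id value and normalize the branch part.

    Assumption: the first '/' separates project from branch.
    """
    project, sep, branch = project_branch.partition("/")
    if not sep or not project or not branch:
        raise ValueError(f"expected project/branch, got {project_branch!r}")
    return project, normalize_branch(branch)
```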
get-metadata
Fetch the latest flowproject metadata for a project/branch.
outerbounds flowproject get-metadata --id <project/branch>
Returns the JSON metadata document that obproject-deploy registered, including workflow definitions, asset references, and app configurations.
# View metadata for production branch
outerbounds flowproject get-metadata --id fraud_detection/main
# Pretty-print with jq
outerbounds flowproject get-metadata --id fraud_detection/main | jq .
set-metadata
Register or update flowproject metadata for a project/branch.
outerbounds flowproject set-metadata '<json_string>'
| Argument | Description |
|---|---|
json_str | JSON string containing the flowproject metadata payload |
outerbounds flowproject set-metadata '{"project": "fraud_detection", "branch": "main", "workflows": [...]}'
This is a low-level command used by deployment tooling. Prefer obproject-deploy for standard deployments.
list-templates
List Argo workflow templates deployed for a project/branch.
outerbounds flowproject list-templates --id <project/branch> [-o json]
| Option | Description |
|---|---|
--id | Required. project/branch identifier |
-o, --output | Output format: json or human-readable (default) |
Templates are discovered by querying Argo directly and matching on metaflow/project_name and metaflow/branch_name annotations.
# Human-readable output
outerbounds flowproject list-templates --id fraud_detection/main
# Machine-readable
outerbounds flowproject list-templates --id fraud_detection/main -o json
# → {"templates": ["frauddetection.prod.trainflow", "frauddetection.prod.scoreflow"]}
delete-metadata
Delete all flowproject metadata for a project/branch.
outerbounds flowproject delete-metadata --id <project/branch> [--yes]
| Option | Description |
|---|---|
--id | Required. project/branch identifier |
--yes | Skip confirmation prompt |
-o, --output | Output format: json or human-readable (default) |
outerbounds flowproject delete-metadata --id fraud_detection/feature-v2 --yes
This removes the metadata record only. It does not delete workflow templates, assets, or apps. Use teardown-branch to remove all resources.
teardown-branch
Delete all deployed resources for a project/branch in a single operation.
outerbounds flowproject teardown-branch --id <project/branch> [--dry-run] [--yes] [-o json]
| Option | Description |
|---|---|
--id | Required. project/branch identifier |
--dry-run | Discover and list resources without deleting anything |
--yes | Skip confirmation prompt |
-o, --output | Output format: json or human-readable (default) |
Teardown discovers and deletes these resource types in order:
- Workflow templates - Argo templates matching the project/branch annotations. Deleting a template cascades to its associated CronWorkflows and Sensors.
- Data assets - As listed in the flowproject metadata.
- Model assets - As listed in the flowproject metadata.
- Apps - Capsules tagged with the project and branch.
- Flowproject metadata - The metadata record itself.
# Preview what would be deleted
outerbounds flowproject teardown-branch --id fraud_detection/feature-v2 --dry-run
# Execute teardown
outerbounds flowproject teardown-branch --id fraud_detection/feature-v2 --yes
# JSON output for scripting
outerbounds flowproject teardown-branch --id fraud_detection/feature-v2 --yes -o json
See Also
- Asset branch resolution - How read/write branches are determined across deployment contexts
- Project Assets basics - Introduction to assets
- Project Structure - Project file organization
- CI/CD integration - Setting up obproject-deploy with GitHub Actions, GitLab, and more
- Project lifecycle - Understanding what deploy creates and how to tear it down