Python reference¶
Main imports¶
from reflow import (
    Workflow,
    Flow,
    Run,
    Param,
    Result,
    RunDir,
    # Executors
    SlurmExecutor,
    PBSExecutor,
    LSFExecutor,
    SGEExecutor,
    FluxExecutor,
    LocalExecutor,
    # Low-level
    Executor,
    JobResources,
    Config,
    ManifestCodec,
)
Workflow¶
The main entry point. Validates the graph, submits work, dispatches tasks, and exposes status / cancel / retry.
Key methods:
- wf.job(...): register a singleton task
- wf.job(array=True, ...): register an array task
- wf.include(flow, prefix=...): compose a reusable flow
- wf.validate(): check the graph (called automatically on submit)
- wf.cli(argv=...): run the auto-generated CLI
- wf.submit(run_dir=..., ...): create and dispatch a run
- wf.submit_run(run_dir, parameters, ...): lower-level submission (used by CLI)
- wf.dispatch(run_id, store, run_dir, ...): continue dispatching
- wf.worker(run_id, store, run_dir, task_name, ...): execute one task instance
- wf.cancel_run(run_id, store, ...): cancel active jobs
- wf.retry_failed(run_id, store, run_dir, ...): resubmit failed tasks
- wf.run_status(run_id, store): get run state
- wf.describe(): JSON workflow manifest
- wf.describe_typed(): typed WorkflowDescription object
Submission API¶
run = wf.submit(
    run_dir="/scratch/run-001",
    executor="pbs",           # or "lsf", "sge", "flux", "local"
    force=False,              # set to True to skip the cache entirely
    force_tasks=["prepare"],  # skip cache for specific tasks
    verify=True,              # verify cached Path outputs
    store=my_store,           # optional custom store
    start="2026-01-01",       # task parameters as kwargs
)
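The interaction between force and force_tasks can be read as a simple per-task rule. The following is a minimal sketch of that rule as the comments above describe it, not reflow's actual implementation: uses_cache is a hypothetical helper and "convert" is a made-up task name.

```python
from typing import Iterable


def uses_cache(task_name: str, *, force: bool = False,
               force_tasks: Iterable[str] = ()) -> bool:
    """Return True if a cached result may be reused for ``task_name``.

    Hypothetical helper: it only mirrors the documented meaning of the
    ``force`` and ``force_tasks`` arguments, not reflow's internals.
    """
    if force:
        # force=True bypasses the cache for every task.
        return False
    # force_tasks bypasses the cache only for the named tasks.
    return task_name not in set(force_tasks)


for name in ("prepare", "convert"):
    print(name, uses_cache(name, force_tasks=["prepare"]))
```

Under this reading, only "prepare" is recomputed while every other task may still reuse its cached result.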
Run handle¶
The Run object is returned by wf.submit() and provides a
user-facing control surface:
run.status() # dict with run state + per-task breakdown
run.cancel() # cancel all active jobs
run.retry() # resubmit failed/cancelled tasks
Attributes: run.run_id, run.run_dir, run.store, run.workflow.
Flow¶
A reusable task collection without execution machinery:
flow = Flow("shared-conversion")

@flow.job()
def step_a(...) -> ...: ...

wf.include(flow, prefix="cmip6")
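To make the register-then-include pattern concrete without depending on reflow, here is a small stand-in sketch. MiniFlow and MiniWorkflow are hypothetical classes written for illustration only, and joining the prefix to the task name with an underscore is an assumption; reflow may name included tasks differently.

```python
from typing import Callable, Dict


class MiniFlow:
    """Stand-in for a task collection; NOT reflow's Flow class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: Dict[str, Callable] = {}

    def job(self) -> Callable[[Callable], Callable]:
        # Decorator factory: registers the function under its own name.
        def register(func: Callable) -> Callable:
            self.tasks[func.__name__] = func
            return func
        return register


class MiniWorkflow(MiniFlow):
    """Stand-in workflow that can absorb tasks from a MiniFlow."""

    def include(self, flow: MiniFlow, prefix: str) -> None:
        # Assumption: prefixed names are joined with an underscore.
        for task_name, func in flow.tasks.items():
            self.tasks[f"{prefix}_{task_name}"] = func


flow = MiniFlow("shared-conversion")


@flow.job()
def step_a(x: int) -> int:
    return x + 1


wf = MiniWorkflow("demo")
wf.include(flow, prefix="cmip6")
print(sorted(wf.tasks))
```

The point of the pattern is that the flow only collects task definitions, while the workflow that includes it decides how they are named and run.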
Store API¶
Most users do not need to instantiate a store, but you can:
from reflow.stores.sqlite import SqliteStore
# Use the shared default database
store = SqliteStore.default(wf.config)
store.init()
# Or for an isolated test
store = SqliteStore.for_run_dir("/tmp/reflow-test")
Helper functions¶
Public helpers in reflow.executors.helpers:
- default_executor(config): return the right executor for the configured mode
- resolve_executor(executor): resolve a shorthand string ("pbs", "lsf", …) to an instance
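One common way to resolve a shorthand string to an instance is a name-to-class registry. The sketch below illustrates that idea with stand-in classes; the registry contents, the pass-through of existing instances, and the error type are all assumptions and do not describe how resolve_executor is actually written.

```python
from typing import Dict, Type, Union


class StubExecutor:
    """Stand-in base class; NOT reflow's Executor."""


class StubPBSExecutor(StubExecutor):
    pass


class StubLocalExecutor(StubExecutor):
    pass


# Hypothetical registry; the real set of shorthands may differ.
_REGISTRY: Dict[str, Type[StubExecutor]] = {
    "pbs": StubPBSExecutor,
    "local": StubLocalExecutor,
}


def resolve_executor_sketch(
    executor: Union[str, StubExecutor],
) -> StubExecutor:
    # An existing instance passes through unchanged (assumed behaviour).
    if isinstance(executor, StubExecutor):
        return executor
    try:
        return _REGISTRY[executor.lower()]()
    except KeyError:
        known = ", ".join(sorted(_REGISTRY))
        raise ValueError(
            f"unknown executor {executor!r}; expected one of: {known}"
        ) from None


print(type(resolve_executor_sketch("pbs")).__name__)
```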
Public helpers in reflow.workflow:
- make_run_id(workflow_name): generate a <name>-<date>-<hex> run ID
- build_kwargs(spec, run_parameters, task_input): build function kwargs for a worker
- resolve_index(explicit): resolve array index from env vars
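As a rough illustration of what two of these helpers do, the sketch below builds an ID in the <name>-<date>-<hex> shape and looks up an array index from scheduler environment variables. Both functions are hypothetical stand-ins: the date format, the hex length, and the list of variables consulted are assumptions, although the variable names themselves are the ones the respective schedulers set for array jobs.

```python
import os
import secrets
from datetime import datetime, timezone
from typing import Mapping, Optional

# Array-index variables set by common schedulers. Which of these reflow
# actually reads is an assumption.
ARRAY_INDEX_VARS = (
    "SLURM_ARRAY_TASK_ID",  # Slurm
    "PBS_ARRAY_INDEX",      # PBS Professional
    "LSB_JOBINDEX",         # LSF
    "SGE_TASK_ID",          # SGE / Grid Engine
)


def make_run_id_sketch(workflow_name: str) -> str:
    # <name>-<date>-<hex>; date format and hex length are assumptions.
    date = datetime.now(timezone.utc).strftime("%Y%m%d")
    return f"{workflow_name}-{date}-{secrets.token_hex(4)}"


def resolve_index_sketch(
    explicit: Optional[int] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[int]:
    # An explicit index always wins over the environment.
    if explicit is not None:
        return explicit
    env = os.environ if env is None else env
    for var in ARRAY_INDEX_VARS:
        value = env.get(var, "")
        # Skip unset or non-numeric values such as "undefined".
        if value.isdigit():
            return int(value)
    return None


print(make_run_id_sketch("demo"))
print(resolve_index_sketch(env={"SLURM_ARRAY_TASK_ID": "7"}))
```

Passing the environment as a mapping keeps the lookup easy to test without modifying os.environ.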