User guide¶
Tasks¶
A task is a Python function decorated with @wf.job():
@wf.job(cpus=4, time="02:00:00", mem="16G")
def process(run_dir: RunDir = RunDir()) -> str:
    return str(run_dir / "output.txt")
The decorator arguments control scheduler resources. All of them are optional:
@wf.job(
    cpus=4,                  # cores per task
    time="02:00:00",         # walltime limit
    mem="16G",               # memory
    array=True,              # run as a parallel array job
    array_parallelism=10,    # max concurrent array elements
    cache=True,              # Merkle-DAG caching (default)
    version="2",             # bump to invalidate cache
    after=["cleanup"],       # ordering dependency (no data)
    partition="gpu",         # scheduler-agnostic queue name
)
def process() -> str: ...
Parameters¶
Plain parameters¶
Any argument that isn't a RunDir or Result becomes a CLI flag:
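For example, a minimal sketch (the task and parameter names here are illustrative):

@wf.job()
def ingest(source: str, limit: int = 100) -> str:
    # Plain parameters, so reflow exposes them on the command line
    # (e.g. as --source and --limit; the default 100 applies when omitted).
    return f"{source}:{limit}"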
Annotated parameters with Param¶
Add help text, short flags, or local scoping:
@wf.job()
def ingest(
    source: Annotated[str, Param(help="Input file", short="-s")],
    limit: Annotated[int, Param(help="Max rows")] = 100,
) -> str:
    return source
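With these annotations the generated CLI would presumably expose -s/--source and --limit, each carrying its help text (the flag names here are inferred from the parameter names for illustration).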
Local namespacing¶
When two tasks share a parameter name, use namespace="local" to
prefix the flag with the task name:
@wf.job()
def step_a(
    chunk: Annotated[int, Param(namespace="local")] = 256,
) -> str: ...

@wf.job()
def step_b(
    chunk: Annotated[int, Param(namespace="local")] = 512,
) -> str: ...
This gives you --step-a-chunk and --step-b-chunk on the CLI.
Literal choices¶
Literal annotations become argparse choices:
@wf.job()
def train(
    backend: Annotated[Literal["cpu", "gpu", "tpu"], Param(help="Device")],
) -> str:
    return backend
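On the command line, a value outside the literal set (say tpu2, assuming the flag follows the parameter name as --backend) is rejected by argparse with its standard invalid-choice error.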
RunDir¶
Parameters typed as RunDir receive the shared working directory as a
pathlib.Path. They never appear on the CLI:
@wf.job()
def process(run_dir: RunDir = RunDir()) -> str:
    output = run_dir / "results"
    output.mkdir(exist_ok=True)
    return str(output / "data.csv")
Data wiring with Result¶
Result tells reflow that a parameter receives output from an upstream
task. The dependency is created automatically:
@wf.job()
def step_a() -> str:
    return "hello"

@wf.job()
def step_b(msg: Annotated[str, Result(step="step_a")]) -> str:
    return msg.upper()
Wire modes¶
Reflow looks at the types and figures out how data should flow:
| Upstream returns | Downstream takes | What happens |
|---|---|---|
| str | str | Direct — value passed as-is |
| list[str] | str (array job) | Fan-out — one element per job |
| str (array job) | list[str] | Gather — all outputs collected |
| str (array job) | str (array job) | Chain — 1:1 index mapping |
Fan-out¶
The most common pattern: a regular (non-array) task returns a list, and an array job processes one element at a time:
@wf.job()
def find_files() -> list[str]:
    return ["a.txt", "b.txt", "c.txt"]

@wf.job(array=True)
def process(path: Annotated[str, Result(step="find_files")]) -> str:
    return path.upper()
This creates three parallel process jobs.
Gather¶
The reverse — collect all array outputs into one task:
@wf.job()
def report(
    results: Annotated[list[str], Result(step="process")],
) -> str:
    return f"processed {len(results)} files"
Chain¶
Two array jobs with matching types chain index-by-index:
@wf.job(array=True)
def step_1(item: Annotated[str, Result(step="find_files")]) -> str:
    return item.lower()

@wf.job(array=True)
def step_2(item: Annotated[str, Result(step="step_1")]) -> str:
    return item.strip()
Broadcast¶
Sometimes you want to pass a value to every element of an array job
without splitting it. Mark the upstream Result with broadcast=True:
@wf.job()
def load_config() -> dict:
    return {"threshold": 0.5, "method": "linear"}

@wf.job()
def find_files() -> list[str]:
    return ["a.txt", "b.txt", "c.txt"]

@wf.job(array=True)
def process(
    path: Annotated[str, Result(step="find_files")],
    config: Annotated[dict, Result(step="load_config", broadcast=True)],
) -> str:
    # Every array element gets the full config dict
    return f"{path}:{config['method']}"
Without broadcast=True, reflow would try to fan out the dict and
fail. The broadcast flag says "pass this whole value to every
element."
Type-inferred broadcast works too — if upstream returns list[str]
and the downstream array parameter is also list[str], reflow
broadcasts instead of fanning out:
@wf.job()
def all_labels() -> list[str]:
    return ["cat", "dog", "bird"]

@wf.job(array=True)
def classify(
    image: Annotated[str, Result(step="find_images")],
    labels: Annotated[list[str], Result(step="all_labels")],
) -> str:
    # 'labels' is the full list in every array element
    return f"classified {image} into {labels}"
Combining multiple upstreams¶
Use steps=[...] to concatenate outputs from several tasks:
@wf.job(array=True)
def process(
    item: Annotated[str, Result(steps=["source_a", "source_b"])],
) -> str:
    return item
Ordering without data¶
When you need "run B after A" but no data transfer:
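A minimal sketch using the after= option from the decorator reference above (the task names are illustrative):

@wf.job()
def step_a() -> str:
    return "done"

@wf.job(after=["step_a"])
def step_b() -> str:
    # Waits for step_a to finish but receives none of its output.
    return "follow-up"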
Typo protection¶
If you misspell a task name, reflow suggests the closest match:
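A sketch of what that looks like (the exact error text will vary):

@wf.job(array=True)
def process(path: Annotated[str, Result(step="find_fles")]) -> str:  # typo
    return path

# reflow rejects the unknown step name "find_fles" and suggests the
# closest existing task name, "find_files".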
Reusable flows¶
A Flow is a group of task definitions without execution machinery.
Build libraries of tested components and compose them:
from typing import Annotated

from reflow import Flow, Result, Workflow

# Define a reusable flow
ingestion = Flow("ingest")

@ingestion.job()
def download() -> list[str]:
    return ["file_1", "file_2"]

@ingestion.job(array=True)
def validate(item: Annotated[str, Result(step="download")]) -> str:
    return item

# Compose into a workflow
wf = Workflow("pipeline")
wf.include(ingestion, prefix="raw")
# Tasks become "raw_download" and "raw_validate"
# Internal Result references are rewritten automatically
You can include multiple flows, each with its own prefix.
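For instance (a sketch; the second flow is made up for illustration):

analysis = Flow("analysis")

@analysis.job()
def summarize() -> str:
    return "summary"

wf.include(analysis, prefix="stats")
# The workflow now also contains "stats_summarize" alongside the raw_* tasks.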
Local execution¶
For development and testing, run the full pipeline without a scheduler:
run_local executes tasks in topological order in-process. It supports
the same caching, force, and verify options as submit:
run = wf.run_local(
    run_dir="/tmp/test",
    source="data.csv",       # same parameters as submit
    max_workers=4,           # parallelize array jobs across processes
    force=True,              # skip cache
    on_error="continue",     # don't stop on first failure
)
With on_error="continue", failed tasks are recorded but the run
keeps going — downstream tasks that depend on failures are skipped.
Array parallelism¶
Limit how many array elements run at once:
@wf.job(array=True, array_parallelism=20)
def process(item: Annotated[str, Result(step="prepare")]) -> str:
    return item
This submits all elements but tells the scheduler to run at most 20 concurrently.