User guide¶
This page covers the core abstractions: tasks, parameters, data wiring, array jobs, and reusable flows.
Tasks¶
A task is a Python function decorated with @wf.job(). Decorator
arguments control scheduler resources and execution behaviour:
@wf.job(
    cpus=4,                # cores per task
    time="02:00:00",       # walltime
    mem="16G",             # memory
    array=True,            # run as an array job
    array_parallelism=10,  # max concurrent array elements
    cache=True,            # Merkle-DAG caching (default: True)
    version="2",           # bump to invalidate cache
    after=["cleanup"],     # explicit ordering dependency
    partition="gpu",       # scheduler-agnostic: works on all backends
)
def my_task(...) -> ...:
    ...
There are two task kinds:
- Singleton (@wf.job()) - Produces a single task instance.
- Array (@wf.job(array=True)) - Produces one instance per input element, submitted as a scheduler array job.
Parameters¶
Plain parameters¶
Any function parameter that is not a RunDir or Result becomes a
CLI flag automatically.
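As a minimal sketch of this: the task and parameter names below are illustrative, and `_job` is a stand-in for `@wf.job()` so the snippet runs without reflow installed.

```python
# Stand-in for @wf.job() so this sketch is self-contained;
# with reflow you would decorate with @wf.job() directly.
def _job(**options):
    def wrap(fn):
        fn.options = options
        return fn
    return wrap

@_job()
def resample(resolution: str = "0.25deg", workers: int = 4) -> str:
    # Neither parameter is a RunDir or Result, so reflow would expose
    # them as the CLI flags --resolution and --workers.
    return f"{resolution} x {workers}"
```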
Rich metadata with Param¶
Use Param(...) inside Annotated[...] for help text, short flags,
or local scoping:
from typing import Annotated

from reflow import Param

@wf.job()
def ingest(
    source: Annotated[str, Param(help="Input file", short="-s")],
    limit: Annotated[int, Param(help="Maximum rows")] = 100,
) -> str:
    return source
Global vs local parameters¶
By default, parameters are global — one shared CLI flag. Set
namespace="local" to prefix the flag with the task name:
@wf.job()
def convert(
    chunk: Annotated[int, Param(help="Chunk size", namespace="local")] = 256,
) -> str:
    return str(chunk)
This becomes --convert-chunk on the CLI, avoiding collisions when
multiple tasks share a parameter name.
Literal choices¶
Literal[...] annotations become argparse choices:
from typing import Literal

@wf.job()
def prepare(
    model: Annotated[Literal["era5", "icon"], Param(help="Model")],
) -> str:
    return model
RunDir¶
Parameters typed as RunDir (or simply named run_dir) are injected
with a pathlib.Path pointing to the shared working directory. They
never appear on the CLI.
from reflow import RunDir

@wf.job()
def prepare(run_dir: RunDir = RunDir()) -> str:
    return str(run_dir / "output")
Data wiring with Result¶
Result declares that a parameter receives output from an upstream
task. The upstream automatically becomes a scheduling dependency.
from reflow import Result

@wf.job()
def prepare() -> str:
    return "prepared"

@wf.job()
def publish(item: Annotated[str, Result(step="prepare")]) -> str:
    return item.upper()
Multi-step input¶
Combine outputs from several upstream tasks with steps=[...]:
@wf.job()
def merge(
    item: Annotated[str, Result(steps=["prepare_a", "prepare_b"])],
) -> str:
    return item
Ordering without data¶
When you need scheduling order but no data transfer, use after=[...].
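A sketch of the pattern (the task names are illustrative, and `_job` stands in for `@wf.job()` so the snippet runs standalone):

```python
# Stand-in for @wf.job(); with reflow you would write
# @wf.job(after=["cleanup"]) directly.
def _job(**options):
    def wrap(fn):
        fn.options = options
        return fn
    return wrap

@_job(after=["cleanup"])
def report() -> str:
    # Scheduled strictly after "cleanup", but receives none of its output.
    return "report written"
```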
Wire modes¶
Reflow inspects the upstream return type and the downstream parameter type to decide how data flows between tasks:
| Upstream return | Downstream param | Mode |
|---|---|---|
| T (singleton) | T (singleton) | Direct — value passed as-is |
| list[T] (singleton) | T (array) | Fan-out — one element per array slot |
| T (array) | list[T] (singleton) | Gather — collect all into a list |
| list[T] (array) | list[T] (singleton) | Gather + flatten |
| T (array) | T (array) | Chain — index-wise 1:1 mapping |
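The modes in the table reduce to familiar list operations. A plain-Python sketch of the data movement (no reflow involved; the file names are illustrative):

```python
# Fan-out: a singleton's list feeds one array element per slot.
files = ["a.nc", "b.nc", "c.nc"]       # upstream returns list[str]
slots = [f for f in files]             # one downstream instance per element

# Gather: array outputs are collected into a single list.
converted = [f.replace(".nc", ".zarr") for f in slots]
gathered = list(converted)             # downstream sees list[str]

# Gather + flatten: each array element returned a list[T]; results merge.
per_slot = [[f, f + ".idx"] for f in converted]
flattened = [x for sub in per_slot for x in sub]

# Chain: two arrays of equal length map index-wise, slot i to slot i.
chained = [c.upper() for c in converted]
```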
Fan-out¶
The most common pattern: a singleton returns a list, and the downstream array task processes one element at a time.
@wf.job()
def prepare() -> list[str]:
    return ["a.nc", "b.nc", "c.nc"]

@wf.job(array=True)
def convert(item: Annotated[str, Result(step="prepare")]) -> str:
    return item.replace(".nc", ".zarr")
This creates three convert instances.
Gather¶
The reverse: collect all array outputs into a single downstream task.
@wf.job()
def publish(paths: Annotated[list[str], Result(step="convert")]) -> str:
    return f"{len(paths)} done"
Chain¶
Two array tasks with matching element types chain index-wise:
@wf.job(array=True)
def preprocess(item: Annotated[str, Result(step="prepare")]) -> str:
    return item

@wf.job(array=True)
def convert(item: Annotated[str, Result(step="preprocess")]) -> str:
    return item
Limiting array parallelism¶
Cap how many array elements run concurrently with array_parallelism:
@wf.job(array=True, array_parallelism=20, time="01:00:00", mem="8G")
def convert(item: Annotated[str, Result(step="prepare")]) -> str:
    return item
Reusable flows¶
A Flow is a reusable group of task definitions without execution
machinery. Build libraries of flows and compose them into workflows:
from reflow import Flow, Workflow

preprocessing = Flow("preprocess")

@preprocessing.job()
def download(...) -> list[str]: ...

@preprocessing.job(array=True)
def convert(item: Annotated[str, Result(step="download")]) -> str: ...

# Compose into a workflow:
wf = Workflow("experiment")
wf.include(preprocessing, prefix="pre")
# Tasks become "pre.download", "pre.convert"
# Internal Result references are rewritten automatically.
You can include multiple flows, each with its own prefix, building complex pipelines from tested, reusable components.
Typed manifest serialisation¶
The manifest layer uses a built-in typed serialiser so values such as
Path, datetime, UUID, enums, tuples, and dataclasses can
round-trip cleanly through JSON storage.
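For illustration only: reflow's serialiser is internal, but the kind of typed round-trip the manifest layer performs can be sketched with the standard library, using a tagged-JSON convention of our own.

```python
import json
from datetime import datetime
from pathlib import Path
from uuid import UUID

def encode(value):
    # Tag non-JSON-native values with their type so they can be restored.
    if isinstance(value, Path):
        return {"$type": "path", "value": str(value)}
    if isinstance(value, datetime):
        return {"$type": "datetime", "value": value.isoformat()}
    if isinstance(value, UUID):
        return {"$type": "uuid", "value": str(value)}
    raise TypeError(f"unsupported type: {type(value)!r}")

def decode(obj):
    # Invert the tagging applied by encode().
    decoders = {"path": Path, "datetime": datetime.fromisoformat, "uuid": UUID}
    if "$type" in obj:
        return decoders[obj["$type"]](obj["value"])
    return obj

payload = {"run_dir": Path("/scratch/exp1"),
           "started": datetime(2024, 5, 1, 12, 0)}
text = json.dumps(payload, default=encode)          # typed -> JSON string
restored = json.loads(text, object_hook=decode)     # JSON string -> typed
```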