Skip to content

Parameters and results

reflow.params.Param

reflow.params.Param(help='', short=None, namespace='global', **extra)

CLI parameter descriptor.

Parameters:

Name Type Description Default
help str

Help text shown in --help.

''
short str or None

Short flag, e.g. "-s".

None
namespace ``"global"`` or ``"local"``

"global" -- one shared CLI flag. "local" -- flag is prefixed with the task name.

'global'

Examples:

>>> start: Annotated[str, Param(help="Start date, ISO-8601")]
>>> chunk: Annotated[int, Param(help="Chunk size", namespace="local")] = 256
Source code in src/reflow/params.py
def __init__(
    self,
    help: str = "",
    short: str | None = None,
    namespace: Literal["global", "local"] = "global",
    **extra: Any,
) -> None:
    self.help: str = help
    self.short: str | None = short
    self.namespace: Literal["global", "local"] = namespace
    self.extra: dict[str, Any] = extra

reflow.params.Result

reflow.params.Result(step=None, steps=None)

Declare a data dependency on one or more upstream tasks.

Parameters:

Name Type Description Default
step str or None

Single upstream task name.

None
steps list[str] or None

Multiple upstream task names (outputs are concatenated).

None

Raises:

Type Description
ValueError

If neither step nor steps is provided, or both are.

Examples:

Single upstream::

nc_file: Annotated[str, Result(step="prepare")]

Multiple upstreams (concatenated)::

item: Annotated[str, Result(steps=["prepare_a", "prepare_b"])]
Source code in src/reflow/params.py
def __init__(
    self,
    step: str | None = None,
    steps: list[str] | None = None,
) -> None:
    if step is not None and steps is not None:
        raise ValueError("Provide either 'step' or 'steps', not both.")
    if step is None and steps is None:
        raise ValueError("Must provide 'step' or 'steps'.")
    self.steps: list[str] = [step] if step is not None else list(steps)  # type: ignore[arg-type]

reflow.params.RunDir

reflow.params.RunDir

Marker type for the run working directory.

A parameter typed as RunDir (or named run_dir) is injected with a pathlib.Path at runtime. It never appears on the CLI.

Examples:

>>> @workflow.job()
... def ingest(work_dir: RunDir, start: str) -> list[str]:
...     output = work_dir / "output"
...     ...