Skip to content

Core workflow objects

reflow.flow.Flow

reflow.flow.Flow(name)

Reusable collection of task definitions.

Register tasks with job and array_job. A flow has no execution machinery -- attach it to a reflow.Workflow via Workflow.include.

Parameters:

Name Type Description Default
name str

Flow name.

required
Source code in src/reflow/flow.py
def __init__(self, name: str) -> None:
    self.name: str = name
    self.tasks: dict[str, TaskSpec] = {}
    self._registration_order: list[str] = []

job(name=None, *, array=False, array_parallelism=None, cpus=1, time='00:30:00', mem='4G', after=None, submit_options=None, extra=None, version='1', cache=True, verify=None, backend=None, **scheduler_options)

Register a task, singleton or array job.

Parameters:

Name Type Description Default
name str or None

Task name. Defaults to the function name.

None
array bool

Set this to True if you want to submit parallel jobs. Setting array=True is equivalent to using the array_job method.

False
array_parallelism int or None

Maximum concurrent array tasks.

None
cpus int

CPUs per task.

1
time str

Wall-clock time limit.

'00:30:00'
mem str

Memory request.

'4G'
after list[str] or None

Explicit ordering dependencies.

None
submit_options dict[str, Any] or None

Scheduler-native submit options, for example partition, account, qos, or queue.

None
extra dict[str, Any] or None

Backward-compatible alias for submit_options.

None
version str

Cache version string. Bump to invalidate cached results when the task logic changes.

'1'
cache bool

Whether to cache results across runs.

True
verify callable or None

Custom output verification function.

None
backend str or None

Optional backend hint for future multi-backend execution.

None
**scheduler_options Any

Additional scheduler-native submit options. These are merged with submit_options and stored verbatim.

{}
Source code in src/reflow/flow.py
def job(
    self,
    name: str | None = None,
    *,
    array: bool = False,
    array_parallelism: int | None = None,
    cpus: int = 1,
    time: str = "00:30:00",
    mem: str = "4G",
    after: list[str] | None = None,
    submit_options: dict[str, Any] | None = None,
    extra: dict[str, Any] | None = None,
    version: str = "1",
    cache: bool = True,
    verify: Callable[[Any], bool] | None = None,
    backend: str | None = None,
    **scheduler_options: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Register a task, singleton or array job.

    Parameters
    ----------
    name : str or None
        Task name. Defaults to the function name.
    array: bool
        Set this to `True` if you want to submit parallel jobs.
        Setting `array=True` is equivalent to using the
        [`array_job`][array_job] method.
    array_parallelism : int or None
        Maximum concurrent array tasks.
    cpus : int
        CPUs per task.
    time : str
        Wall-clock time limit.
    mem : str
        Memory request.
    after : list[str] or None
        Explicit ordering dependencies.
    submit_options : dict[str, Any] or None
        Scheduler-native submit options, for example `partition`,
        `account`, `qos`, or `queue`.
    extra : dict[str, Any] or None
        Backward-compatible alias for ``submit_options``.
    version : str
        Cache version string. Bump to invalidate cached results
        when the task logic changes.
    cache : bool
        Whether to cache results across runs.
    verify : callable or None
        Custom output verification function.
    backend : str or None
        Optional backend hint for future multi-backend execution.
    **scheduler_options : Any
        Additional scheduler-native submit options. These are merged
        with ``submit_options`` and stored verbatim.

    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        task_name = name or func.__name__
        self._register(
            task_name,
            func,
            JobConfig(
                cpus=cpus,
                time=time,
                mem=mem,
                after=list(after or []),
                array=array,
                array_parallelism=array_parallelism,
                submit_options=submit_options,
                extra=extra,
                version=version,
                cache=cache,
                backend=backend,
                **scheduler_options,
            ),
            verify=verify,
        )
        return func

    return decorator

array_job(name=None, *, cpus=1, time='00:30:00', mem='4G', after=None, array_parallelism=None, submit_options=None, extra=None, version='1', cache=True, verify=None, backend=None, **scheduler_options)

Register an array task.

Parameters:

Name Type Description Default
name str or None

Task name. Defaults to the function name.

None
cpus int

CPUs per task.

1
time str

Wall-clock time limit.

'00:30:00'
mem str

Memory request.

'4G'
after list[str] or None

Additional explicit ordering dependencies.

None
array_parallelism int or None

Maximum concurrent array tasks.

None
submit_options dict[str, Any] or None

Scheduler-native submit options, for example partition, account, qos, or queue.

None
extra dict[str, Any] or None

Backward-compatible alias for submit_options.

None
version str

Cache version string.

'1'
cache bool

Whether to cache results across runs.

True
verify callable or None

Custom output verification function.

None
backend str or None

Optional backend hint for future multi-backend execution.

None
**scheduler_options Any

Additional scheduler-native submit options. These are merged with submit_options and stored verbatim.

{}
Source code in src/reflow/flow.py
def array_job(
    self,
    name: str | None = None,
    *,
    cpus: int = 1,
    time: str = "00:30:00",
    mem: str = "4G",
    after: list[str] | None = None,
    array_parallelism: int | None = None,
    submit_options: dict[str, Any] | None = None,
    extra: dict[str, Any] | None = None,
    version: str = "1",
    cache: bool = True,
    verify: Callable[[Any], bool] | None = None,
    backend: str | None = None,
    **scheduler_options: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Register an array task.

    Parameters
    ----------
    name : str or None
        Task name. Defaults to the function name.
    cpus : int
        CPUs per task.
    time : str
        Wall-clock time limit.
    mem : str
        Memory request.
    after : list[str] or None
        Additional explicit ordering dependencies.
    array_parallelism : int or None
        Maximum concurrent array tasks.
    submit_options : dict[str, Any] or None
        Scheduler-native submit options, for example ``partition``,
        ``account``, ``qos``, or ``queue``.
    extra : dict[str, Any] or None
        Backward-compatible alias for ``submit_options``.
    version : str
        Cache version string.
    cache : bool
        Whether to cache results across runs.
    verify : callable or None
        Custom output verification function.
    backend : str or None
        Optional backend hint for future multi-backend execution.
    **scheduler_options : Any
        Additional scheduler-native submit options. These are merged
        with ``submit_options`` and stored verbatim.

    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        task_name = name or func.__name__
        self._register(
            task_name,
            func,
            JobConfig(
                cpus=cpus,
                time=time,
                mem=mem,
                after=list(after or []),
                array=True,
                array_parallelism=array_parallelism,
                submit_options=submit_options,
                extra=extra,
                version=version,
                cache=cache,
                backend=backend,
                **scheduler_options,
            ),
            verify=verify,
        )
        return func

    return decorator

reflow.flow.JobConfig

reflow.flow.JobConfig(cpus=1, time='00:30:00', mem='4G', array=False, after=None, array_parallelism=None, submit_options=None, version='1', cache=True, backend=None, extra=None, **scheduler_options) dataclass

Resource and dependency configuration for a single task.

Source code in src/reflow/flow.py
def __init__(
    self,
    cpus: int = 1,
    time: str = "00:30:00",
    mem: str = "4G",
    array: bool = False,
    after: list[str] | None = None,
    array_parallelism: int | None = None,
    submit_options: dict[str, Any] | None = None,
    version: str = "1",
    cache: bool = True,
    backend: str | None = None,
    extra: dict[str, Any] | None = None,
    **scheduler_options: Any,
) -> None:
    merged = dict(submit_options or {})
    if extra:
        merged.update(extra)
    merged.update(scheduler_options)

    self.cpus = cpus
    self.time = time
    self.mem = mem
    self.array = array
    self.after = list(after or [])
    self.array_parallelism = array_parallelism
    self.submit_options = merged
    self.version = version
    self.cache = cache
    self.backend = backend

extra property

Backward-compatible alias for scheduler-native submit options.

reflow.workflow.Workflow

reflow.workflow.Workflow(name, config=None)

Bases: DispatchMixin, WorkerMixin, Flow

Runnable HPC workflow.

Extends Flow with execution: validation, CLI, submission, dispatch, worker, cancel, retry, status.

Parameters:

Name Type Description Default
name str

Workflow name.

required
config Config or None

User configuration. Loaded from ~/.config/reflow/config.toml when None.

None

Examples:

Direct task registration::

wf = Workflow("pipeline")

@wf.job()
def prepare(...) -> list[str]: ...

wf.cli()

Composing from flows::

conversion = Flow("conversion")
# ... tasks ...

wf = Workflow("experiment")
wf.include(conversion)
run = wf.submit(run_dir="/scratch/run1", start="2025-01-01")
run.status()
Source code in src/reflow/workflow/_core.py
def __init__(self, name: str, config: Config | None = None) -> None:
    """Initialise the workflow and resolve its user configuration.

    When *config* is ``None``, configuration is loaded via
    ``load_config`` (the user's config file).
    """
    super().__init__(name)
    self.config: Config = config or load_config()

include(flow, prefix=None)

Include all tasks from a flow into this workflow.

Parameters:

Name Type Description Default
flow Flow

Reusable flow.

required
prefix str or None

Optional name prefix. Internal Result/after references within the flow are rewritten automatically.

None
Source code in src/reflow/workflow/_core.py
def include(self, flow: Flow, prefix: str | None = None) -> None:
    """Copy every task from *flow* into this workflow.

    Parameters
    ----------
    flow : Flow
        Reusable flow whose tasks are merged in.
    prefix : str or None
        Optional name prefix.  Result/after references internal
        to the flow are rewritten automatically.

    Raises
    ------
    ValueError
        If a task name from the flow is already registered here.

    """
    incoming_tasks, incoming_order = flow._prefixed_tasks(prefix)
    for tname, tspec in incoming_tasks.items():
        if tname in self.tasks:
            raise ValueError(
                f"Cannot include flow {flow.name!r}: task "
                f"{tname!r} already exists."
            )
        self.tasks[tname] = tspec
    self._registration_order.extend(incoming_order)

validate()

Run full graph validation.

Raises:

Type Description
ValueError

Unresolved references or cycles.

TypeError

Incompatible wired types.

Source code in src/reflow/workflow/_core.py
def validate(self) -> None:
    """Run full graph validation.

    Raises
    ------
    ValueError
        Unresolved references or cycles.
    TypeError
        Incompatible wired types.

    """
    # Pass 1: every Result/after reference must name a known task.
    for spec in self.tasks.values():
        for pname, result in spec.result_deps.items():
            for step in result.steps:
                if step in self.tasks:
                    continue
                raise ValueError(
                    f"Task {spec.name!r} param {pname!r} "
                    f"references unknown task {step!r}."
                )
        for dep in spec.config.after:
            if dep in self.tasks:
                continue
            raise ValueError(
                f"Task {spec.name!r} after=[{dep!r}] "
                f"references unknown task {dep!r}."
            )

    # Pass 2: best-effort type compatibility along each wired edge.
    for spec in self.tasks.values():
        try:
            hints = get_type_hints(spec.func, include_extras=True)
        except Exception:
            # Unresolvable annotations are skipped, not fatal.
            continue
        for pname, result in spec.result_deps.items():
            annotation = hints.get(pname)
            if annotation is None:
                continue
            param_type = extract_base_type(annotation)
            for step in result.steps:
                upstream = self.tasks[step]
                if upstream.return_type in (None, inspect.Parameter.empty):
                    continue
                check_type_compatibility(
                    upstream.return_type,
                    upstream.config.array,
                    param_type,
                    spec.config.array,
                    step,
                    spec.name,
                    pname,
                )

    # Cycle detection happens as a side effect of ordering.
    self._topological_order()

cli(argv=None)

Run the argparse CLI.

Source code in src/reflow/workflow/_core.py
def cli(self, argv: Sequence[str] | None = None) -> int:
    """Validate the workflow, then parse *argv* and run the command.

    Returns the command's exit code.
    """
    self.validate()
    # Imported lazily so merely constructing a workflow stays cheap.
    from ..cli import parse_args, run_command

    arg_list = None if argv is None else list(argv)
    parsed = parse_args(self, arg_list)
    return run_command(self, parsed)

submit(run_dir, *, store=None, executor=None, force=False, force_tasks=None, verify=False, **parameters)

Create a new run and return an interactive Run handle.

Parameters:

Name Type Description Default
run_dir str or Path

Shared working directory.

required
store Store or None

Manifest store. Defaults to the shared SqliteStore in the user cache directory.

None
executor Executor, str, or None

Workload-manager executor. "local" is a shorthand for reflow.LocalExecutor.

None
force bool

If True, skip the Merkle cache entirely and run all tasks fresh.

False
force_tasks list[str] or None

Task names to force-run even if cached. Other tasks may still use the cache.

None
verify bool

If True, run output verification (file existence for Path outputs, custom callables) on every cache hit. By default the Merkle identity is trusted on submit; verification runs automatically during retry.

False
**parameters Any

Run parameters injected into task functions by name.

{}

Returns:

Type Description
Run

Raises:

Type Description
TypeError

If required parameters are missing.

Source code in src/reflow/workflow/_core.py
def submit(
    self,
    run_dir: str | Path,
    *,
    store: Store | None = None,
    executor: Executor | str | None = None,
    force: bool = False,
    force_tasks: list[str] | None = None,
    verify: bool = False,
    **parameters: Any,
) -> Run:
    """Create a new run and return an interactive [`Run`][Run] handle.

    Parameters
    ----------
    run_dir : str or Path
        Shared working directory.
    store : Store or None
        Manifest store. Defaults to the shared
        [`SqliteStore`][reflow.stores.sqlite.SqliteStore] in the user
        cache directory.
    executor : Executor, str, or None
        Workload-manager executor.  ``"local"`` is a shorthand
        for [`reflow.LocalExecutor`][reflow.LocalExecutor].
    force : bool
        If ``True``, skip the Merkle cache entirely and run all
        tasks fresh.
    force_tasks : list[str] or None
        Task names to force-run even if cached.  Other tasks
        may still use the cache.
    verify : bool
        If ``True``, run output verification (file existence for
        ``Path`` outputs, custom callables) on every cache hit.
        By default the Merkle identity is trusted on submit;
        verification runs automatically during retry.
    **parameters
        Run parameters injected into task functions by name.

    Returns
    -------
    Run

    Raises
    ------
    TypeError
        If required parameters are missing.

    """
    # Resolve executor shorthands (e.g. "local") before any state
    # is created, so a bad name fails fast.
    real_executor = resolve_executor(executor)
    self.validate()
    self._check_submit_params(parameters)

    # Normalise the run directory and create it (plus logs/) up front.
    rd = Path(run_dir).expanduser().resolve()
    rd.mkdir(parents=True, exist_ok=True)
    (rd / "logs").mkdir(parents=True, exist_ok=True)

    # Fall back to the shared per-user SQLite manifest store.
    st = store or SqliteStore.default(self.config)
    st.init()

    run_id = make_run_id(self.name)
    user_id = getpass.getuser()

    # Force flags travel inside the persisted parameter dict under
    # dunder keys, alongside the resolved run_dir.
    params = dict(parameters)
    params["run_dir"] = str(rd)
    if force:
        params["__force__"] = True
    if force_tasks:
        params["__force_tasks__"] = force_tasks

    st.insert_run(run_id, self.name, user_id, params)

    # Record each task spec (config + effective dependencies).
    for spec in self.tasks.values():
        st.insert_task_spec(
            run_id,
            spec.name,
            spec.config.array,
            asdict(spec.config),
            self._effective_dependencies(spec),
        )

    # Seed an initial PENDING instance only for dependency-free,
    # non-array tasks; remaining instances are presumably created
    # during dispatch -- confirm in _submit_dispatch.
    for spec in self.tasks.values():
        if self._effective_dependencies(spec) or spec.config.array:
            continue
        st.insert_task_instance(
            run_id,
            spec.name,
            None,
            TaskState.PENDING,
            {},
        )

    # An explicitly resolved executor wins over the configured default.
    exc = real_executor or default_executor(self.config)
    self._submit_dispatch(exc, run_id, rd, st, verify=verify)

    logger.info("Created run %s in %s", run_id, rd)
    return Run(workflow=self, run_id=run_id, run_dir=rd, store=st)

submit_run(run_dir, parameters, store=None, executor=None)

Lower-level submission used by the CLI. Returns run_id.

Source code in src/reflow/workflow/_core.py
def submit_run(
    self,
    run_dir: Path,
    parameters: dict[str, Any],
    store: Store | None = None,
    executor: Executor | None = None,
) -> str:
    """Lower-level submission used by the CLI.  Returns run_id.

    Mirrors ``submit`` but takes a plain parameter dict and does not
    thread force or verify options through to dispatch.
    """
    self.validate()
    # Normalise the run directory and create it (plus logs/) up front.
    rd = Path(run_dir).expanduser().resolve()
    rd.mkdir(parents=True, exist_ok=True)
    (rd / "logs").mkdir(parents=True, exist_ok=True)

    # Fall back to the shared per-user SQLite manifest store.
    st = store or SqliteStore.default(self.config)
    st.init()

    run_id = make_run_id(self.name)
    user_id = getpass.getuser()

    # The resolved run_dir is persisted alongside the user parameters.
    params = dict(parameters)
    params["run_dir"] = str(rd)

    st.insert_run(run_id, self.name, user_id, params)

    # Record each task spec (config + effective dependencies).
    for spec in self.tasks.values():
        st.insert_task_spec(
            run_id,
            spec.name,
            spec.config.array,
            asdict(spec.config),
            self._effective_dependencies(spec),
        )

    # Seed an initial PENDING instance only for dependency-free,
    # non-array tasks; remaining instances are presumably created
    # during dispatch -- confirm in _submit_dispatch.
    for spec in self.tasks.values():
        if self._effective_dependencies(spec) or spec.config.array:
            continue
        st.insert_task_instance(
            run_id,
            spec.name,
            None,
            TaskState.PENDING,
            {},
        )

    exc = executor or default_executor(self.config)
    self._submit_dispatch(exc, run_id, rd, st)
    return run_id

retry_failed(run_id, store, run_dir, task_name=None, executor=None, verify=True)

Retry failed/cancelled instances and re-dispatch.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
store Store

Manifest store for the run.

required
run_dir Path

Working directory for the run.

required
task_name str or None

Optional task name filter.

None
executor Executor or None

Explicit executor override.

None
verify bool

If True (the default for retry), verify cached upstream outputs before resubmitting. This catches stale intermediate files that were deleted after the original run.

True
Source code in src/reflow/workflow/_core.py
def retry_failed(
    self,
    run_id: str,
    store: Store,
    run_dir: Path,
    task_name: str | None = None,
    executor: Executor | None = None,
    verify: bool = True,
) -> int:
    """Retry failed/cancelled instances and re-dispatch.

    Parameters
    ----------
    run_id : str
        Run identifier.
    store : Store
        Manifest store for the run.
    run_dir : Path
        Working directory for the run.
    task_name : str or None
        Optional task name filter.
    executor : Executor or None
        Explicit executor override.
    verify : bool
        If ``True`` (the default for retry), verify cached
        upstream outputs before resubmitting.  This catches
        stale intermediate files that were deleted after the
        original run.

    Returns
    -------
    int
        Number of instances marked for retry.

    """
    retriable = store.list_task_instances(
        run_id,
        task_name=task_name,
        states=list(TaskState.retriable()),
    )
    count = 0
    for instance in retriable:
        store.mark_for_retry(int(instance["id"]))
        count += 1
    store.update_run_status(run_id, RunState.RUNNING)
    # Only re-dispatch if something was actually marked.
    if count:
        chosen = executor or default_executor(self.config)
        self._submit_dispatch(chosen, run_id, run_dir, store, verify=verify)
    return count

describe_typed()

Return a typed workflow description.

This is the canonical manifest description used by external tooling and future service integrations. describe converts it into a JSON-safe dictionary.

Source code in src/reflow/workflow/_core.py
def describe_typed(self) -> WorkflowDescription:
    """Return a typed workflow description.

    This is the canonical manifest description used by external tooling and
    future service integrations.  [`describe`][describe] converts it into a
    JSON-safe dictionary.

    Returns
    -------
    WorkflowDescription
        Task descriptions (in topological order), merged CLI parameters,
        and the entrypoint/python/working-directory paths of this process.
    """
    self.validate()
    # Emit tasks in topological order.
    tasks_desc: list[TaskDescription] = []
    for name in self._topological_order():
        spec = self.tasks[name]
        tasks_desc.append(
            TaskDescription(
                name=spec.name,
                is_array=spec.config.array,
                config=asdict(spec.config),
                dependencies=self._effective_dependencies(spec),
                result_deps={
                    pname: {"steps": result.steps}
                    for pname, result in spec.result_deps.items()
                },
                return_type=str(spec.return_type) if spec.return_type else None,
                has_verify=spec.verify is not None,
            )
        )

    # Gather resolved CLI params from every task, then pass them
    # through merge_resolved_params before describing them.
    cli_params: list[CliParamDescription] = []
    all_resolved = []
    for spec in self.tasks.values():
        all_resolved.extend(
            collect_cli_params(spec.name, spec.func, spec.signature)
        )
    for rp in merge_resolved_params(all_resolved):
        cli_params.append(
            CliParamDescription(
                name=rp.name,
                task=rp.task_name,
                flag=rp.cli_flag(),
                type_repr=str(rp.base_type),
                required=rp.required,
                default=rp.default,
                namespace=rp.namespace,
                help=rp.help_text,
                choices=list(rp.literal_choices) if rp.literal_choices else None,
            )
        )

    # Entrypoint/python/cwd are taken from the current process.
    return WorkflowDescription(
        name=self.name,
        entrypoint=Path(sys.argv[0]).expanduser().resolve(),
        python=Path(sys.executable),
        working_dir=Path.cwd(),
        tasks=tasks_desc,
        cli_params=cli_params,
    )

describe()

Return a JSON-safe description of the workflow.

Includes task names, types, resource configs, dependencies, and CLI parameters -- everything a server would need to reconstruct the submit form and dispatch logic.

Source code in src/reflow/workflow/_core.py
def describe(self) -> dict[str, Any]:
    """Return a JSON-safe description of the workflow.

    The description covers task names, types, resource configs,
    dependencies, and CLI parameters -- everything a server would
    need to reconstruct the submit form and dispatch logic.
    """
    typed = self.describe_typed()
    return typed.to_manifest_dict()

reflow.run.Run

reflow.run.Run(workflow, run_id, run_dir, store)

Bound handle to a submitted run.

Parameters:

Name Type Description Default
workflow Workflow

The workflow that owns this run.

required
run_id str

Unique run identifier.

required
run_dir Path

Shared working directory.

required
store Store

Manifest store.

required

Examples:

>>> run = workflow.submit(run_dir="/scratch/run1", start="2025-01-01")
>>> run.status()
>>> run.cancel()
>>> run.retry("convert")
Source code in src/reflow/run.py
def __init__(
    self,
    workflow: Workflow,
    run_id: str,
    run_dir: Path,
    store: Store,
) -> None:
    self.workflow: Workflow = workflow
    self.run_id: str = run_id
    self.run_dir: Path = Path(run_dir)
    self.store: Store = store

status(task=None, as_dict=False)

Show or return the current run status.

Parameters:

Name Type Description Default
task str or None

Filter by task name.

None
as_dict bool

If True, return the raw dict instead of printing.

False
Source code in src/reflow/run.py
def status(
    self,
    task: str | None = None,
    as_dict: bool = False,
) -> dict[str, Any] | None:
    """Show or return the current run status.

    Parameters
    ----------
    task : str or None
        Filter by task name.
    as_dict : bool
        If ``True``, return the raw dict instead of printing.

    """
    info = self.workflow.run_status(self.run_id, self.store)

    if as_dict:
        return info

    run = info["run"]
    print(f"Run      {self.run_id}")
    print(f"Workflow {run.get('graph_name', '?')}")
    print(f"Status   {run.get('status', '?')}")
    print(f"Created  {run.get('created_at', '?')}")
    print()

    summary = info["summary"]
    try:
        topo = self.workflow._topological_order()
        ordered = [t for t in topo if t in summary]
        ordered += [t for t in summary if t not in topo]
    except ValueError:
        ordered = sorted(summary)

    for tname in ordered:
        if task and tname != task:
            continue
        states = summary[tname]
        parts = [f"{s}={n}" for s, n in sorted(states.items())]
        print(f"  {tname:20s}  {', '.join(parts)}")

    if task:
        print()
        for inst in info["instances"]:
            if inst.get("task_name") != task:
                continue
            idx = inst.get("array_index")
            idx_str = f"[{idx}]" if idx is not None else "   "
            state = inst.get("state", "?")
            job_id = inst.get("job_id", "-")
            print(f"    {idx_str:6s}  {state:12s}  job={job_id}")

    return None

cancel(task=None, executor=None)

Cancel active task instances.

Parameters:

Name Type Description Default
task str or None

Cancel only this task.

None
executor Executor or None

Workload-manager executor.

None

Returns:

Type Description
int

Number of instances cancelled.

Source code in src/reflow/run.py
def cancel(
    self,
    task: str | None = None,
    executor: Executor | None = None,
) -> int:
    """Cancel active task instances.

    Parameters
    ----------
    task : str or None
        Cancel only this task.
    executor : Executor or None
        Workload-manager executor.

    Returns
    -------
    int
        Number of instances cancelled.

    """
    cancelled = self.workflow.cancel_run(
        self.run_id,
        self.store,
        task_name=task,
        executor=executor,
    )
    print(f"Cancelled {cancelled} task instance(s).")
    return cancelled

retry(task=None, executor=None, verify=True)

Retry failed or cancelled instances.

By default, cached upstream outputs are verified (file existence for Path types, custom callables) before resubmitting. Pass verify=False to skip verification and trust the Merkle identity.

Parameters:

Name Type Description Default
task str or None

Retry only this task.

None
executor Executor or None

Workload-manager executor.

None
verify bool

If True (default), verify cached upstream outputs.

True

Returns:

Type Description
int

Number of instances marked for retry.

Source code in src/reflow/run.py
def retry(
    self,
    task: str | None = None,
    executor: Executor | None = None,
    verify: bool = True,
) -> int:
    """Retry failed or cancelled instances.

    Cached upstream outputs are verified by default (file existence
    for ``Path`` types, custom callables) before resubmitting; pass
    ``verify=False`` to skip verification and trust the Merkle
    identity instead.

    Parameters
    ----------
    task : str or None
        Retry only this task.
    executor : Executor or None
        Workload-manager executor.
    verify : bool
        If ``True`` (default), verify cached upstream outputs.

    Returns
    -------
    int
        Number of instances marked for retry.

    """
    marked = self.workflow.retry_failed(
        self.run_id,
        self.store,
        self.run_dir,
        task_name=task,
        executor=executor,
        verify=verify,
    )
    print(f"Marked {marked} task instance(s) for retry.")
    return marked