Skip to content

Executors

reflow.executors.Executor

reflow.executors.Executor(mode='', python='', config=None)

Bases: ABC

Base class for workload-manager executors.

Subclasses must implement `submit`, `cancel`, and `job_state`. The `array_index_env_var` class attribute tells the worker which environment variable carries the array task index at runtime.

Key normalisation

Users write scheduler-agnostic config (e.g. partition = "gpu" or queue = "batch"). Each executor declares a `_KEY_ALIASES` mapping that rewrites incoming keys to the backend's native vocabulary before rendering CLI flags. This means partition and queue are treated as synonyms — a user never needs to know which scheduler is active.

Source code in src/reflow/executors/__init__.py
def __init__(
    self, mode: str = "", python: str = "", config: Config | None = None
) -> None:
    """Store the worker interpreter and user configuration.

    NOTE(review): *mode* is accepted for signature compatibility but is
    not stored here — concrete subclasses assign ``self.mode`` themselves
    before delegating to this initializer; confirm against subclasses.
    """
    # Fall back to the interpreter running this process when unset.
    self.python = python or sys.executable
    # Fall back to a default Config when the caller provides none.
    self._config = config or Config()

submit(resources, command) abstractmethod

Submit a job and return its identifier.

Source code in src/reflow/executors/__init__.py
@abc.abstractmethod
def submit(self, resources: JobResources, command: list[str]) -> str:
    """Submit a job and return its identifier.

    Parameters
    ----------
    resources : JobResources
        Resource request for the task submission.
    command : list[str]
        Command line to execute on the worker.

    Returns
    -------
    str
        Backend-assigned job identifier.

    """

cancel(job_id) abstractmethod

Cancel a submitted or running job.

Source code in src/reflow/executors/__init__.py
@abc.abstractmethod
def cancel(self, job_id: str) -> None:
    """Cancel a submitted or running job.

    Parameters
    ----------
    job_id : str
        Identifier previously returned by :meth:`submit`.

    """

job_state(job_id) abstractmethod

Query the current state of a job.

Source code in src/reflow/executors/__init__.py
@abc.abstractmethod
def job_state(self, job_id: str) -> str | None:
    """Query the current state of a job.

    Parameters
    ----------
    job_id : str
        Identifier previously returned by :meth:`submit`.

    Returns
    -------
    str or None
        Backend-reported state string; ``None`` semantics are
        backend-specific (presumably "state unknown" — confirm).

    """

dependency_options(job_ids)

Return submit_options entries for depending on job_ids.

The dispatch loop calls this to build the dependency specification for the follow-up dispatch job. Each backend returns the correct key/value pair for its scheduler.

The default implementation produces Slurm-style {"dependency": "afterany:ID1:ID2"}.

Parameters:

Name Type Description Default
job_ids list[str]

Job identifiers to depend on.

required

Returns:

Type Description
dict[str, str]

Entries to merge into submit_options.

Source code in src/reflow/executors/__init__.py
def dependency_options(self, job_ids: list[str]) -> dict[str, str]:
    """Return *submit_options* entries for depending on *job_ids*.

    The dispatch loop calls this to build the dependency specification
    for the follow-up dispatch job.  Each backend returns the correct
    key/value pair for its scheduler.

    The default implementation produces Slurm-style
    ``{"dependency": "afterany:ID1:ID2"}``.

    Parameters
    ----------
    job_ids : list[str]
        Job identifiers to depend on.  May be empty.

    Returns
    -------
    dict[str, str]
        Entries to merge into *submit_options*; empty when *job_ids*
        is empty (``afterany:`` with no ids is not a valid spec).

    """
    # Guard: no dependencies means no extra submit options.
    if not job_ids:
        return {}
    return {"dependency": "afterany:" + ":".join(job_ids)}

reflow.executors.JobResources

reflow.executors.JobResources(job_name='', cpus=1, time_limit='00:30:00', mem='4G', array=None, output_path=None, error_path=None, submit_options=None, backend=None, extra=None) dataclass

Resource request for one task submission.

Source code in src/reflow/executors/__init__.py
def __init__(
    self,
    job_name: str = "",
    cpus: int = 1,
    time_limit: str = "00:30:00",
    mem: str = "4G",
    array: str | None = None,
    output_path: Path | None = None,
    error_path: Path | None = None,
    submit_options: dict[str, Any] | None = None,
    backend: str | None = None,
    extra: dict[str, Any] | None = None,
) -> None:
    """Record the resource request, folding *extra* into *submit_options*."""
    self.job_name = job_name
    self.cpus = cpus
    self.time_limit = time_limit
    self.mem = mem
    self.array = array
    self.output_path = output_path
    self.error_path = error_path
    # Legacy *extra* entries win over *submit_options* on key collisions.
    self.submit_options = {**(submit_options or {}), **(extra or {})}
    self.backend = backend

extra property

Backward-compatible alias for scheduler-native submit options.

reflow.executors.slurm.SlurmExecutor

reflow.executors.slurm.SlurmExecutor(mode='sbatch', sbatch='sbatch', scancel='scancel', sacct='sacct', python='', config=None)

Bases: Executor

Submit, cancel, and query Slurm jobs.

Source code in src/reflow/executors/slurm.py
def __init__(
    self,
    mode: str = "sbatch",
    sbatch: str = "sbatch",
    scancel: str = "scancel",
    sacct: str = "sacct",
    python: str = "",
    config: Config | None = None,
) -> None:
    """Record the Slurm CLI entry points, then run shared base-class setup."""
    self.mode = mode
    self.sbatch, self.scancel, self.sacct = sbatch, scancel, sacct
    super().__init__(python=python, config=config)

from_environment(config=None) classmethod

Build from REFLOW_* environment variables and config.

Source code in src/reflow/executors/slurm.py
@classmethod
def from_environment(cls, config: Config | None = None) -> SlurmExecutor:
    """Build from ``REFLOW_*`` environment variables and config.

    Parameters
    ----------
    config : Config or None
        User configuration; a default ``Config`` is created when omitted.

    Returns
    -------
    SlurmExecutor
        Executor configured from the environment.

    """
    cfg = config or Config()
    mode = os.getenv("REFLOW_MODE") or cfg.executor_mode or "sbatch"
    python = os.getenv("REFLOW_PYTHON") or cfg.executor_python or sys.executable
    return cls(
        mode=mode.strip().lower(),
        sbatch=os.getenv("REFLOW_SBATCH", "sbatch"),
        scancel=os.getenv("REFLOW_SCANCEL", "scancel"),
        sacct=os.getenv("REFLOW_SACCT", "sacct"),
        python=python,
        # Fix: propagate *cfg* like the PBS/LSF/SGE/Flux counterparts do;
        # previously it was dropped and the base class built a fresh Config.
        config=cfg,
    )

reflow.executors.pbs.PBSExecutor

reflow.executors.pbs.PBSExecutor(mode='qsub', qsub='qsub', qdel='qdel', qstat='qstat', array_flag=None, python='', config=None)

Bases: Executor

Submit, cancel, and query PBS / Torque jobs.

Parameters:

Name Type Description Default
mode str

"qsub" for live submission, "dry-run" for logging only.

'qsub'
qsub str

Path to the qsub binary.

'qsub'
qdel str

Path to the qdel binary.

'qdel'
qstat str

Path to the qstat binary.

'qstat'
array_flag str or None

"-J" for PBS Pro / OpenPBS, "-t" for Torque. Auto-detected when None.

None
python str

Python interpreter for worker jobs.

''
config Config or None

User configuration.

None
Source code in src/reflow/executors/pbs.py
def __init__(
    self,
    mode: str = "qsub",
    qsub: str = "qsub",
    qdel: str = "qdel",
    qstat: str = "qstat",
    array_flag: str | None = None,
    python: str = "",
    config: Config | None = None,
) -> None:
    """Record the PBS CLI entry points, then run shared base-class setup."""
    self.mode = mode
    self.qsub, self.qdel, self.qstat = qsub, qdel, qstat
    # Auto-detect PBS Pro ("-J") vs. Torque ("-t") when no flag is given.
    self.array_flag = array_flag if array_flag else _detect_pbs_variant(qstat)
    super().__init__(python=python, config=config)

from_environment(config=None) classmethod

Build from REFLOW_* environment variables and config.

Source code in src/reflow/executors/pbs.py
@classmethod
def from_environment(cls, config: Config | None = None) -> PBSExecutor:
    """Build from ``REFLOW_*`` environment variables and config."""
    cfg = config or Config()
    raw_mode = os.getenv("REFLOW_MODE") or cfg.executor_mode or "qsub"
    interpreter = (
        os.getenv("REFLOW_PYTHON") or cfg.executor_python or sys.executable
    )
    # Each binary path may be overridden via REFLOW_<NAME>.
    binaries = {
        name: os.getenv(f"REFLOW_{name.upper()}", name)
        for name in ("qsub", "qdel", "qstat")
    }
    return cls(
        mode=raw_mode.strip().lower(),
        python=interpreter,
        config=cfg,
        **binaries,
    )

dependency_options(job_ids)

PBS dependency: -W depend=afterany:ID1:ID2.

Source code in src/reflow/executors/pbs.py
def dependency_options(self, job_ids: list[str]) -> dict[str, str]:
    """PBS dependency: ``-W depend=afterany:ID1:ID2``.

    Returns an empty dict when *job_ids* is empty — ``afterany:`` with
    no ids is not a valid PBS dependency spec.
    """
    if not job_ids:
        return {}
    return {"depend": "afterany:" + ":".join(job_ids)}

reflow.executors.lsf.LSFExecutor

reflow.executors.lsf.LSFExecutor(mode='bsub', bsub='bsub', bkill='bkill', bjobs='bjobs', python='', config=None)

Bases: Executor

Submit, cancel, and query LSF jobs.

Parameters:

Name Type Description Default
mode str

"bsub" for live submission, "dry-run" for logging only.

'bsub'
bsub str

Path to the bsub binary.

'bsub'
bkill str

Path to the bkill binary.

'bkill'
bjobs str

Path to the bjobs binary.

'bjobs'
python str

Python interpreter for worker jobs.

''
config Config or None

User configuration.

None
Source code in src/reflow/executors/lsf.py
def __init__(
    self,
    mode: str = "bsub",
    bsub: str = "bsub",
    bkill: str = "bkill",
    bjobs: str = "bjobs",
    python: str = "",
    config: Config | None = None,
) -> None:
    """Record the LSF CLI entry points, then run shared base-class setup."""
    self.mode = mode
    self.bsub, self.bkill, self.bjobs = bsub, bkill, bjobs
    super().__init__(python=python, config=config)

from_environment(config=None) classmethod

Build from REFLOW_* environment variables and config.

Source code in src/reflow/executors/lsf.py
@classmethod
def from_environment(cls, config: Config | None = None) -> LSFExecutor:
    """Build from ``REFLOW_*`` environment variables and config."""
    cfg = config or Config()
    raw_mode = os.getenv("REFLOW_MODE") or cfg.executor_mode or "bsub"
    interpreter = (
        os.getenv("REFLOW_PYTHON") or cfg.executor_python or sys.executable
    )
    # Each binary path may be overridden via REFLOW_<NAME>.
    binaries = {
        name: os.getenv(f"REFLOW_{name.upper()}", name)
        for name in ("bsub", "bkill", "bjobs")
    }
    return cls(
        mode=raw_mode.strip().lower(),
        python=interpreter,
        config=cfg,
        **binaries,
    )

dependency_options(job_ids)

LSF dependency: -w "ended(ID1) && ended(ID2)".

Source code in src/reflow/executors/lsf.py
def dependency_options(self, job_ids: list[str]) -> dict[str, str]:
    """LSF dependency: ``-w "ended(ID1) && ended(ID2)"``.

    Returns an empty dict when *job_ids* is empty — an empty ``-w``
    expression is not a valid LSF dependency.
    """
    if not job_ids:
        return {}
    expr = " && ".join(f"ended({jid})" for jid in job_ids)
    return {"dependency_expr": expr}

reflow.executors.sge.SGEExecutor

reflow.executors.sge.SGEExecutor(mode='qsub', qsub='qsub', qdel='qdel', qstat='qstat', python='', config=None)

Bases: Executor

Submit, cancel, and query SGE / UGE jobs.

Parameters:

Name Type Description Default
mode str

"qsub" for live submission, "dry-run" for logging only.

'qsub'
qsub str

Path to the qsub binary.

'qsub'
qdel str

Path to the qdel binary.

'qdel'
qstat str

Path to the qstat binary.

'qstat'
python str

Python interpreter for worker jobs.

''
config Config or None

User configuration.

None
Source code in src/reflow/executors/sge.py
def __init__(
    self,
    mode: str = "qsub",
    qsub: str = "qsub",
    qdel: str = "qdel",
    qstat: str = "qstat",
    python: str = "",
    config: Config | None = None,
) -> None:
    """Record the SGE CLI entry points, then run shared base-class setup."""
    self.mode = mode
    self.qsub, self.qdel, self.qstat = qsub, qdel, qstat
    super().__init__(python=python, config=config)

from_environment(config=None) classmethod

Build from REFLOW_* environment variables and config.

Source code in src/reflow/executors/sge.py
@classmethod
def from_environment(cls, config: Config | None = None) -> SGEExecutor:
    """Build from ``REFLOW_*`` environment variables and config."""
    cfg = config or Config()
    raw_mode = os.getenv("REFLOW_MODE") or cfg.executor_mode or "qsub"
    interpreter = (
        os.getenv("REFLOW_PYTHON") or cfg.executor_python or sys.executable
    )
    # Each binary path may be overridden via REFLOW_<NAME>.
    binaries = {
        name: os.getenv(f"REFLOW_{name.upper()}", name)
        for name in ("qsub", "qdel", "qstat")
    }
    return cls(
        mode=raw_mode.strip().lower(),
        python=interpreter,
        config=cfg,
        **binaries,
    )

dependency_options(job_ids)

SGE dependency: -hold_jid ID1,ID2.

Source code in src/reflow/executors/sge.py
def dependency_options(self, job_ids: list[str]) -> dict[str, str]:
    """SGE dependency: ``-hold_jid ID1,ID2``.

    Returns an empty dict when *job_ids* is empty — an empty
    ``-hold_jid`` value is not a valid hold list.
    """
    if not job_ids:
        return {}
    return {"hold_jid": ",".join(job_ids)}

reflow.executors.flux.FluxExecutor

reflow.executors.flux.FluxExecutor(mode='flux', flux='flux', python='', config=None)

Bases: Executor

Submit, cancel, and query Flux jobs.

Parameters:

Name Type Description Default
mode str

"flux" for live submission, "dry-run" for logging only.

'flux'
flux str

Path to the flux binary.

'flux'
python str

Python interpreter for worker jobs.

''
config Config or None

User configuration.

None
Source code in src/reflow/executors/flux.py
def __init__(
    self,
    mode: str = "flux",
    flux: str = "flux",
    python: str = "",
    config: Config | None = None,
) -> None:
    """Record the flux CLI entry point, then run shared base-class setup."""
    self.mode, self.flux = mode, flux
    super().__init__(python=python, config=config)

from_environment(config=None) classmethod

Build from REFLOW_* environment variables and config.

Source code in src/reflow/executors/flux.py
@classmethod
def from_environment(cls, config: Config | None = None) -> FluxExecutor:
    """Build from ``REFLOW_*`` environment variables and config."""
    cfg = config or Config()
    raw_mode = os.getenv("REFLOW_MODE") or cfg.executor_mode or "flux"
    interpreter = (
        os.getenv("REFLOW_PYTHON") or cfg.executor_python or sys.executable
    )
    return cls(
        mode=raw_mode.strip().lower(),
        flux=os.getenv("REFLOW_FLUX", "flux"),
        python=interpreter,
        config=cfg,
    )

dependency_options(job_ids)

Flux dependency: --dependency=afterany:ID1,ID2.

Source code in src/reflow/executors/flux.py
def dependency_options(self, job_ids: list[str]) -> dict[str, str]:
    """Flux dependency: ``--dependency=afterany:ID1,ID2``.

    Returns an empty dict when *job_ids* is empty — ``afterany:`` with
    no ids is not a valid dependency spec.
    """
    if not job_ids:
        return {}
    return {"dependency": "afterany:" + ",".join(job_ids)}

reflow.executors.local.LocalExecutor

reflow.executors.local.LocalExecutor(capture_output=False)

Bases: Executor

Run tasks as local subprocesses (synchronous).

Source code in src/reflow/executors/local.py
def __init__(self, capture_output: bool = False) -> None:
    """Prepare an executor that runs tasks as local, synchronous subprocesses."""
    super().__init__()
    # Completed-process records keyed by the synthetic job id.
    self._procs: dict[str, subprocess.CompletedProcess[str] | None] = {}
    self.capture_output = capture_output

reflow.executors.helpers

reflow.executors.helpers.default_executor(config)

Return the default executor based on config mode.

Reads `config.executor_mode` (or the `REFLOW_MODE` env var) to decide which backend to instantiate. Falls back to `SlurmExecutor` when no mode is configured.

Parameters:

Name Type Description Default
config Config

User configuration.

required

Returns:

Type Description
Executor

An executor instance for the configured backend.

Source code in src/reflow/executors/helpers.py
def default_executor(config: Any) -> Any:
    """Return the default executor based on config mode.

    Reads ``config.executor_mode`` (or the ``REFLOW_MODE`` env var)
    to decide which backend to instantiate.  Falls back to
    :class:`~reflow.executors.slurm.SlurmExecutor` when no mode is
    configured.

    Parameters
    ----------
    config : Config
        User configuration.

    Returns
    -------
    Executor
        An executor instance for the configured backend.

    """
    raw = os.getenv("REFLOW_MODE") or getattr(config, "executor_mode", None) or ""
    mode = raw.strip().lower()

    # Mode string -> backend class; "qsub" and "qsub-pbs" are synonyms.
    backends = {
        "qsub-pbs": PBSExecutor,
        "qsub": PBSExecutor,
        "bsub": LSFExecutor,
        "qsub-sge": SGEExecutor,
        "flux": FluxExecutor,
    }
    # Slurm covers "sbatch", "dry-run", and the empty/unset default.
    return backends.get(mode, SlurmExecutor).from_environment(config)

reflow.executors.helpers.resolve_executor(executor)

Resolve an executor shorthand string to an instance.

Parameters:

Name Type Description Default
executor Executor, str, or None

Supported shorthands: "local", "pbs", "lsf", "sge", "flux".

required
Source code in src/reflow/executors/helpers.py
def resolve_executor(executor: Any) -> Any:
    """Resolve an executor shorthand string to an instance.

    Parameters
    ----------
    executor : Executor, str, or None
        Supported shorthands: ``"local"``, ``"pbs"``, ``"lsf"``,
        ``"sge"``, ``"flux"``.

    """
    # Anything that is not a shorthand string passes through untouched.
    if not isinstance(executor, str):
        return executor

    name = executor.strip().lower()
    if name == "local":
        return LocalExecutor()

    factories = {
        "pbs": PBSExecutor.from_environment,
        "lsf": LSFExecutor.from_environment,
        "sge": SGEExecutor.from_environment,
        "flux": FluxExecutor.from_environment,
    }
    factory = factories.get(name)
    if factory is None:
        raise ValueError(
            f"Unknown executor shorthand {executor!r}.  "
            f"Use 'local', 'pbs', 'lsf', 'sge', or 'flux'."
        )
    return factory()