Skip to content

Other

reflow.signals.TaskInterrupted

reflow.signals.TaskInterrupted(signal_number)

Bases: Exception

Raised when a worker receives a termination signal.

Attributes:

Name Type Description
signal_number int

The signal that triggered the interruption.

signal_name str

Human-readable signal name.

Source code in src/reflow/signals.py
def __init__(self, signal_number: int) -> None:
    self.signal_number = signal_number
    self.signal_name = signal.Signals(signal_number).name
    super().__init__(
        f"Task interrupted by {self.signal_name} (signal {signal_number})"
    )

reflow.signals.graceful_shutdown

reflow.signals.graceful_shutdown()

Context manager that converts SIGTERM/SIGINT to TaskInterrupted.

Restores the original signal handlers on exit.

Examples:

>>> with graceful_shutdown():
...     result = expensive_computation()
Source code in src/reflow/signals.py
@contextmanager
def graceful_shutdown() -> Generator[None, None, None]:
    """Context manager that converts SIGTERM/SIGINT to TaskInterrupted.

    Restores the original signal handlers on exit.

    Examples
    --------
    >>> with graceful_shutdown():
    ...     result = expensive_computation()

    """
    old_sigterm = signal.getsignal(signal.SIGTERM)
    old_sigint = signal.getsignal(signal.SIGINT)

    signal.signal(signal.SIGTERM, _signal_handler)
    signal.signal(signal.SIGINT, _signal_handler)

    try:
        yield
    finally:
        signal.signal(signal.SIGTERM, old_sigterm)
        signal.signal(signal.SIGINT, old_sigint)

reflow.config.Config

reflow.config.Config(data=None)

Loaded user configuration.

Environment variables (REFLOW_*) override the config file. Scheduler-agnostic keys (partition / queue, account) are passed through to the active executor, which normalises them to the backend's native vocabulary via :meth:~reflow.executors.Executor._normalize_options.

Source code in src/reflow/config.py
def __init__(self, data: dict[str, Any] | None = None) -> None:
    self._data: dict[str, Any] = data or {}
    self.submit_options = {
        k: v
        for k, v in self._data.get("executor", {}).get("submit_options", {}).items()
    }

executor_submit_options property

Default submit options for task jobs.

Uses scheduler-agnostic canonical keys (partition, account, etc.) which the active executor normalises to its native vocabulary at submission time.

dispatch_submit_options property

Default submit options for the dispatch/coordinator job.

reflow.manifest.ManifestCodec

reflow.manifest.ManifestCodec(marker_key=_MARKER, schema_version=SCHEMA_VERSION) dataclass

Builtin-only JSON codec for manifest values.

The codec converts selected Python types to JSON-safe tagged structures and can reconstruct them on load.

dump_value(value)

Convert a Python value into a JSON-safe representation.

Source code in src/reflow/manifest.py
def dump_value(self, value: Any) -> Any:
    """Convert a Python value into a JSON-safe representation."""
    if isinstance(value, Enum):
        return {
            self.marker_key: True,
            _TYPE: "enum",
            _CLASS: self._qualname(type(value)),
            _VALUE: self.dump_value(value.value),
        }

    if isinstance(value, self._json_scalar_types):
        return value

    if isinstance(value, Path):
        return self._tag("path", str(value))

    if isinstance(value, datetime):
        return self._tag("datetime", value.isoformat())

    if isinstance(value, UUID):
        return self._tag("uuid", str(value))

    if is_dataclass(value) and not isinstance(value, type):
        payload = {
            name: self.dump_value(getattr(value, name))
            for name in (field.name for field in fields(value))
        }
        return {
            self.marker_key: True,
            _TYPE: "dataclass",
            _CLASS: self._qualname(type(value)),
            _FIELDS: payload,
        }

    if isinstance(value, tuple):
        return self._tag("tuple", [self.dump_value(item) for item in value])

    if isinstance(value, list):
        return [self.dump_value(item) for item in value]

    if isinstance(value, dict):
        bad_key = next((key for key in value if not isinstance(key, str)), None)
        if bad_key is not None:
            raise TypeError(
                "Manifest dictionaries must use string keys; "
                f"got key {bad_key!r} of type {type(bad_key).__name__}."
            )
        return {key: self.dump_value(item) for key, item in value.items()}

    raise TypeError(
        "Unsupported manifest value type: "
        f"{type(value).__module__}.{type(value).__qualname__}"
    )

load_value(value)

Reconstruct a Python value from a JSON-safe representation.

Source code in src/reflow/manifest.py
def load_value(self, value: Any) -> Any:
    """Reconstruct a Python value from a JSON-safe representation."""
    if isinstance(value, self._json_scalar_types):
        return value

    if isinstance(value, list):
        return [self.load_value(item) for item in value]

    if not isinstance(value, dict):
        raise TypeError(f"Unsupported manifest payload: {type(value).__name__}")

    if value.get(self.marker_key) is True:
        type_name = value.get(_TYPE)
        if type_name == "path":
            return Path(self._require(value, _VALUE, str))
        if type_name == "datetime":
            return datetime.fromisoformat(self._require(value, _VALUE, str))
        if type_name == "uuid":
            return UUID(self._require(value, _VALUE, str))
        if type_name == "tuple":
            items = self._require(value, _VALUE, list)
            return tuple(self.load_value(item) for item in items)
        if type_name == "enum":
            raw = self.load_value(value.get(_VALUE))
            cls = self._import_type(self._require(value, _CLASS, str))
            if cls is None or not issubclass(cls, Enum):
                return raw
            return cls(raw)
        if type_name == "dataclass":
            cls_name = self._require(value, _CLASS, str)
            payload = self._require(value, _FIELDS, dict)
            restored = {key: self.load_value(item) for key, item in payload.items()}
            cls = self._import_type(cls_name)
            if cls is None or not is_dataclass(cls):
                return restored
            return cls(**restored)
        raise TypeError(f"Unknown manifest payload type {type_name!r}")

    return {key: self.load_value(item) for key, item in value.items()}

dumps(value, *, pretty=False)

Serialise a Python value to JSON using manifest typing.

Source code in src/reflow/manifest.py
def dumps(self, value: Any, *, pretty: bool = False) -> str:
    """Serialise a Python value to JSON using manifest typing."""
    payload = self.dump_value(value)
    if pretty:
        return json.dumps(payload, sort_keys=True, indent=2)
    return json.dumps(payload, sort_keys=True, separators=(",", ":"))

loads(payload)

Load a value previously written with dumps.

Source code in src/reflow/manifest.py
def loads(self, payload: str | bytes | bytearray) -> Any:
    """Load a value previously written with [`dumps`][dumps]."""
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode()
    return self.load_value(json.loads(payload))

canonical_dumps(value)

Return canonical JSON for hashing.

Source code in src/reflow/manifest.py
def canonical_dumps(self, value: Any) -> str:
    """Return canonical JSON for hashing."""
    return self.dumps(value, pretty=False)

reflow._types.TaskState

reflow._types.TaskState

Bases: str, Enum

Lifecycle state of a task instance.

terminal() classmethod

States that will not change without intervention.

Source code in src/reflow/_types.py
@classmethod
def terminal(cls) -> frozenset[TaskState]:
    """States that will not change without intervention."""
    return frozenset({cls.SUCCESS, cls.FAILED, cls.CANCELLED})

active() classmethod

States that represent in-progress work.

Source code in src/reflow/_types.py
@classmethod
def active(cls) -> frozenset[TaskState]:
    """States that represent in-progress work."""
    return frozenset({cls.PENDING, cls.SUBMITTED, cls.RUNNING, cls.RETRYING})

cancellable() classmethod

States from which cancellation is meaningful.

Source code in src/reflow/_types.py
@classmethod
def cancellable(cls) -> frozenset[TaskState]:
    """States from which cancellation is meaningful."""
    return frozenset({cls.PENDING, cls.SUBMITTED, cls.RUNNING})

retriable() classmethod

States from which a retry can be triggered.

Source code in src/reflow/_types.py
@classmethod
def retriable(cls) -> frozenset[TaskState]:
    """States from which a retry can be triggered."""
    return frozenset({cls.FAILED, cls.CANCELLED})

reflow._types.RunState

reflow._types.RunState

Bases: str, Enum

Top-level state of a workflow run.