Skip to content

Stores

reflow.stores.Store

reflow.stores.Store

Bases: ABC

Abstract manifest store.

init() abstractmethod

Create tables / schema if they do not exist.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def init(self) -> None:
    """Create tables / schema if they do not exist.

    Implementations must be idempotent: calling this on an
    already-initialised store is a no-op.
    """

close() abstractmethod

Release any held resources.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def close(self) -> None:
    """Release any held resources (e.g. an open database connection)."""

insert_run(run_id, graph_name, user_id, parameters) abstractmethod

Insert a new run.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def insert_run(
    self,
    run_id: str,
    graph_name: str,
    user_id: str,
    parameters: dict[str, Any],
) -> None:
    """Insert a new run.

    Parameters
    ----------
    run_id : str
        Unique identifier for the run.
    graph_name : str
        Name of the workflow graph.
    user_id : str
        User that started the run.
    parameters : dict[str, Any]
        Run parameters to persist.

    """

get_run(run_id) abstractmethod

Load a single run record.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_run(self, run_id: str) -> dict[str, Any] | None:
    """Load a single run record.

    Returns
    -------
    dict[str, Any] or None
        The run row, or ``None`` if no run with ``run_id`` exists.

    """

get_run_parameters(run_id) abstractmethod

Load the run parameters. Raises KeyError if missing.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_run_parameters(self, run_id: str) -> dict[str, Any]:
    """Load the run parameters.

    Raises
    ------
    KeyError
        If no run with ``run_id`` exists.

    """

update_run_status(run_id, status) abstractmethod

Update the top-level run status.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_run_status(self, run_id: str, status: RunState) -> None:
    """Update the top-level run status.

    Parameters
    ----------
    run_id : str
        Run identifier.
    status : RunState
        New status for the run.

    """

list_runs(graph_name=None, user_id=None, *, limit=None, since=None, until=None, status=None) abstractmethod

List runs, optionally filtered.

Parameters:

Name Type Description Default
graph_name str or None

Filter by workflow name.

None
user_id str or None

Filter by user.

None
limit int or None

Maximum number of runs to return (most recent first).

None
since str or None

Only runs created at or after this ISO-8601 timestamp.

None
until str or None

Only runs created before this ISO-8601 timestamp.

None
status str or None

Only runs with this status (e.g. "RUNNING", "FAILED").

None
Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def list_runs(
    self,
    graph_name: str | None = None,
    user_id: str | None = None,
    *,
    limit: int | None = None,
    since: str | None = None,
    until: str | None = None,
    status: str | None = None,
) -> list[dict[str, Any]]:
    """List runs, optionally filtered.

    Parameters
    ----------
    graph_name : str or None
        Filter by workflow name.
    user_id : str or None
        Filter by user.
    limit : int or None
        Maximum number of runs to return (most recent first).
    since : str or None
        Only runs created at or after this ISO-8601 timestamp.
    until : str or None
        Only runs created before this ISO-8601 timestamp.
    status : str or None
        Only runs with this status (e.g. ``"RUNNING"``, ``"FAILED"``).

    Returns
    -------
    list[dict[str, Any]]
        Matching run rows, most recent first.

    """

insert_task_spec(run_id, task_name, is_array, config_json, dependencies) abstractmethod

Persist one task specification and its dependencies.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def insert_task_spec(
    self,
    run_id: str,
    task_name: str,
    is_array: bool,
    config_json: dict[str, Any],
    dependencies: list[str],
) -> None:
    """Persist one task specification and its dependencies.

    Parameters
    ----------
    run_id : str
        Run the specification belongs to.
    task_name : str
        Name of the task.
    is_array : bool
        Whether the task is an array task.
    config_json : dict[str, Any]
        Task configuration to persist.
    dependencies : list[str]
        Names of upstream tasks this task depends on.

    """

list_task_dependencies(run_id, task_name) abstractmethod

Return upstream dependency names for a task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def list_task_dependencies(self, run_id: str, task_name: str) -> list[str]:
    """Return upstream dependency names for a task.

    Returns
    -------
    list[str]
        Names of the tasks ``task_name`` depends on within ``run_id``.

    """

insert_task_instance(run_id, task_name, array_index, state, input_payload, identity='', input_hash='') abstractmethod

Insert one task instance, return its database id.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the run.

required
task_name str

Name of the task.

required
identity str

Full Merkle identity hash.

''
input_hash str

Hash of direct inputs only (without upstream hashes).

''
array_index int | None

Index within the array job, or None for a singleton task.

required
state TaskState

Initial state of the task instance.

required
input_payload dict[str, Any]

Input payload for the task instance.

required
identity str
''
input_hash str
''
Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def insert_task_instance(
    self,
    run_id: str,
    task_name: str,
    array_index: int | None,
    state: TaskState,
    input_payload: dict[str, Any],
    identity: str = "",
    input_hash: str = "",
) -> int:
    """Insert one task instance, return its database id.

    Parameters
    ----------
    run_id : str
        Unique identifier of the run this instance belongs to.
    task_name : str
        Name of the task.
    array_index : int or None
        Index within the array job, or ``None`` for a singleton task.
    state : TaskState
        Initial state of the instance.
    input_payload : dict[str, Any]
        Input payload for the instance.
    identity : str
        Full Merkle identity hash.
    input_hash : str
        Hash of direct inputs only (without upstream hashes).

    Returns
    -------
    int
        Database id of the inserted instance.

    """

get_task_instance(run_id, task_name, array_index) abstractmethod

Load one task instance.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_task_instance(
    self,
    run_id: str,
    task_name: str,
    array_index: int | None,
) -> dict[str, Any] | None:
    """Load one task instance.

    Returns
    -------
    dict[str, Any] or None
        The instance row, or ``None`` if it does not exist.

    """

list_task_instances(run_id, task_name=None, states=None) abstractmethod

List task instances with optional filters.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def list_task_instances(
    self,
    run_id: str,
    task_name: str | None = None,
    states: list[TaskState] | None = None,
) -> list[dict[str, Any]]:
    """List task instances with optional filters.

    Parameters
    ----------
    run_id : str
        Run identifier.
    task_name : str or None
        If given, only instances of this task.
    states : list[TaskState] or None
        If given, only instances in one of these states.

    Returns
    -------
    list[dict[str, Any]]
        Matching task instance rows.

    """

count_task_instances(run_id, task_name) abstractmethod

Count instances of a task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def count_task_instances(self, run_id: str, task_name: str) -> int:
    """Count instances of a task.

    Returns
    -------
    int
        Number of instances of ``task_name`` recorded for ``run_id``.

    """

get_singleton_output(run_id, task_name) abstractmethod

Load the output of a successful singleton task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_singleton_output(self, run_id: str, task_name: str) -> Any:
    """Load the output of a successful singleton task."""

get_array_outputs(run_id, task_name) abstractmethod

Load all outputs from a successful array task, ordered by index.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_array_outputs(self, run_id: str, task_name: str) -> list[Any]:
    """Load all outputs from a successful array task, ordered by index."""

get_output_hash(run_id, task_name, array_index=None) abstractmethod

Get the output hash for a specific successful task instance.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_output_hash(
    self,
    run_id: str,
    task_name: str,
    array_index: int | None = None,
) -> str:
    """Get the output hash for a specific successful task instance.

    Parameters
    ----------
    run_id : str
        Run identifier.
    task_name : str
        Task name.
    array_index : int or None
        Array index, or ``None`` for a singleton task.

    Returns
    -------
    str
        The stored output hash.

    """

get_all_output_hashes(run_id, task_name) abstractmethod

Get output hashes for all successful instances of a task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_all_output_hashes(self, run_id: str, task_name: str) -> list[str]:
    """Get output hashes for all successful instances of a task.

    Returns
    -------
    list[str]
        One hash per successful instance.

    """

dependency_is_satisfied(run_id, task_name) abstractmethod

Check whether all instances of an upstream task succeeded.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def dependency_is_satisfied(self, run_id: str, task_name: str) -> bool:
    """Check whether all instances of an upstream task succeeded.

    Returns
    -------
    bool
        ``True`` if every instance of ``task_name`` in ``run_id`` succeeded.

    """

update_task_submitted(run_id, task_name, job_id) abstractmethod

Mark pending/retrying instances as submitted.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_submitted(
    self,
    run_id: str,
    task_name: str,
    job_id: str,
) -> None:
    """Mark pending/retrying instances as submitted.

    Parameters
    ----------
    run_id : str
        Run identifier.
    task_name : str
        Task whose instances are being submitted.
    job_id : str
        Job id to record on the submitted instances.

    """

update_task_running(instance_id) abstractmethod

Mark one instance as running.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_running(self, instance_id: int) -> None:
    """Mark one instance as running.

    Parameters
    ----------
    instance_id : int
        Database id of the instance.

    """

update_task_success(instance_id, output, output_hash='') abstractmethod

Mark one instance as successful.

Parameters:

Name Type Description Default
output_hash str

Hash of the output value for downstream Merkle propagation.

''
instance_id int

Database id

required
output Any

Output value of the task.

required
output_hash str

Hash of the output value.

''
Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_success(
    self,
    instance_id: int,
    output: Any,
    output_hash: str = "",
) -> None:
    """Mark one instance as successful.

    Parameters
    ----------
    instance_id : int
        Database id of the instance.
    output : Any
        Output value of the task.
    output_hash : str
        Hash of the output value for downstream Merkle propagation.

    """

update_task_failed(instance_id, error_text) abstractmethod

Mark one instance as failed.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_failed(self, instance_id: int, error_text: str) -> None:
    """Mark one instance as failed.

    Parameters
    ----------
    instance_id : int
        Database id of the instance.
    error_text : str
        Human-readable description of the failure.

    """

update_task_cancelled(instance_id) abstractmethod

Mark one instance as cancelled.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_cancelled(self, instance_id: int) -> None:
    """Mark one instance as cancelled.

    Parameters
    ----------
    instance_id : int
        Database id of the instance.

    """

mark_for_retry(instance_id) abstractmethod

Reset a failed/cancelled instance for retry.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def mark_for_retry(self, instance_id: int) -> None:
    """Reset a failed/cancelled instance for retry.

    Parameters
    ----------
    instance_id : int
        Database id of the instance.

    """

find_cached(task_name, identity) abstractmethod

Find a successful task instance with a matching Merkle identity.

Searches across all runs. Returns the most recent match.

Parameters:

Name Type Description Default
task_name str

Task name.

required
identity str

Full Merkle identity hash.

required

Returns:

Type Description
dict[str, Any] or None

The cached instance row, or None if no match.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def find_cached(
    self,
    task_name: str,
    identity: str,
) -> dict[str, Any] | None:
    """Find a successful task instance with a matching Merkle identity.

    Searches across *all* runs and returns the most recent match.

    Parameters
    ----------
    task_name : str
        Task name.
    identity : str
        Full Merkle identity hash.

    Returns
    -------
    dict[str, Any] or None
        The cached instance row, or ``None`` if no match.

    """

task_state_summary(run_id) abstractmethod

Per-task state counts: {task: {state: count}}.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def task_state_summary(self, run_id: str) -> dict[str, dict[str, int]]:
    """Per-task state counts: ``{task: {state: count}}``.

    Returns
    -------
    dict[str, dict[str, int]]
        Mapping of task name to a mapping of state name to instance count.

    """

reflow.stores.sqlite.SqliteStore

reflow.stores.sqlite.SqliteStore(path, readonly=False)

Bases: Store

SQLite-backed manifest store.

Parameters:

Name Type Description Default
path Path or str

Path to the SQLite database file.

required
Source code in src/reflow/stores/sqlite.py
def __init__(self, path: Path | str, readonly: bool = False) -> None:
    """Open a store backed by the SQLite file at *path*.

    Parameters
    ----------
    path : Path or str
        Path to the SQLite database file.
    readonly : bool
        When True, the store must not create files or directories.

    """
    self.path = Path(path)
    self.readonly = readonly
    # Only create the parent directory when we may write to the database;
    # a read-only store must not touch the filesystem.
    if not readonly:
        self.path.parent.mkdir(parents=True, exist_ok=True)
    # Connection handle; not opened here (presumably opened lazily on
    # first use — see the `conn` accessor; confirm against full source).
    self._conn: sqlite3.Connection | None = None

for_run_dir(run_dir, readonly=False) classmethod

Create a store at <run_dir>/manifest.db.

This remains available for callers that want a run-local store, but the default reflow behaviour uses the `default` constructor instead, so one manifest database can serve multiple run directories.

Source code in src/reflow/stores/sqlite.py
@classmethod
def for_run_dir(cls, run_dir: Path | str, readonly: bool = False) -> SqliteStore:
    """Create a store at ``<run_dir>/manifest.db``.

    This remains available for callers that want a run-local store,
    but the default reflow behaviour uses
    [`default`][reflow.stores.sqlite.SqliteStore.default] instead so
    one manifest database can serve multiple run directories.
    """
    # Resolve to an absolute path so relative run dirs behave consistently.
    rd = Path(run_dir).expanduser().resolve()
    # Read-only callers must not create directories.
    if not readonly:
        rd.mkdir(parents=True, exist_ok=True)
    return cls(rd / "manifest.db", readonly=readonly)

default_path(config=None) classmethod

Return the default shared manifest path.

By default this is the user cache directory, typically ~/.cache/reflow/manifest.db on Linux, unless overridden by config or REFLOW_STORE_PATH.

Source code in src/reflow/stores/sqlite.py
@classmethod
def default_path(cls, config: Config | None = None) -> Path:
    """Return the default shared manifest path.

    By default this is the user cache directory, typically
    ``~/.cache/reflow/manifest.db`` on Linux, unless overridden by
    config or ``REFLOW_STORE_PATH``.

    Returns
    -------
    Path
        Absolute, user-expanded path to the manifest database.

    """
    # A fresh Config supplies the default path when none is provided.
    cfg = config or Config()
    return Path(cfg.default_store_path).expanduser().resolve()

default(config=None, readonly=False) classmethod

Create a store using the shared default manifest path.

Source code in src/reflow/stores/sqlite.py
@classmethod
def default(
    cls, config: Config | None = None, readonly: bool = False
) -> SqliteStore:
    """Create a store using the shared default manifest path.

    Parameters
    ----------
    config : Config or None
        Optional configuration used to resolve the manifest path.
    readonly : bool
        Passed through to the store constructor.

    """
    return cls(cls.default_path(config), readonly=readonly)

get_output_hash(run_id, task_name, array_index=None)

Get the output hash for a specific task instance.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
task_name str

Task name.

required
array_index int or None

Array index, or None for singleton.

None

Returns:

Type Description
str

Output hash, or empty string if not found.

Source code in src/reflow/stores/sqlite.py
def get_output_hash(
    self, run_id: str, task_name: str, array_index: int | None = None
) -> str:
    """Get the output hash for a specific task instance.

    Parameters
    ----------
    run_id : str
        Run identifier.
    task_name : str
        Task name.
    array_index : int or None
        Array index, or None for singleton.

    Returns
    -------
    str
        Output hash, or empty string if not found.

    """
    # Singleton instances are stored with a NULL array_index, so the
    # tail of the WHERE clause differs depending on whether an index
    # was supplied.
    base_sql = (
        "SELECT output_hash FROM task_instances "
        "WHERE run_id = ? AND task_name = ? AND "
    )
    if array_index is None:
        sql = base_sql + "array_index IS NULL AND state = ?"
        args: tuple = (run_id, task_name, TaskState.SUCCESS.value)
    else:
        sql = base_sql + "array_index = ? AND state = ?"
        args = (run_id, task_name, array_index, TaskState.SUCCESS.value)
    hit = self.conn.execute(sql, args).fetchone()
    # Treat both "no row" and a NULL/empty stored hash as "not found".
    return hit[0] if hit and hit[0] else ""

get_all_output_hashes(run_id, task_name)

Get output hashes for all successful instances of a task.

Source code in src/reflow/stores/sqlite.py
def get_all_output_hashes(self, run_id: str, task_name: str) -> list[str]:
    """Get output hashes for all successful instances of a task."""
    rows = self.conn.execute(
        "SELECT output_hash FROM task_instances "
        "WHERE run_id = ? AND task_name = ? AND state = ? "
        "ORDER BY array_index",
        (run_id, task_name, TaskState.SUCCESS.value),
    ).fetchall()
    return [r[0] if r[0] else "" for r in rows]

find_cached_record(task_name, identity)

Find the most recent successful instance matching the identity.

Searches across all runs.

Source code in src/reflow/stores/sqlite.py
def find_cached_record(
    self,
    task_name: str,
    identity: str,
) -> TaskInstanceRecord | None:
    """Find the most recent successful instance matching the identity.

    Searches across all runs; an empty identity never matches.
    """
    # An empty identity is the "no hash" placeholder — presumably used
    # by unhashed instances, so never treat it as a cache key.
    if not identity:
        return None
    sql = (
        "SELECT * FROM task_instances "
        "WHERE task_name = ? AND identity = ? AND state = ? "
        "ORDER BY created_at DESC LIMIT 1"
    )
    params = (task_name, identity, TaskState.SUCCESS.value)
    hit = self.conn.execute(sql, params).fetchone()
    # The decoder handles a missing row (None) itself.
    return self._decode_task_instance_record(hit)

reflow.stores.records.RunRecord

reflow.stores.records.RunRecord(run_id, graph_name, user_id, created_at, status, parameters) dataclass

Typed representation of one workflow run row.

to_public_dict()

Return the legacy public dictionary shape.

Source code in src/reflow/stores/records.py
def to_public_dict(self) -> dict[str, Any]:
    """Return the legacy public dictionary shape."""
    # Enum/datetime fields are flattened to plain strings so the result
    # is serialisable; everything else is copied verbatim.
    return dict(
        run_id=self.run_id,
        graph_name=self.graph_name,
        user_id=self.user_id,
        created_at=self.created_at.isoformat(),
        status=self.status.value,
        parameters=self.parameters,
    )

reflow.stores.records.TaskSpecRecord

reflow.stores.records.TaskSpecRecord(run_id, task_name, is_array, config, dependencies) dataclass

Typed representation of one persisted task specification.

reflow.stores.records.TaskInstanceRecord

reflow.stores.records.TaskInstanceRecord(id, run_id, task_name, array_index, state, job_id, input, output, error_text, identity, input_hash, output_hash, created_at, updated_at) dataclass

Typed representation of one persisted task instance.

to_public_dict()

Return the legacy public dictionary shape.

Source code in src/reflow/stores/records.py
def to_public_dict(self) -> dict[str, Any]:
    """Return the legacy public dictionary shape."""
    # Built incrementally to preserve the legacy key order; enum and
    # datetime fields are flattened to plain strings.
    record: dict[str, Any] = {}
    record["id"] = self.id
    record["run_id"] = self.run_id
    record["task_name"] = self.task_name
    record["array_index"] = self.array_index
    record["state"] = self.state.value
    record["job_id"] = self.job_id
    record["input"] = self.input
    record["output"] = self.output
    record["error_text"] = self.error_text
    record["identity"] = self.identity
    record["input_hash"] = self.input_hash
    record["output_hash"] = self.output_hash
    record["created_at"] = self.created_at.isoformat()
    record["updated_at"] = self.updated_at.isoformat()
    return record