Skip to content

Stores

reflow.stores.Store

reflow.stores.Store

Bases: ABC

Abstract manifest store.

init() abstractmethod

Create tables / schema if they do not exist.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def init(self) -> None:
    """Create tables / schema if they do not exist.

    Calling this on an already-initialised store must be a no-op.
    """

close() abstractmethod

Release any held resources.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def close(self) -> None:
    """Release any held resources (e.g. an open database connection)."""

insert_run(run_id, graph_name, user_id, parameters) abstractmethod

Insert a new run.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def insert_run(
    self,
    run_id: str,
    graph_name: str,
    user_id: str,
    parameters: dict[str, Any],
) -> None:
    """Insert a new run.

    Parameters
    ----------
    run_id : str
        Unique identifier of the run.
    graph_name : str
        Name of the workflow graph being run.
    user_id : str
        Identifier of the user who started the run.
    parameters : dict[str, Any]
        Run-level parameters to persist.

    """

get_run(run_id) abstractmethod

Load a single run record.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_run(self, run_id: str) -> dict[str, Any] | None:
    """Load a single run record, or ``None`` if the run is unknown."""

get_run_parameters(run_id) abstractmethod

Load the run parameters. Raises KeyError if missing.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_run_parameters(self, run_id: str) -> dict[str, Any]:
    """Load the run parameters.

    Raises
    ------
    KeyError
        If no run with *run_id* exists.

    """

update_run_status(run_id, status) abstractmethod

Update the top-level run status.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_run_status(self, run_id: str, status: RunState) -> None:
    """Update the top-level run status to *status*."""

list_runs(graph_name=None, user_id=None) abstractmethod

List runs, optionally filtered.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def list_runs(
    self,
    graph_name: str | None = None,
    user_id: str | None = None,
) -> list[dict[str, Any]]:
    """List runs, optionally filtered.

    Parameters
    ----------
    graph_name : str | None
        If given, only return runs of this graph.
    user_id : str | None
        If given, only return runs started by this user.

    """

insert_task_spec(run_id, task_name, is_array, config_json, dependencies) abstractmethod

Persist one task specification and its dependencies.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def insert_task_spec(
    self,
    run_id: str,
    task_name: str,
    is_array: bool,
    config_json: dict[str, Any],
    dependencies: list[str],
) -> None:
    """Persist one task specification and its dependencies.

    Parameters
    ----------
    run_id : str
        Run the specification belongs to.
    task_name : str
        Name of the task.
    is_array : bool
        Whether the task fans out into multiple array instances.
    config_json : dict[str, Any]
        Task configuration to persist.
    dependencies : list[str]
        Names of upstream tasks this task depends on.

    """

list_task_dependencies(run_id, task_name) abstractmethod

Return upstream dependency names for a task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def list_task_dependencies(self, run_id: str, task_name: str) -> list[str]:
    """Return the names of the upstream dependencies of *task_name*."""

insert_task_instance(run_id, task_name, array_index, state, input_payload, identity='', input_hash='') abstractmethod

Insert one task instance, return its database id.

Parameters:

Name Type Description Default
run_id str

Unique identifier for this run.

required
task_name str

Name of the task.

required
identity str

Full Merkle identity hash.

''
input_hash str

Hash of direct inputs only (without upstream hashes).

''
array_index int | None

Index of the instance within the array job, if any.

required
state TaskState

Initial state of the task instance.

required
input_payload dict[str, Any]

Payload of the inputs for this task instance.

required
identity str
''
input_hash str
''
Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def insert_task_instance(
    self,
    run_id: str,
    task_name: str,
    array_index: int | None,
    state: TaskState,
    input_payload: dict[str, Any],
    identity: str = "",
    input_hash: str = "",
) -> int:
    """Insert one task instance, return its database id.

    Parameters
    ----------
    run_id : str
        Unique identifier of the run this instance belongs to.
    task_name : str
        Name of the task.
    array_index : int | None
        Index within the array job, or ``None`` for a singleton task.
    state : TaskState
        Initial state of the instance.
    input_payload : dict[str, Any]
        Payload of the inputs for this instance.
    identity : str
        Full Merkle identity hash.
    input_hash : str
        Hash of direct inputs only (without upstream hashes).

    """

get_task_instance(run_id, task_name, array_index) abstractmethod

Load one task instance.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_task_instance(
    self,
    run_id: str,
    task_name: str,
    array_index: int | None,
) -> dict[str, Any] | None:
    """Load one task instance, or ``None`` if it does not exist."""

list_task_instances(run_id, task_name=None, states=None) abstractmethod

List task instances with optional filters.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def list_task_instances(
    self,
    run_id: str,
    task_name: str | None = None,
    states: list[TaskState] | None = None,
) -> list[dict[str, Any]]:
    """List task instances with optional filters.

    Parameters
    ----------
    run_id : str
        Run to list instances for.
    task_name : str | None
        If given, restrict to instances of this task.
    states : list[TaskState] | None
        If given, restrict to instances in any of these states.

    """

count_task_instances(run_id, task_name) abstractmethod

Count instances of a task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def count_task_instances(self, run_id: str, task_name: str) -> int:
    """Return the number of instances of *task_name* in the run."""

get_singleton_output(run_id, task_name) abstractmethod

Load the output of a successful singleton task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_singleton_output(self, run_id: str, task_name: str) -> Any:
    """Load the output of a successful singleton (non-array) task."""

get_array_outputs(run_id, task_name) abstractmethod

Load all outputs from a successful array task, ordered by index.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_array_outputs(self, run_id: str, task_name: str) -> list[Any]:
    """Load all outputs from a successful array task, ordered by array index."""

get_output_hash(run_id, task_name, array_index=None) abstractmethod

Get the output hash for a specific successful task instance.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_output_hash(
    self,
    run_id: str,
    task_name: str,
    array_index: int | None = None,
) -> str:
    """Get the output hash for a specific successful task instance.

    Returns
    -------
    str
        The stored output hash, or an empty string if not found.

    """

get_all_output_hashes(run_id, task_name) abstractmethod

Get output hashes for all successful instances of a task.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def get_all_output_hashes(self, run_id: str, task_name: str) -> list[str]:
    """Get output hashes for all successful instances of *task_name*."""

dependency_is_satisfied(run_id, task_name) abstractmethod

Check whether all instances of an upstream task succeeded.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def dependency_is_satisfied(self, run_id: str, task_name: str) -> bool:
    """Return True when every instance of the upstream task succeeded."""

update_task_submitted(run_id, task_name, job_id) abstractmethod

Mark pending/retrying instances as submitted.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_submitted(
    self,
    run_id: str,
    task_name: str,
    job_id: str,
) -> None:
    """Mark pending/retrying instances of *task_name* as submitted.

    Parameters
    ----------
    run_id : str
        Run the instances belong to.
    task_name : str
        Task whose instances were submitted.
    job_id : str
        Scheduler job id to record against the instances.

    """

update_task_running(instance_id) abstractmethod

Mark one instance as running.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_running(self, instance_id: int) -> None:
    """Mark the instance with database id *instance_id* as running."""

update_task_success(instance_id, output, output_hash='') abstractmethod

Mark one instance as successful.

Parameters:

Name Type Description Default
output_hash str

Hash of the output value for downstream Merkle propagation.

''
instance_id int

Database id

required
output Any

Output for the job

required
output_hash str

hash of the output

''
Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_success(
    self,
    instance_id: int,
    output: Any,
    output_hash: str = "",
) -> None:
    """Mark one instance as successful.

    Parameters
    ----------
    instance_id : int
        Database id of the instance to update.
    output : Any
        Output value produced by the task.
    output_hash : str
        Hash of the output value for downstream Merkle propagation.

    """

update_task_failed(instance_id, error_text) abstractmethod

Mark one instance as failed.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_failed(self, instance_id: int, error_text: str) -> None:
    """Mark one instance as failed, recording *error_text* for diagnosis."""

update_task_cancelled(instance_id) abstractmethod

Mark one instance as cancelled.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def update_task_cancelled(self, instance_id: int) -> None:
    """Mark the instance with database id *instance_id* as cancelled."""

mark_for_retry(instance_id) abstractmethod

Reset a failed/cancelled instance for retry.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def mark_for_retry(self, instance_id: int) -> None:
    """Reset a failed/cancelled instance so it can be retried."""

find_cached(task_name, identity) abstractmethod

Find a successful task instance with a matching Merkle identity.

Searches across all runs. Returns the most recent match.

Parameters:

Name Type Description Default
task_name str

Task name.

required
identity str

Full Merkle identity hash.

required

Returns:

Type Description
dict[str, Any] or None

The cached instance row, or None if no match.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def find_cached(
    self,
    task_name: str,
    identity: str,
) -> dict[str, Any] | None:
    """Find a successful task instance with a matching Merkle identity.

    Searches across *all* runs.  Returns the most recent match.

    Parameters
    ----------
    task_name : str
        Task name.
    identity : str
        Full Merkle identity hash.
        NOTE(review): implementations appear to treat an empty identity
        as "no match" — confirm and document as part of the contract.

    Returns
    -------
    dict[str, Any] or None
        The cached instance row, or ``None`` if no match.

    """

task_state_summary(run_id) abstractmethod

Per-task state counts: {task: {state: count}}.

Source code in src/reflow/stores/__init__.py
@abc.abstractmethod
def task_state_summary(self, run_id: str) -> dict[str, dict[str, int]]:
    """Return per-task state counts, shaped as ``{task: {state: count}}``."""

reflow.stores.sqlite.SqliteStore

reflow.stores.sqlite.SqliteStore(path, readonly=False)

Bases: Store

SQLite-backed manifest store.

Parameters:

Name Type Description Default
path Path or str

Path to the SQLite database file.

required
Source code in src/reflow/stores/sqlite.py
def __init__(self, path: Path | str, readonly: bool = False) -> None:
    """Create a store backed by the SQLite file at *path*.

    Parameters
    ----------
    path : Path or str
        Path to the SQLite database file.
    readonly : bool
        If True, skip creating the database's parent directory.
    """
    db_file = Path(path)
    self.path = db_file
    self.readonly = readonly
    # No connection yet; one is established later.
    self._conn: sqlite3.Connection | None = None
    if not readonly:
        # A writable store must be able to create the database file.
        db_file.parent.mkdir(parents=True, exist_ok=True)

for_run_dir(run_dir, readonly=False) classmethod

Create a store at <run_dir>/manifest.db.

This remains available for callers that want a run-local store, but the default reflow behaviour uses the `default` constructor instead, so that one manifest database can serve multiple run directories.

Source code in src/reflow/stores/sqlite.py
@classmethod
def for_run_dir(cls, run_dir: Path | str, readonly: bool = False) -> SqliteStore:
    """Create a store at ``<run_dir>/manifest.db``.

    This remains available for callers that want a run-local store,
    but the default reflow behaviour uses [`default`][default] instead so
    one manifest database can serve multiple run directories.
    """
    base = Path(run_dir).expanduser().resolve()
    if not readonly:
        # A writable store may need to create the run directory first.
        base.mkdir(parents=True, exist_ok=True)
    db_file = base / "manifest.db"
    return cls(db_file, readonly=readonly)

default_path(config=None) classmethod

Return the default shared manifest path.

By default this is the user cache directory, typically ~/.cache/reflow/manifest.db on Linux, unless overridden by config or REFLOW_STORE_PATH.

Source code in src/reflow/stores/sqlite.py
@classmethod
def default_path(cls, config: Config | None = None) -> Path:
    """Return the default shared manifest path.

    By default this is the user cache directory, typically
    ``~/.cache/reflow/manifest.db`` on Linux, unless overridden by
    config or ``REFLOW_STORE_PATH``.
    """
    effective = config or Config()
    raw_path = Path(effective.default_store_path)
    return raw_path.expanduser().resolve()

default(config=None, readonly=False) classmethod

Create a store using the shared default manifest path.

Source code in src/reflow/stores/sqlite.py
@classmethod
def default(
    cls, config: Config | None = None, readonly: bool = False
) -> SqliteStore:
    """Create a store using the shared default manifest path."""
    shared = cls.default_path(config)
    return cls(shared, readonly=readonly)

get_output_hash(run_id, task_name, array_index=None)

Get the output hash for a specific task instance.

Parameters:

Name Type Description Default
run_id str

Run identifier.

required
task_name str

Task name.

required
array_index int or None

Array index, or None for singleton.

None

Returns:

Type Description
str

Output hash, or empty string if not found.

Source code in src/reflow/stores/sqlite.py
def get_output_hash(
    self, run_id: str, task_name: str, array_index: int | None = None
) -> str:
    """Get the output hash for a specific task instance.

    Parameters
    ----------
    run_id : str
        Run identifier.
    task_name : str
        Task name.
    array_index : int or None
        Array index, or None for singleton.

    Returns
    -------
    str
        Output hash, or empty string if not found.

    """
    # Singleton instances store NULL in array_index, so the filter and
    # the bound parameters differ between the two cases.
    if array_index is None:
        index_filter = "array_index IS NULL"
        args: tuple = (run_id, task_name, TaskState.SUCCESS.value)
    else:
        index_filter = "array_index = ?"
        args = (run_id, task_name, array_index, TaskState.SUCCESS.value)
    query = (
        "SELECT output_hash FROM task_instances "
        "WHERE run_id = ? AND task_name = ? AND "
        f"{index_filter} AND state = ?"
    )
    row = self.conn.execute(query, args).fetchone()
    if row is None:
        return ""
    # A stored NULL/empty hash normalises to "".
    return row[0] or ""

get_all_output_hashes(run_id, task_name)

Get output hashes for all successful instances of a task.

Source code in src/reflow/stores/sqlite.py
def get_all_output_hashes(self, run_id: str, task_name: str) -> list[str]:
    """Get output hashes for all successful instances of a task."""
    query = (
        "SELECT output_hash FROM task_instances "
        "WHERE run_id = ? AND task_name = ? AND state = ? "
        "ORDER BY array_index"
    )
    cursor = self.conn.execute(
        query, (run_id, task_name, TaskState.SUCCESS.value)
    )
    # Stored NULL/empty hashes normalise to "".
    return [value or "" for (value,) in cursor.fetchall()]

find_cached_record(task_name, identity)

Find the most recent successful instance matching the identity.

Searches across all runs.

Source code in src/reflow/stores/sqlite.py
def find_cached_record(
    self,
    task_name: str,
    identity: str,
) -> TaskInstanceRecord | None:
    """Find the most recent successful instance matching the identity.

    Searches across all runs.
    """
    # An empty identity would match unrelated rows; treat it as a miss.
    if not identity:
        return None
    query = (
        "SELECT * FROM task_instances "
        "WHERE task_name = ? AND identity = ? AND state = ? "
        "ORDER BY created_at DESC LIMIT 1"
    )
    args = (task_name, identity, TaskState.SUCCESS.value)
    match = self.conn.execute(query, args).fetchone()
    return self._decode_task_instance_record(match)

reflow.stores.records.RunRecord

reflow.stores.records.RunRecord(run_id, graph_name, user_id, created_at, status, parameters) dataclass

Typed representation of one workflow run row.

to_public_dict()

Return the legacy public dictionary shape.

Source code in src/reflow/stores/records.py
def to_public_dict(self) -> dict[str, Any]:
    """Serialise this run row into the legacy public dictionary shape."""
    return dict(
        run_id=self.run_id,
        graph_name=self.graph_name,
        user_id=self.user_id,
        # Timestamps and enums are flattened to plain strings.
        created_at=self.created_at.isoformat(),
        status=self.status.value,
        parameters=self.parameters,
    )

reflow.stores.records.TaskSpecRecord

reflow.stores.records.TaskSpecRecord(run_id, task_name, is_array, config, dependencies) dataclass

Typed representation of one persisted task specification.

reflow.stores.records.TaskInstanceRecord

reflow.stores.records.TaskInstanceRecord(id, run_id, task_name, array_index, state, job_id, input, output, error_text, identity, input_hash, output_hash, created_at, updated_at) dataclass

Typed representation of one persisted task instance.

to_public_dict()

Return the legacy public dictionary shape.

Source code in src/reflow/stores/records.py
def to_public_dict(self) -> dict[str, Any]:
    """Serialise this instance row into the legacy public dictionary shape."""
    return dict(
        id=self.id,
        run_id=self.run_id,
        task_name=self.task_name,
        array_index=self.array_index,
        # Enum state and timestamps are flattened to plain strings.
        state=self.state.value,
        job_id=self.job_id,
        input=self.input,
        output=self.output,
        error_text=self.error_text,
        identity=self.identity,
        input_hash=self.input_hash,
        output_hash=self.output_hash,
        created_at=self.created_at.isoformat(),
        updated_at=self.updated_at.isoformat(),
    )