Skip to content

State Machine

Task state transitions and DAG validation.

Formal task state machine definition and dependency graph validation.

This module defines the authoritative set of valid task state transitions and provides utilities for DAG (directed acyclic graph) validation of task dependencies. It is the source of truth for which (TaskStatus, TaskEvent) pairs are legal moves in the task lifecycle.

IMPORTANT: As of now, this state machine is used only for validation logging and lookups — the orchestrator does NOT enforce transitions through this module. All status changes go directly through db.update_task(). This means invalid transitions can occur in practice if the orchestrator has bugs. Enforcing transitions is a planned improvement.

See specs/models-and-state-machine.md for the full behavioral specification.

Classes

Functions

task_transition

task_transition(current: TaskStatus, event: TaskEvent) -> TaskStatus

Look up the target status for a given (current_status, event) pair.

Raises InvalidTransition if no such transition is defined.

Source code in src/state_machine.py
def task_transition(current: TaskStatus, event: TaskEvent) -> TaskStatus:
    """Look up the target status for a given (current_status, event) pair.

    Raises ``InvalidTransition`` if no such transition is defined.
    """
    key = (current, event)
    if key not in VALID_TASK_TRANSITIONS:
        raise InvalidTransition(current, event)
    return VALID_TASK_TRANSITIONS[key]

is_valid_status_transition

is_valid_status_transition(from_status: TaskStatus, to_status: TaskStatus) -> bool

Return True if transitioning from from_status to to_status is covered by at least one event in the state machine.

Source code in src/state_machine.py
def is_valid_status_transition(
    from_status: TaskStatus, to_status: TaskStatus
) -> bool:
    """Return *True* if transitioning from *from_status* to *to_status* is
    covered by at least one event in the state machine."""
    return (from_status, to_status) in VALID_STATUS_TRANSITIONS

validate_dag

validate_dag(deps: dict[str, set[str]]) -> None

Validate that the task dependency graph contains no cycles.

Uses a three-color DFS (white/gray/black) to detect back-edges. This is called when creating tasks with dependencies and when adding new dependency edges to prevent circular chains that would leave tasks stuck in DEFINED forever.

Raises CyclicDependencyError if a cycle is found.

Source code in src/state_machine.py
def validate_dag(deps: dict[str, set[str]]) -> None:
    """Validate that the task dependency graph contains no cycles.

    Uses a three-color DFS (white/gray/black) to detect back-edges. This is
    called when creating tasks with dependencies and when adding new dependency
    edges to prevent circular chains that would leave tasks stuck in DEFINED
    forever.

    Raises CyclicDependencyError if a cycle is found.
    """
    WHITE, GRAY, BLACK = 0, 1, 2
    all_nodes = set(deps.keys())
    for targets in deps.values():
        all_nodes.update(targets)

    color: dict[str, int] = {n: WHITE for n in all_nodes}

    def dfs(node: str) -> None:
        color[node] = GRAY
        for dep in deps.get(node, set()):
            if color[dep] == GRAY:
                raise CyclicDependencyError([node, dep])
            if color[dep] == WHITE:
                dfs(dep)
        color[node] = BLACK

    for node in all_nodes:
        if color[node] == WHITE:
            dfs(node)

validate_dag_with_new_edge

validate_dag_with_new_edge(deps: dict[str, set[str]], task_id: str, depends_on: str) -> None

Check that adding a dependency edge (task_id -> depends_on) won't create a cycle.

Makes a copy of the dependency graph, adds the proposed edge, and runs full DAG validation. Used by the command handler before persisting a new dependency to the database.

Source code in src/state_machine.py
def validate_dag_with_new_edge(
    deps: dict[str, set[str]], task_id: str, depends_on: str
) -> None:
    """Check that adding a dependency edge (task_id -> depends_on) won't create a cycle.

    Makes a copy of the dependency graph, adds the proposed edge, and runs
    full DAG validation. Used by the command handler before persisting a new
    dependency to the database.
    """
    new_deps = {k: set(v) for k, v in deps.items()}
    new_deps.setdefault(task_id, set()).add(depends_on)
    validate_dag(new_deps)