
Database

SQLite persistence layer with async operations via aiosqlite.

Persistence layer for the agent queue system.

Single SQLite database using WAL journal mode for concurrent reads from the orchestrator loop, Discord bot, and chat agent without blocking writers.
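The setup described above can be sketched with the synchronous sqlite3 module (the real class uses aiosqlite, which accepts the same SQL; the file path here is illustrative):

```python
import os
import sqlite3
import tempfile

# WAL requires a real file; in-memory databases report "memory" instead.
path = os.path.join(tempfile.mkdtemp(), "queue.db")
conn = sqlite3.connect(path)

# Switch to write-ahead logging so readers proceed while a writer
# appends to the log, as described above.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
```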

Follows the repository pattern -- all SQL is encapsulated here. The rest of the codebase interacts with the database exclusively through the :class:Database class, receiving and returning domain model dataclasses.
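As a sketch of that boundary, a hypothetical row-to-dataclass helper (the field names are assumptions standing in for the real src.models.Task):

```python
from dataclasses import dataclass


# Illustrative stand-in for the real src.models.Task dataclass.
@dataclass
class Task:
    id: str
    title: str
    status: str


def row_to_task(row: dict) -> Task:
    # The repository converts raw rows into domain objects at the
    # boundary, so callers never see SQL or column tuples.
    return Task(id=row["id"], title=row["title"], status=row["status"])
```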

The schema covers 14 tables organized around the core domain concepts: projects, repos, tasks (with dependencies, criteria, context, tools, and results), agents, token_ledger, events, rate_limits, hooks, hook_runs, and system_config.

Migrations are applied as idempotent ALTER TABLE ADD COLUMN statements during initialization. If a column already exists the error is silently caught, so migrations are safe to re-run on every startup.
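The pattern might look like this minimal sketch (synchronous sqlite3 for brevity; the migration statement and column name are illustrative):

```python
import sqlite3


def apply_migrations(conn: sqlite3.Connection) -> None:
    # Idempotent ADD COLUMN migrations, as described above. A
    # "duplicate column" error means the migration already ran and is
    # swallowed; any other OperationalError is re-raised.
    migrations = ["ALTER TABLE tasks ADD COLUMN resume_after REAL"]
    for stmt in migrations:
        try:
            conn.execute(stmt)
        except sqlite3.OperationalError as exc:
            if "duplicate column" not in str(exc).lower():
                raise
```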

See specs/database.md for the full schema and behavioral specification.

Classes

Database

Database(path: str)

Async SQLite persistence layer implementing the repository pattern.

All database access in the system goes through this class. It owns the connection lifecycle, schema creation, migrations, and provides typed CRUD methods that accept and return domain dataclasses from :mod:src.models.

The connection uses WAL journal mode and has foreign keys enabled, so referential integrity is enforced at the database level. Row factory is set to aiosqlite.Row for dict-like column access.
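The dict-like access can be demonstrated with sqlite3.Row, the synchronous analogue of aiosqlite.Row (table and column names here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows now support access by column name

conn.execute("CREATE TABLE demo (status TEXT)")
conn.execute("INSERT INTO demo VALUES ('ready')")

row = conn.execute("SELECT status FROM demo").fetchone()
value = row["status"]  # dict-like access instead of a positional tuple
```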

State transitions go through :meth:transition_task, which validates against the state machine but always applies the update (logging-only enforcement) to avoid blocking production on unexpected edge cases.
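The validation helper might look like this illustrative subset (the authoritative transition table lives in src.state_machine; the statuses and edges shown are assumptions for the sketch):

```python
# Hypothetical subset of the task state machine.
VALID_TRANSITIONS: dict[str, set[str]] = {
    "defined": {"ready"},
    "ready": {"in_progress"},
    "in_progress": {"completed", "failed", "blocked"},
}


def is_valid_status_transition(current: str, new: str) -> bool:
    # Unknown states have no outgoing edges, so they validate as False
    # and would be surfaced as a logged warning by the caller.
    return new in VALID_TRANSITIONS.get(current, set())
```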

Source code in src/database.py
def __init__(self, path: str):
    self._path = path
    self._db: aiosqlite.Connection | None = None

Functions

update_repo async
update_repo(repo_id: str, **kwargs) -> None

Update repo config fields (e.g. default_branch, url).

Source code in src/database.py
async def update_repo(self, repo_id: str, **kwargs) -> None:
    """Update repo config fields (e.g. default_branch, url)."""
    sets = []
    vals = []
    for key, value in kwargs.items():
        if isinstance(value, RepoSourceType):
            value = value.value
        sets.append(f"{key} = ?")
        vals.append(value)
    vals.append(repo_id)
    await self._db.execute(
        f"UPDATE repos SET {', '.join(sets)} WHERE id = ?", vals
    )
    await self._db.commit()
list_active_tasks async
list_active_tasks(project_id: str | None = None, exclude_statuses: set[TaskStatus] | None = None) -> list[Task]

List non-terminal tasks, optionally filtered by project.

Unlike :meth:list_tasks, this method performs status filtering at the SQL level so the database only returns actionable rows. This is more efficient for cross-project overviews where the majority of historical tasks may be completed.

Parameters

project_id: Optional project filter. When None, tasks from all projects are returned.

exclude_statuses: Set of :class:TaskStatus values to exclude. Defaults to COMPLETED only — FAILED and BLOCKED tasks are kept visible since they still need attention.

Source code in src/database.py
async def list_active_tasks(
    self,
    project_id: str | None = None,
    exclude_statuses: set[TaskStatus] | None = None,
) -> list[Task]:
    """List non-terminal tasks, optionally filtered by project.

    Unlike :meth:`list_tasks`, this method performs status filtering at the
    SQL level so the database only returns actionable rows.  This is more
    efficient for cross-project overviews where the majority of historical
    tasks may be completed.

    Parameters
    ----------
    project_id:
        Optional project filter.  When *None*, tasks from **all** projects
        are returned.
    exclude_statuses:
        Set of :class:`TaskStatus` values to exclude.  Defaults to
        COMPLETED only — FAILED and BLOCKED tasks are kept visible
        since they still need attention.
    """
    if exclude_statuses is None:
        exclude_statuses = {
            TaskStatus.COMPLETED,
        }

    conditions: list[str] = []
    vals: list = []

    if exclude_statuses:
        placeholders = ", ".join("?" for _ in exclude_statuses)
        conditions.append(f"status NOT IN ({placeholders})")
        vals.extend(s.value for s in exclude_statuses)

    if project_id:
        conditions.append("project_id = ?")
        vals.append(project_id)

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    cursor = await self._db.execute(
        f"SELECT * FROM tasks {where} ORDER BY priority ASC, created_at ASC",
        vals,
    )
    rows = await cursor.fetchall()
    return [self._row_to_task(r) for r in rows]
list_active_tasks_all_projects async
list_active_tasks_all_projects() -> list[Task]

Return all non-completed tasks across every project.

Only COMPLETED tasks are excluded — FAILED and BLOCKED tasks are kept visible since they still need attention. Results are ordered by project_id first (so the caller can group by project) then by priority within each project.

Source code in src/database.py
async def list_active_tasks_all_projects(self) -> list[Task]:
    """Return all non-completed tasks across every project.

    Only COMPLETED tasks are excluded — FAILED and BLOCKED tasks are
    kept visible since they still need attention.  Results are ordered
    by project_id first (so the caller can group by project) then by
    priority within each project.
    """
    terminal = (
        TaskStatus.COMPLETED.value,
    )
    placeholders = ", ".join("?" for _ in terminal)
    cursor = await self._db.execute(
        f"SELECT * FROM tasks WHERE status NOT IN ({placeholders}) "
        "ORDER BY project_id ASC, priority ASC, created_at ASC",
        list(terminal),
    )
    rows = await cursor.fetchall()
    return [self._row_to_task(r) for r in rows]
count_tasks_by_status async
count_tasks_by_status(project_id: str | None = None) -> dict[str, int]

Return a {status_value: count} mapping for quick summary stats.

Useful for reporting how many tasks were hidden when filtering.

Source code in src/database.py
async def count_tasks_by_status(
    self,
    project_id: str | None = None,
) -> dict[str, int]:
    """Return a {status_value: count} mapping for quick summary stats.

    Useful for reporting how many tasks were hidden when filtering.
    """
    conditions: list[str] = []
    vals: list = []
    if project_id:
        conditions.append("project_id = ?")
        vals.append(project_id)
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    cursor = await self._db.execute(
        f"SELECT status, COUNT(*) as cnt FROM tasks {where} GROUP BY status",
        vals,
    )
    rows = await cursor.fetchall()
    return {r["status"]: r["cnt"] for r in rows}
transition_task async
transition_task(task_id: str, new_status: TaskStatus, *, context: str = '', **kwargs) -> None

Update task status with state-machine validation.

Fetches the current status, checks it against the formal state machine defined in :mod:src.state_machine, and logs a warning if the transition is not valid. The update is always applied regardless of validation outcome (logging-only enforcement).

This deliberate design choice keeps production running when edge cases produce unexpected transitions (e.g. a race between the orchestrator loop and a Discord command). The warnings surface in logs for investigation without blocking task progress.

If new_status equals the current status, no transition validation occurs -- only the extra kwargs are applied (useful for updating metadata without changing state).

Any extra kwargs (e.g. assigned_agent_id, retry_count, resume_after) are forwarded to :meth:update_task.

Source code in src/database.py
async def transition_task(
    self,
    task_id: str,
    new_status: TaskStatus,
    *,
    context: str = "",
    **kwargs,
) -> None:
    """Update task status with state-machine validation.

    Fetches the current status, checks it against the formal state
    machine defined in :mod:`src.state_machine`, and logs a warning if
    the transition is not valid.  The update is **always applied**
    regardless of validation outcome (logging-only enforcement).

    This deliberate design choice keeps production running when edge
    cases produce unexpected transitions (e.g. a race between the
    orchestrator loop and a Discord command). The warnings surface in
    logs for investigation without blocking task progress.

    If *new_status* equals the current status, no transition validation
    occurs -- only the extra *kwargs* are applied (useful for updating
    metadata without changing state).

    Any extra *kwargs* (e.g. ``assigned_agent_id``, ``retry_count``,
    ``resume_after``) are forwarded to :meth:`update_task`.
    """
    task = await self.get_task(task_id)
    if task is None:
        logger.warning(
            "transition_task: task '%s' not found, cannot validate", task_id
        )
        # Still attempt the update in case of a race condition
        await self.update_task(task_id, status=new_status, **kwargs)
        return

    current_status = task.status

    if current_status == new_status:
        # Same-status "transition" — skip validation, just apply kwargs.
        if kwargs:
            await self.update_task(task_id, **kwargs)
        return

    if not is_valid_status_transition(current_status, new_status):
        ctx = f" ({context})" if context else ""
        logger.warning(
            "Invalid task status transition: %s -> %s for task '%s'%s",
            current_status.value,
            new_status.value,
            task_id,
            ctx,
        )

    await self.update_task(task_id, status=new_status, **kwargs)
get_task_updated_at async
get_task_updated_at(task_id: str) -> float | None

Return the updated_at timestamp for a task, or None.

Source code in src/database.py
async def get_task_updated_at(self, task_id: str) -> float | None:
    """Return the ``updated_at`` timestamp for a task, or *None*."""
    cursor = await self._db.execute(
        "SELECT updated_at FROM tasks WHERE id = ?", (task_id,)
    )
    row = await cursor.fetchone()
    return row["updated_at"] if row else None
get_task_created_at async
get_task_created_at(task_id: str) -> float | None

Return the created_at timestamp for a task, or None.

Source code in src/database.py
async def get_task_created_at(self, task_id: str) -> float | None:
    """Return the ``created_at`` timestamp for a task, or *None*."""
    cursor = await self._db.execute(
        "SELECT created_at FROM tasks WHERE id = ?", (task_id,)
    )
    row = await cursor.fetchone()
    return row["created_at"] if row else None
add_task_context async
add_task_context(task_id: str, *, type: str, label: str, content: str) -> str

Insert a task_context row and return its generated ID.

Source code in src/database.py
async def add_task_context(
    self, task_id: str, *, type: str, label: str, content: str
) -> str:
    """Insert a task_context row and return its generated ID."""
    ctx_id = str(uuid.uuid4())[:12]
    await self._db.execute(
        "INSERT INTO task_context (id, task_id, type, label, content) "
        "VALUES (?, ?, ?, ?, ?)",
        (ctx_id, task_id, type, label, content),
    )
    await self._db.commit()
    return ctx_id
get_task_contexts async
get_task_contexts(task_id: str) -> list[dict]

Return all task_context rows for task_id as dicts.

Source code in src/database.py
async def get_task_contexts(self, task_id: str) -> list[dict]:
    """Return all task_context rows for *task_id* as dicts."""
    cursor = await self._db.execute(
        "SELECT id, task_id, type, label, content FROM task_context WHERE task_id = ?",
        (task_id,),
    )
    rows = await cursor.fetchall()
    return [dict(r) for r in rows]
get_task_tree async
get_task_tree(root_task_id: str) -> dict | None

Return a nested dict representing the full task hierarchy.

The root task is fetched by root_task_id, then all descendants are collected recursively via :meth:get_subtasks.

The returned structure looks like::

{
    "task": <Task>,          # the root Task object
    "children": [            # list of child sub-trees
        {"task": <Task>, "children": [...]},
        ...
    ],
}

Uses :meth:get_subtasks as the building block and recurses through all descendants. Returns None if root_task_id does not exist in the database.
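A caller might flatten that structure into an indented outline, for example (tasks are shown as plain dicts for brevity; the real method returns Task dataclasses with a title attribute):

```python
def render_tree(node: dict, depth: int = 0) -> list[str]:
    # Walk the {"task": ..., "children": [...]} structure returned by
    # get_task_tree and produce one indented line per task.
    lines = ["  " * depth + node["task"]["title"]]
    for child in node["children"]:
        lines.extend(render_tree(child, depth + 1))
    return lines
```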

Source code in src/database.py
async def get_task_tree(self, root_task_id: str) -> dict | None:
    """Return a nested dict representing the full task hierarchy.

    The root task is fetched by *root_task_id*, then all descendants are
    collected recursively via :meth:`get_subtasks`.

    The returned structure looks like::

        {
            "task": <Task>,          # the root Task object
            "children": [            # list of child sub-trees
                {"task": <Task>, "children": [...]},
                ...
            ],
        }

    Uses :meth:`get_subtasks` as the building block and recurses
    through all descendants.  Returns ``None`` if *root_task_id*
    does not exist in the database.
    """
    root = await self.get_task(root_task_id)
    if root is None:
        return None

    async def _build_subtree(task: Task) -> dict:
        children = await self.get_subtasks(task.id)
        # Sort children by priority then creation order for deterministic output
        child_nodes = []
        for child in children:
            child_nodes.append(await _build_subtree(child))
        return {"task": task, "children": child_nodes}

    return await _build_subtree(root)
get_parent_tasks async
get_parent_tasks(project_id: str) -> list[Task]

Return top-level tasks for a project (those with no parent).

A "parent task" here means a task whose parent_task_id is NULL -- i.e. it is not a subtask of any other task. Results are ordered by priority ascending, then creation time ascending, matching :meth:list_tasks.

Source code in src/database.py
async def get_parent_tasks(self, project_id: str) -> list[Task]:
    """Return top-level tasks for a project (those with no parent).

    A "parent task" here means a task whose ``parent_task_id`` is NULL --
    i.e. it is not a subtask of any other task.  Results are ordered by
    priority ascending, then creation time ascending, matching
    :meth:`list_tasks`.
    """
    cursor = await self._db.execute(
        "SELECT * FROM tasks WHERE project_id = ? AND parent_task_id IS NULL "
        "ORDER BY priority ASC, created_at ASC",
        (project_id,),
    )
    rows = await cursor.fetchall()
    return [self._row_to_task(r) for r in rows]
are_dependencies_met async
are_dependencies_met(task_id: str) -> bool

Check whether all upstream dependencies of a task are satisfied.

Returns True if every task that task_id depends on has reached COMPLETED status. Also returns True if the task has no dependencies at all (vacuous truth). This is the gate that controls the DEFINED -> READY promotion in the orchestrator loop.
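The vacuous-truth behavior can be seen in a stripped-down version of the check (status strings are illustrative):

```python
def dependencies_met(dep_statuses: list[str]) -> bool:
    # Mirrors the all(...) check in the method: True when every
    # upstream dependency is COMPLETED, and vacuously True when the
    # task has no dependency rows at all.
    return all(s == "completed" for s in dep_statuses)
```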

Source code in src/database.py
async def are_dependencies_met(self, task_id: str) -> bool:
    """Check whether all upstream dependencies of a task are satisfied.

    Returns True if every task that ``task_id`` depends on has reached
    COMPLETED status.  Also returns True if the task has no dependencies
    at all (vacuous truth).  This is the gate that controls the
    DEFINED -> READY promotion in the orchestrator loop.
    """
    cursor = await self._db.execute(
        "SELECT d.depends_on_task_id, t.status "
        "FROM task_dependencies d "
        "JOIN tasks t ON t.id = d.depends_on_task_id "
        "WHERE d.task_id = ?",
        (task_id,),
    )
    rows = await cursor.fetchall()
    return all(r["status"] == TaskStatus.COMPLETED.value for r in rows)
get_stuck_defined_tasks async
get_stuck_defined_tasks(threshold_seconds: int) -> list[Task]

Return DEFINED tasks that are truly stuck — blocked by a dependency in a terminal failure state (BLOCKED or FAILED).

A DEFINED task waiting on READY/IN_PROGRESS/DEFINED dependencies is normal and will eventually be promoted once the upstream work completes. Only tasks whose dependency chain contains a BLOCKED or FAILED task are reported. Note that the threshold_seconds parameter is accepted but not referenced in the query.

Source code in src/database.py
async def get_stuck_defined_tasks(self, threshold_seconds: int) -> list[Task]:
    """Return DEFINED tasks that are truly stuck — blocked by a dependency in
    a terminal failure state (BLOCKED or FAILED).

    A DEFINED task waiting on READY/IN_PROGRESS/DEFINED dependencies is normal
    and will eventually be promoted once the upstream work completes.  Only tasks
    whose dependency chain contains a BLOCKED or FAILED task are reported.
    """
    cursor = await self._db.execute(
        "SELECT DISTINCT t.* FROM tasks t "
        "JOIN task_dependencies d ON d.task_id = t.id "
        "JOIN tasks dep ON dep.id = d.depends_on_task_id "
        "WHERE t.status = ? AND dep.status IN (?, ?) "
        "ORDER BY t.created_at ASC",
        (
            TaskStatus.DEFINED.value,
            TaskStatus.BLOCKED.value,
            TaskStatus.FAILED.value,
        ),
    )
    rows = await cursor.fetchall()
    return [self._row_to_task(r) for r in rows]
get_blocking_dependencies async
get_blocking_dependencies(task_id: str) -> list[tuple[str, str, str]]

Return (dep_task_id, dep_title, dep_status) for unmet dependencies.

Only returns dependencies whose status is NOT COMPLETED.

Source code in src/database.py
async def get_blocking_dependencies(self, task_id: str) -> list[tuple[str, str, str]]:
    """Return (dep_task_id, dep_title, dep_status) for unmet dependencies.

    Only returns dependencies whose status is NOT COMPLETED.
    """
    cursor = await self._db.execute(
        "SELECT t.id, t.title, t.status "
        "FROM task_dependencies d "
        "JOIN tasks t ON t.id = d.depends_on_task_id "
        "WHERE d.task_id = ? AND t.status != ?",
        (task_id, TaskStatus.COMPLETED.value),
    )
    rows = await cursor.fetchall()
    return [(r["id"], r["title"], r["status"]) for r in rows]
get_dependents async
get_dependents(task_id: str) -> set[str]

Return task IDs that directly depend on task_id (reverse lookup).

Source code in src/database.py
async def get_dependents(self, task_id: str) -> set[str]:
    """Return task IDs that directly depend on *task_id* (reverse lookup)."""
    cursor = await self._db.execute(
        "SELECT task_id FROM task_dependencies WHERE depends_on_task_id = ?",
        (task_id,),
    )
    rows = await cursor.fetchall()
    return {r["task_id"] for r in rows}
get_dependency_map_for_tasks async
get_dependency_map_for_tasks(task_ids: list[str]) -> dict[str, dict]

Batch-fetch dependency data for multiple tasks in two queries.

Returns a mapping of task_id → {"depends_on": [...], "blocks": [...]}. Each depends_on entry is {"id": ..., "status": ...}. Each blocks entry is a plain task ID string.

This replaces the previous N+1 pattern of calling get_dependencies() and get_dependents() per task, collapsing all lookups into two efficient queries regardless of the number of tasks.

Source code in src/database.py
async def get_dependency_map_for_tasks(
    self, task_ids: list[str],
) -> dict[str, dict]:
    """Batch-fetch dependency data for multiple tasks in two queries.

    Returns a mapping of ``task_id`` → ``{"depends_on": [...], "blocks": [...]}``.
    Each ``depends_on`` entry is ``{"id": ..., "status": ...}``.
    Each ``blocks`` entry is a plain task ID string.

    This replaces the previous N+1 pattern of calling ``get_dependencies()``
    and ``get_dependents()`` per task, collapsing all lookups into two
    efficient queries regardless of the number of tasks.
    """
    if not task_ids:
        return {}

    # Initialize result for all requested task IDs
    result: dict[str, dict] = {
        tid: {"depends_on": [], "blocks": []} for tid in task_ids
    }

    # Fetch all upstream dependencies (with status) in one query using a
    # JOIN so we don't need a follow-up get_task() per dependency.
    placeholders = ",".join("?" for _ in task_ids)
    cursor = await self._db.execute(
        "SELECT d.task_id, d.depends_on_task_id, t.status "
        "FROM task_dependencies d "
        "JOIN tasks t ON t.id = d.depends_on_task_id "
        f"WHERE d.task_id IN ({placeholders})",
        task_ids,
    )
    for row in await cursor.fetchall():
        tid = row["task_id"]
        if tid in result:
            result[tid]["depends_on"].append({
                "id": row["depends_on_task_id"],
                "status": row["status"],
            })

    # Fetch all downstream dependents (reverse lookup) in one query.
    cursor = await self._db.execute(
        "SELECT d.depends_on_task_id, d.task_id "
        "FROM task_dependencies d "
        f"WHERE d.depends_on_task_id IN ({placeholders})",
        task_ids,
    )
    for row in await cursor.fetchall():
        blocked_by = row["depends_on_task_id"]
        if blocked_by in result:
            result[blocked_by]["blocks"].append(row["task_id"])

    # Sort blocks lists for stable output
    for entry in result.values():
        entry["blocks"] = sorted(entry["blocks"])

    return result
remove_dependency async
remove_dependency(task_id: str, depends_on: str) -> None

Remove a single dependency edge.

Source code in src/database.py
async def remove_dependency(self, task_id: str, depends_on: str) -> None:
    """Remove a single dependency edge."""
    await self._db.execute(
        "DELETE FROM task_dependencies "
        "WHERE task_id = ? AND depends_on_task_id = ?",
        (task_id, depends_on),
    )
    await self._db.commit()
remove_all_dependencies_on async
remove_all_dependencies_on(depends_on_task_id: str) -> None

Remove all dependency edges pointing to a given task.

Source code in src/database.py
async def remove_all_dependencies_on(self, depends_on_task_id: str) -> None:
    """Remove all dependency edges pointing to a given task."""
    await self._db.execute(
        "DELETE FROM task_dependencies WHERE depends_on_task_id = ?",
        (depends_on_task_id,),
    )
    await self._db.commit()
delete_agent async
delete_agent(agent_id: str) -> None

Delete an agent and all dependent records.

Cascading order (children before parent):

1. token_ledger – immutable token-usage rows
2. task_results – execution-history rows
3. agent_workspaces – per-project workspace mappings (legacy)
4. workspaces – release locks (don't delete — workspaces belong to projects)
5. tasks.assigned_agent_id – NULLify (don't delete the tasks)
6. agents – the agent record itself

Source code in src/database.py
async def delete_agent(self, agent_id: str) -> None:
    """Delete an agent and all dependent records.

    Cascading order (children before parent):
    1. token_ledger  – immutable token-usage rows
    2. task_results  – execution-history rows
    3. agent_workspaces – per-project workspace mappings (legacy)
    4. workspaces – release locks (don't delete — workspaces belong to projects)
    5. tasks.assigned_agent_id – NULLify (don't delete the tasks)
    6. agents – the agent record itself
    """
    await self._db.execute(
        "DELETE FROM token_ledger WHERE agent_id = ?", (agent_id,),
    )
    await self._db.execute(
        "DELETE FROM task_results WHERE agent_id = ?", (agent_id,),
    )
    await self._db.execute(
        "DELETE FROM agent_workspaces WHERE agent_id = ?", (agent_id,),
    )
    # Release workspace locks — workspaces belong to the project, not the agent
    await self._db.execute(
        "UPDATE workspaces SET locked_by_agent_id = NULL, "
        "locked_by_task_id = NULL, locked_at = NULL "
        "WHERE locked_by_agent_id = ?",
        (agent_id,),
    )
    await self._db.execute(
        "UPDATE tasks SET assigned_agent_id = NULL WHERE assigned_agent_id = ?",
        (agent_id,),
    )
    await self._db.execute(
        "DELETE FROM agents WHERE id = ?", (agent_id,),
    )
    await self._db.commit()
get_workspace_by_name async
get_workspace_by_name(project_id: str, name: str) -> Workspace | None

Find a workspace by name within a project.

Source code in src/database.py
async def get_workspace_by_name(
    self, project_id: str, name: str,
) -> Workspace | None:
    """Find a workspace by name within a project."""
    cursor = await self._db.execute(
        "SELECT * FROM workspaces WHERE project_id = ? AND name = ?",
        (project_id, name),
    )
    row = await cursor.fetchone()
    if not row:
        return None
    return self._row_to_workspace(row)
acquire_workspace async
acquire_workspace(project_id: str, agent_id: str, task_id: str, preferred_workspace_id: str | None = None) -> Workspace | None

Atomically find an unlocked workspace for a project and lock it.

If preferred_workspace_id is given (e.g. a workspace known to contain a merge conflict), attempt to lock that specific workspace first. Falls back to any unlocked workspace if the preferred one is unavailable.

Returns the locked workspace, or None if all workspaces are locked.

Race-safety: Multiple coroutines may call this concurrently. The UPDATE uses WHERE locked_by_agent_id IS NULL as an optimistic lock, and we verify rowcount == 1 before returning. If another coroutine locked the row between our SELECT and UPDATE, we retry with the next available workspace instead of silently returning a workspace we don't actually hold.
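The optimistic-lock core can be demonstrated in isolation with the synchronous sqlite3 module (the schema is reduced to just the lock column; workspace and agent IDs are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE workspaces (id TEXT PRIMARY KEY, locked_by_agent_id TEXT)"
)
conn.execute("INSERT INTO workspaces VALUES ('ws1', NULL)")


def try_lock(agent_id: str) -> bool:
    # Optimistic lock: the WHERE clause only matches an unlocked row,
    # so rowcount reveals whether this caller actually acquired it.
    cur = conn.execute(
        "UPDATE workspaces SET locked_by_agent_id = ? "
        "WHERE id = 'ws1' AND locked_by_agent_id IS NULL",
        (agent_id,),
    )
    return cur.rowcount == 1
```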

Source code in src/database.py
async def acquire_workspace(
    self, project_id: str, agent_id: str, task_id: str,
    preferred_workspace_id: str | None = None,
) -> Workspace | None:
    """Atomically find an unlocked workspace for a project and lock it.

    If *preferred_workspace_id* is given (e.g. a workspace known to contain
    a merge conflict), attempt to lock that specific workspace first.  Falls
    back to any unlocked workspace if the preferred one is unavailable.

    Returns the locked workspace, or None if all workspaces are locked.

    Race-safety: Multiple coroutines may call this concurrently.  The
    UPDATE uses ``WHERE locked_by_agent_id IS NULL`` as an optimistic
    lock, and we verify ``rowcount == 1`` before returning.  If another
    coroutine locked the row between our SELECT and UPDATE, we retry
    with the next available workspace instead of silently returning a
    workspace we don't actually hold.
    """
    # Collect candidate workspace IDs, preferred first.
    candidate_ids: list[str] = []

    if preferred_workspace_id:
        cursor = await self._db.execute(
            "SELECT id FROM workspaces "
            "WHERE id = ? AND project_id = ? AND locked_by_agent_id IS NULL",
            (preferred_workspace_id, project_id),
        )
        row = await cursor.fetchone()
        if row:
            candidate_ids.append(row["id"])

    # All unlocked workspaces for the project (excluding preferred, already tried).
    cursor = await self._db.execute(
        "SELECT id FROM workspaces "
        "WHERE project_id = ? AND locked_by_agent_id IS NULL "
        "ORDER BY id",
        (project_id,),
    )
    for row in await cursor.fetchall():
        if row["id"] not in candidate_ids:
            candidate_ids.append(row["id"])

    if not candidate_ids:
        return None

    # Try each candidate until one is successfully locked.
    now = time.time()
    for ws_id in candidate_ids:
        # Re-fetch full row (it may have been locked between the list
        # query above and now by another coroutine).
        cursor = await self._db.execute(
            "SELECT * FROM workspaces WHERE id = ? AND locked_by_agent_id IS NULL",
            (ws_id,),
        )
        row = await cursor.fetchone()
        if not row:
            continue  # Already locked by another coroutine

        # Layer 1: Path-level lock check — prevent two workspace rows
        # pointing at the same filesystem path from being locked
        # simultaneously, even across different projects.
        cursor = await self._db.execute(
            "SELECT id FROM workspaces "
            "WHERE workspace_path = ? AND locked_by_agent_id IS NOT NULL "
            "AND id != ?",
            (row["workspace_path"], row["id"]),
        )
        conflict = await cursor.fetchone()
        if conflict:
            logger.warning(
                "Workspace path %s already locked by workspace %s — skipping %s",
                row["workspace_path"], conflict["id"], row["id"],
            )
            continue  # Try next candidate instead of giving up

        # Optimistic lock: UPDATE only if still unlocked.
        cursor = await self._db.execute(
            "UPDATE workspaces SET locked_by_agent_id = ?, "
            "locked_by_task_id = ?, locked_at = ? "
            "WHERE id = ? AND locked_by_agent_id IS NULL",
            (agent_id, task_id, now, row["id"]),
        )
        await self._db.commit()

        if cursor.rowcount != 1:
            # Another coroutine locked it between our SELECT and UPDATE.
            continue

        ws = self._row_to_workspace(row)
        ws.locked_by_agent_id = agent_id
        ws.locked_by_task_id = task_id
        ws.locked_at = now
        return ws

    return None  # All candidates were locked by the time we tried
release_workspace async
release_workspace(workspace_id: str) -> None

Clear lock columns on a workspace.

Source code in src/database.py
async def release_workspace(self, workspace_id: str) -> None:
    """Clear lock columns on a workspace."""
    await self._db.execute(
        "UPDATE workspaces SET locked_by_agent_id = NULL, "
        "locked_by_task_id = NULL, locked_at = NULL "
        "WHERE id = ?",
        (workspace_id,),
    )
    await self._db.commit()
release_workspaces_for_agent async
release_workspaces_for_agent(agent_id: str) -> int

Release all workspace locks held by an agent. Returns count released.

Source code in src/database.py
async def release_workspaces_for_agent(self, agent_id: str) -> int:
    """Release all workspace locks held by an agent. Returns count released."""
    cursor = await self._db.execute(
        "UPDATE workspaces SET locked_by_agent_id = NULL, "
        "locked_by_task_id = NULL, locked_at = NULL "
        "WHERE locked_by_agent_id = ?",
        (agent_id,),
    )
    await self._db.commit()
    return cursor.rowcount
release_workspaces_for_task async
release_workspaces_for_task(task_id: str) -> int

Release all workspace locks held by a task. Returns count released.

Source code in src/database.py
async def release_workspaces_for_task(self, task_id: str) -> int:
    """Release all workspace locks held by a task. Returns count released."""
    cursor = await self._db.execute(
        "UPDATE workspaces SET locked_by_agent_id = NULL, "
        "locked_by_task_id = NULL, locked_at = NULL "
        "WHERE locked_by_task_id = ?",
        (task_id,),
    )
    await self._db.commit()
    return cursor.rowcount
get_workspace_for_task async
get_workspace_for_task(task_id: str) -> Workspace | None

Find the workspace currently locked by a task.

Source code in src/database.py
async def get_workspace_for_task(self, task_id: str) -> Workspace | None:
    """Find the workspace currently locked by a task."""
    cursor = await self._db.execute(
        "SELECT * FROM workspaces WHERE locked_by_task_id = ?",
        (task_id,),
    )
    row = await cursor.fetchone()
    if not row:
        return None
    return self._row_to_workspace(row)
get_project_workspace_path async
get_project_workspace_path(project_id: str) -> str | None

Return the workspace_path of the first workspace for a project.

This is a non-locking read used by notes, archive, repo status, and other commands that need a project directory without acquiring a lock. Prefers clone workspaces over link workspaces since clones are always project-specific. Returns None if the project has no workspaces.

Source code in src/database.py
async def get_project_workspace_path(self, project_id: str) -> str | None:
    """Return the workspace_path of the first workspace for a project.

    This is a non-locking read used by notes, archive, repo status, and
    other commands that need a project directory without acquiring a lock.
    Prefers clone workspaces over link workspaces since clones are always
    project-specific.  Returns ``None`` if the project has no workspaces.
    """
    cursor = await self._db.execute(
        "SELECT workspace_path FROM workspaces WHERE project_id = ? "
        "ORDER BY CASE source_type WHEN 'clone' THEN 0 ELSE 1 END, rowid "
        "LIMIT 1",
        (project_id,),
    )
    row = await cursor.fetchone()
    return row["workspace_path"] if row else None
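The clone-over-link preference is implemented entirely in the `ORDER BY CASE` clause: clones sort as 0, everything else as 1, with `rowid` as the tiebreaker. A self-contained sketch against an in-memory table (the schema here is a simplified stand-in, not the real one):

```python
import sqlite3

# A link workspace is inserted first (lower rowid), but the CASE
# expression still ranks the clone workspace ahead of it.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE workspaces "
    "(project_id TEXT, source_type TEXT, workspace_path TEXT)"
)
conn.executemany(
    "INSERT INTO workspaces VALUES (?, ?, ?)",
    [
        ("p1", "link", "/ws/link-a"),    # older row, lower rowid
        ("p1", "clone", "/ws/clone-b"),  # newer row, but a clone
    ],
)
row = conn.execute(
    "SELECT workspace_path FROM workspaces WHERE project_id = ? "
    "ORDER BY CASE source_type WHEN 'clone' THEN 0 ELSE 1 END, rowid "
    "LIMIT 1",
    ("p1",),
).fetchone()
print(row["workspace_path"])  # → /ws/clone-b
```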
count_available_workspaces async
count_available_workspaces(project_id: str) -> int

Count workspaces for a project that are not currently locked.

Used by the scheduler to skip projects with no available workspaces.

Source code in src/database.py
async def count_available_workspaces(self, project_id: str) -> int:
    """Count workspaces for a project that are not currently locked.

    Used by the scheduler to skip projects with no available workspaces.
    """
    cursor = await self._db.execute(
        "SELECT COUNT(*) AS cnt FROM workspaces "
        "WHERE project_id = ? AND locked_by_agent_id IS NULL",
        (project_id,),
    )
    row = await cursor.fetchone()
    return row["cnt"]
save_task_result async
save_task_result(task_id: str, agent_id: str, output) -> None

Persist an AgentOutput to the task_results table.

Source code in src/database.py
async def save_task_result(
    self, task_id: str, agent_id: str, output
) -> None:
    """Persist an AgentOutput to the task_results table."""
    await self._db.execute(
        "INSERT INTO task_results (id, task_id, agent_id, result, summary, "
        "files_changed, error_message, tokens_used, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (str(uuid.uuid4()), task_id, agent_id, output.result.value,
         output.summary, json.dumps(output.files_changed),
         output.error_message, output.tokens_used, time.time()),
    )
    await self._db.commit()
get_task_result async
get_task_result(task_id: str) -> dict | None

Return the most recent result for a task.

Source code in src/database.py
async def get_task_result(self, task_id: str) -> dict | None:
    """Return the most recent result for a task."""
    cursor = await self._db.execute(
        "SELECT * FROM task_results WHERE task_id = ? "
        "ORDER BY created_at DESC LIMIT 1",
        (task_id,),
    )
    row = await cursor.fetchone()
    if not row:
        return None
    return self._row_to_task_result(row)
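"Most recent result" means the row with the largest `created_at`, selected via `ORDER BY created_at DESC LIMIT 1`. A sketch of that query against a toy `task_results` table (values are made up for illustration):

```python
import sqlite3

# Two attempts for the same task: a failed first run and a later
# successful retry. The DESC/LIMIT 1 query returns the retry.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE task_results (task_id TEXT, result TEXT, created_at REAL)"
)
conn.executemany(
    "INSERT INTO task_results VALUES (?, ?, ?)",
    [
        ("t1", "FAILURE", 100.0),  # first attempt
        ("t1", "SUCCESS", 200.0),  # retry, newest timestamp
    ],
)
row = conn.execute(
    "SELECT * FROM task_results WHERE task_id = ? "
    "ORDER BY created_at DESC LIMIT 1",
    ("t1",),
).fetchone()
print(row["result"])  # → SUCCESS
```

The companion `get_task_results` is the same query with `ASC` ordering and no limit, which is why it reads naturally as a retry history.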
get_task_results async
get_task_results(task_id: str) -> list[dict]

Return all results for a task (retry history).

Source code in src/database.py
async def get_task_results(self, task_id: str) -> list[dict]:
    """Return all results for a task (retry history)."""
    cursor = await self._db.execute(
        "SELECT * FROM task_results WHERE task_id = ? ORDER BY created_at ASC",
        (task_id,),
    )
    rows = await cursor.fetchall()
    return [self._row_to_task_result(r) for r in rows]
delete_project async
delete_project(project_id: str) -> None

Delete a project and all associated data (tasks, repos, results, ledger).

Source code in src/database.py
async def delete_project(self, project_id: str) -> None:
    """Delete a project and all associated data (tasks, repos, results, ledger)."""
    # Get all task IDs for this project
    cursor = await self._db.execute(
        "SELECT id FROM tasks WHERE project_id = ?", (project_id,)
    )
    task_rows = await cursor.fetchall()
    task_ids = [r["id"] for r in task_rows]

    for tid in task_ids:
        await self._db.execute("DELETE FROM task_results WHERE task_id = ?", (tid,))
        await self._db.execute("DELETE FROM task_dependencies WHERE task_id = ? OR depends_on_task_id = ?", (tid, tid))
        await self._db.execute("DELETE FROM task_criteria WHERE task_id = ?", (tid,))
        await self._db.execute("DELETE FROM task_context WHERE task_id = ?", (tid,))
        await self._db.execute("DELETE FROM task_tools WHERE task_id = ?", (tid,))

    await self._db.execute("DELETE FROM hook_runs WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM hooks WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM token_ledger WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM tasks WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM agent_workspaces WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM workspaces WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM repos WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM events WHERE project_id = ?", (project_id,))
    await self._db.execute("DELETE FROM projects WHERE id = ?", (project_id,))
    await self._db.commit()
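The children-before-parent delete order above is not stylistic: the connection runs with foreign keys enabled, so SQLite refuses to delete a row that other rows still reference. A minimal sketch of that constraint with a toy two-table schema (not the real one):

```python
import sqlite3

# With PRAGMA foreign_keys = ON, deleting a project that still has
# tasks raises IntegrityError; deleting the tasks first succeeds.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE projects (id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE tasks (id TEXT PRIMARY KEY, "
    "project_id TEXT REFERENCES projects(id))"
)
conn.execute("INSERT INTO projects VALUES ('p1')")
conn.execute("INSERT INTO tasks VALUES ('t1', 'p1')")
conn.commit()

try:
    conn.execute("DELETE FROM projects WHERE id = 'p1'")
    blocked = False
except sqlite3.IntegrityError:
    blocked = True  # parent delete rejected while a task references it

conn.execute("DELETE FROM tasks WHERE project_id = 'p1'")
conn.execute("DELETE FROM projects WHERE id = 'p1'")  # now succeeds
conn.commit()
print(blocked)  # → True
```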
assign_task_to_agent async
assign_task_to_agent(task_id: str, agent_id: str) -> None

Atomically bind a task to an agent, updating both sides.

This is the only method that should be used to start work on a task. In a single commit it:

1. Transitions the task from READY to ASSIGNED and sets its assigned_agent_id.
2. Transitions the agent from IDLE to BUSY and sets its current_task_id.
3. Logs a task_assigned event for the audit trail.

Performing all three writes in one commit prevents inconsistent states where a task thinks it is assigned but the agent does not (or vice versa).

Source code in src/database.py
async def assign_task_to_agent(self, task_id: str, agent_id: str) -> None:
    """Atomically bind a task to an agent, updating both sides.

    This is the only method that should be used to start work on a task.
    In a single commit it:
    1. Transitions the task from READY to ASSIGNED and sets its
       ``assigned_agent_id``.
    2. Transitions the agent from IDLE to BUSY and sets its
       ``current_task_id``.
    3. Logs a ``task_assigned`` event for the audit trail.

    Performing all three writes in one commit prevents inconsistent
    states where a task thinks it is assigned but the agent does not
    (or vice versa).
    """
    # Validate the READY -> ASSIGNED transition
    task = await self.get_task(task_id)
    if task and not is_valid_status_transition(task.status, TaskStatus.ASSIGNED):
        logger.warning(
            "Invalid task status transition: %s -> ASSIGNED for task '%s' "
            "(assign_task_to_agent)",
            task.status.value,
            task_id,
        )

    now = time.time()
    await self._db.execute(
        "UPDATE tasks SET status = ?, assigned_agent_id = ?, updated_at = ? "
        "WHERE id = ?",
        (TaskStatus.ASSIGNED.value, agent_id, now, task_id),
    )
    await self._db.execute(
        "UPDATE agents SET state = ?, current_task_id = ? WHERE id = ?",
        (AgentState.BUSY.value, task_id, agent_id),
    )
    await self._db.execute(
        "INSERT INTO events (event_type, project_id, task_id, agent_id, "
        "timestamp) VALUES (?, (SELECT project_id FROM tasks WHERE id = ?), "
        "?, ?, ?)",
        ("task_assigned", task_id, task_id, agent_id, now),
    )
    await self._db.commit()
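The single-commit guarantee can be sketched with plain `sqlite3`: because both `UPDATE`s run inside one implicit transaction, a failure before `commit()` rolls both back, so the task and agent rows never disagree. Table and column names below are a toy stand-in for the real schema:

```python
import sqlite3

# Pair a task/agent update in one transaction, then simulate a crash
# before commit. Rollback must undo both sides together.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, "
    "assigned_agent_id TEXT)"
)
conn.execute(
    "CREATE TABLE agents (id TEXT PRIMARY KEY, state TEXT, "
    "current_task_id TEXT)"
)
conn.execute("INSERT INTO tasks VALUES ('t1', 'READY', NULL)")
conn.execute("INSERT INTO agents VALUES ('a1', 'IDLE', NULL)")
conn.commit()

try:
    conn.execute(
        "UPDATE tasks SET status = 'ASSIGNED', assigned_agent_id = 'a1' "
        "WHERE id = 't1'"
    )
    conn.execute(
        "UPDATE agents SET state = 'BUSY', current_task_id = 't1' "
        "WHERE id = 'a1'"
    )
    raise RuntimeError("crash before commit")  # simulated failure
except RuntimeError:
    conn.rollback()  # both updates are undone as a unit

status = conn.execute("SELECT status FROM tasks WHERE id = 't1'").fetchone()[0]
state = conn.execute("SELECT state FROM agents WHERE id = 'a1'").fetchone()[0]
print(status, state)  # → READY IDLE
```

Had the two updates been committed separately, the crash window between them would leave a BUSY agent pointing at an unassigned task, which is exactly the inconsistency the docstring warns about.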
archive_task async
archive_task(task_id: str) -> bool

Move a single task from the active tasks table into archived_tasks.

The task must exist and be in a terminal status (COMPLETED, FAILED, or BLOCKED). Returns True if the task was archived, False if not found.

The operation copies the full task row into archived_tasks with the current timestamp as archived_at, then deletes the task and its related child rows from the active tables.

Source code in src/database.py
async def archive_task(self, task_id: str) -> bool:
    """Move a single task from the active ``tasks`` table into ``archived_tasks``.

    The task must exist and be in a terminal status (COMPLETED, FAILED, or
    BLOCKED).  Returns *True* if the task was archived, *False* if not
    found.

    The operation copies the full task row into ``archived_tasks`` with the
    current timestamp as ``archived_at``, then deletes the task and its
    related child rows from the active tables.
    """
    task = await self.get_task(task_id)
    if task is None:
        return False

    now = time.time()
    await self._db.execute(
        "INSERT OR IGNORE INTO archived_tasks "
        "(id, project_id, parent_task_id, repo_id, title, description, "
        "priority, status, verification_type, retry_count, max_retries, "
        "assigned_agent_id, branch_name, resume_after, requires_approval, "
        "pr_url, plan_source, is_plan_subtask, task_type, profile_id, "
        "preferred_workspace_id, created_at, updated_at, archived_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (task.id, task.project_id, task.parent_task_id, task.repo_id,
         task.title, task.description, task.priority, task.status.value,
         task.verification_type.value, task.retry_count, task.max_retries,
         task.assigned_agent_id, task.branch_name, task.resume_after,
         int(task.requires_approval), task.pr_url, task.plan_source,
         int(task.is_plan_subtask),
         task.task_type.value if task.task_type else None,
         task.profile_id,
         task.preferred_workspace_id,
         0.0, 0.0, now),
    )
    # Read original timestamps from the tasks row directly.
    cursor = await self._db.execute(
        "SELECT created_at, updated_at FROM tasks WHERE id = ?", (task_id,)
    )
    row = await cursor.fetchone()
    if row:
        await self._db.execute(
            "UPDATE archived_tasks SET created_at = ?, updated_at = ? WHERE id = ?",
            (row["created_at"], row["updated_at"], task_id),
        )

    # Clean up child rows, then remove the task from the active table.
    await self._db.execute("DELETE FROM task_results WHERE task_id = ?", (task_id,))
    await self._db.execute("DELETE FROM token_ledger WHERE task_id = ?", (task_id,))
    await self._db.execute(
        "DELETE FROM task_dependencies WHERE task_id = ? OR depends_on_task_id = ?",
        (task_id, task_id),
    )
    await self._db.execute("DELETE FROM task_criteria WHERE task_id = ?", (task_id,))
    await self._db.execute("DELETE FROM task_context WHERE task_id = ?", (task_id,))
    await self._db.execute("DELETE FROM task_tools WHERE task_id = ?", (task_id,))
    # Null out foreign-key back-references from other tables so the
    # DELETE doesn't violate FK constraints.
    await self._db.execute(
        "UPDATE tasks SET parent_task_id = NULL WHERE parent_task_id = ?",
        (task_id,),
    )
    await self._db.execute(
        "UPDATE agents SET current_task_id = NULL WHERE current_task_id = ?",
        (task_id,),
    )
    await self._db.execute(
        "UPDATE workspaces SET locked_by_task_id = NULL, locked_at = NULL "
        "WHERE locked_by_task_id = ?",
        (task_id,),
    )
    await self._db.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
    await self._db.commit()
    return True
archive_completed_tasks async
archive_completed_tasks(project_id: str | None = None) -> list[str]

Archive all COMPLETED tasks, optionally filtered by project.

Returns the list of archived task IDs.

Source code in src/database.py
async def archive_completed_tasks(
    self, project_id: str | None = None,
) -> list[str]:
    """Archive all COMPLETED tasks, optionally filtered by project.

    Returns the list of archived task IDs.
    """
    conditions = ["status = ?"]
    vals: list = [TaskStatus.COMPLETED.value]
    if project_id:
        conditions.append("project_id = ?")
        vals.append(project_id)
    where = f"WHERE {' AND '.join(conditions)}"
    cursor = await self._db.execute(
        f"SELECT id FROM tasks {where}", vals,
    )
    rows = await cursor.fetchall()
    task_ids = [r["id"] for r in rows]

    for tid in task_ids:
        await self.archive_task(tid)

    return task_ids
archive_old_terminal_tasks async
archive_old_terminal_tasks(statuses: list[str], older_than_seconds: float) -> list[str]

Archive terminal tasks whose updated_at is older than the threshold.

This is the engine behind automatic archiving: the orchestrator calls this once per cycle with the configured statuses and age threshold to silently sweep stale terminal tasks into the archive.

Parameters

- statuses: Task status values eligible for archiving (e.g. ["COMPLETED", "FAILED", "BLOCKED"]).
- older_than_seconds: Tasks whose updated_at timestamp is more than this many seconds in the past will be archived.

Returns the list of archived task IDs.

Source code in src/database.py
async def archive_old_terminal_tasks(
    self,
    statuses: list[str],
    older_than_seconds: float,
) -> list[str]:
    """Archive terminal tasks whose ``updated_at`` is older than the threshold.

    This is the engine behind automatic archiving: the orchestrator calls
    this once per cycle with the configured statuses and age threshold to
    silently sweep stale terminal tasks into the archive.

    Parameters
    ----------
    statuses
        Task status values eligible for archiving (e.g.
        ``["COMPLETED", "FAILED", "BLOCKED"]``).
    older_than_seconds
        Tasks whose ``updated_at`` timestamp is more than this many
        seconds in the past will be archived.

    Returns the list of archived task IDs.
    """
    if not statuses:
        return []

    cutoff = time.time() - older_than_seconds
    placeholders = ", ".join("?" for _ in statuses)
    cursor = await self._db.execute(
        f"SELECT id FROM tasks "
        f"WHERE status IN ({placeholders}) AND updated_at <= ?",
        [*statuses, cutoff],
    )
    rows = await cursor.fetchall()
    task_ids = [r["id"] for r in rows]

    for tid in task_ids:
        await self.archive_task(tid)

    return task_ids
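The selection step combines a dynamically built `IN (...)` placeholder list with the age cutoff `updated_at <= now - older_than_seconds`. A sketch of that query against a toy `tasks` table (the IN_PROGRESS status name below is illustrative, not taken from the real state machine):

```python
import sqlite3
import time

# Three tasks: one stale terminal task, one recent terminal task,
# one stale task in a non-eligible status. Only the first qualifies.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, updated_at REAL)"
)
now = time.time()
conn.executemany(
    "INSERT INTO tasks VALUES (?, ?, ?)",
    [
        ("t-old", "COMPLETED", now - 10 * 86400),        # stale terminal
        ("t-new", "COMPLETED", now - 3600),              # too recent
        ("t-running", "IN_PROGRESS", now - 30 * 86400),  # wrong status
    ],
)
statuses = ["COMPLETED", "FAILED", "BLOCKED"]
cutoff = now - 7 * 86400  # e.g. archive terminal tasks older than a week
placeholders = ", ".join("?" for _ in statuses)
rows = conn.execute(
    f"SELECT id FROM tasks "
    f"WHERE status IN ({placeholders}) AND updated_at <= ?",
    [*statuses, cutoff],
).fetchall()
print([r[0] for r in rows])  # → ['t-old']
```

Building the placeholder string from the list length keeps the query parameterized for any number of statuses without string-interpolating the values themselves.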
list_archived_tasks async
list_archived_tasks(project_id: str | None = None, limit: int = 50) -> list[dict]

Return archived tasks as dicts, newest archived first.

Unlike active tasks, which are returned as :class:Task dataclasses, archived tasks carry the extra archived_at field and are therefore returned as plain dicts.

Source code in src/database.py
async def list_archived_tasks(
    self,
    project_id: str | None = None,
    limit: int = 50,
) -> list[dict]:
    """Return archived tasks as dicts, newest archived first.

    Unlike active tasks which are returned as :class:`Task` dataclasses,
    archived tasks include the extra ``archived_at`` field so they are
    returned as plain dicts.
    """
    conditions: list[str] = []
    vals: list = []
    if project_id:
        conditions.append("project_id = ?")
        vals.append(project_id)
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    cursor = await self._db.execute(
        f"SELECT * FROM archived_tasks {where} "
        "ORDER BY archived_at DESC LIMIT ?",
        vals + [limit],
    )
    rows = await cursor.fetchall()
    return [self._row_to_archived_task(r) for r in rows]
get_archived_task async
get_archived_task(task_id: str) -> dict | None

Return a single archived task as a dict, or None if not found.

Source code in src/database.py
async def get_archived_task(self, task_id: str) -> dict | None:
    """Return a single archived task as a dict, or *None* if not found."""
    cursor = await self._db.execute(
        "SELECT * FROM archived_tasks WHERE id = ?", (task_id,)
    )
    row = await cursor.fetchone()
    if not row:
        return None
    return self._row_to_archived_task(row)
restore_archived_task async
restore_archived_task(task_id: str) -> bool

Move an archived task back into the active tasks table.

The task is restored with status DEFINED so it can be re-evaluated by the orchestrator. Returns True if restored, False if the archived task was not found.

Source code in src/database.py
async def restore_archived_task(self, task_id: str) -> bool:
    """Move an archived task back into the active ``tasks`` table.

    The task is restored with status DEFINED so it can be re-evaluated
    by the orchestrator.  Returns *True* if restored, *False* if the
    archived task was not found.
    """
    archived = await self.get_archived_task(task_id)
    if archived is None:
        return False

    now = time.time()
    task = Task(
        id=archived["id"],
        project_id=archived["project_id"],
        parent_task_id=archived["parent_task_id"],
        repo_id=archived["repo_id"],
        title=archived["title"],
        description=archived["description"],
        priority=archived["priority"],
        status=TaskStatus.DEFINED,
        verification_type=VerificationType(archived["verification_type"]),
        retry_count=0,
        max_retries=archived["max_retries"],
        assigned_agent_id=None,
        branch_name=archived["branch_name"],
        resume_after=None,
        requires_approval=archived["requires_approval"],
        pr_url=archived["pr_url"],
        plan_source=archived["plan_source"],
        is_plan_subtask=archived["is_plan_subtask"],
        task_type=TaskType(archived["task_type"]) if archived["task_type"] else None,
    )
    await self.create_task(task)
    await self._db.execute(
        "DELETE FROM archived_tasks WHERE id = ?", (task_id,)
    )
    await self._db.commit()
    return True
delete_archived_task async
delete_archived_task(task_id: str) -> bool

Permanently delete an archived task. Returns True if found and deleted.

Source code in src/database.py
async def delete_archived_task(self, task_id: str) -> bool:
    """Permanently delete an archived task. Returns *True* if found and deleted."""
    cursor = await self._db.execute(
        "SELECT id FROM archived_tasks WHERE id = ?", (task_id,)
    )
    row = await cursor.fetchone()
    if not row:
        return False
    await self._db.execute(
        "DELETE FROM archived_tasks WHERE id = ?", (task_id,)
    )
    await self._db.commit()
    return True
count_archived_tasks async
count_archived_tasks(project_id: str | None = None) -> int

Return the total count of archived tasks.

Source code in src/database.py
async def count_archived_tasks(
    self, project_id: str | None = None,
) -> int:
    """Return the total count of archived tasks."""
    conditions: list[str] = []
    vals: list = []
    if project_id:
        conditions.append("project_id = ?")
        vals.append(project_id)
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    cursor = await self._db.execute(
        f"SELECT COUNT(*) as cnt FROM archived_tasks {where}", vals,
    )
    row = await cursor.fetchone()
    return row["cnt"] if row else 0

Functions