Database¶
Async SQLite persistence layer for the agent queue system, with all operations going through aiosqlite.
Single SQLite database using WAL journal mode for concurrent reads from the orchestrator loop, Discord bot, and chat agent without blocking writers.
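The connection setup can be sketched as follows. The real class is async via aiosqlite, but the PRAGMAs are identical; this uses the stdlib `sqlite3` module for illustration, and the `connect` helper name is hypothetical.

```python
import os
import sqlite3
import tempfile

def connect(path: str) -> sqlite3.Connection:
    """Open a connection with the documented settings (sketch)."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # readers no longer block the writer
    conn.execute("PRAGMA foreign_keys=ON")   # enforce referential integrity
    conn.row_factory = sqlite3.Row           # dict-like column access
    return conn

# WAL requires a file-backed database; in-memory databases ignore it.
db_path = os.path.join(tempfile.mkdtemp(), "queue.db")
conn = connect(db_path)
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]  # "wal"
```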
Follows the repository pattern -- all SQL is encapsulated here. The rest of
the codebase interacts with the database exclusively through the
:class:Database class, receiving and returning domain model dataclasses.
The schema covers 14 tables organized around the core domain concepts: projects, repos, tasks (with dependencies, criteria, context, tools, and results), agents, token_ledger, events, rate_limits, hooks, hook_runs, and system_config.
Migrations are applied as idempotent ALTER TABLE ADD COLUMN statements
during initialization. If a column already exists the error is silently
caught, so migrations are safe to re-run on every startup.
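A minimal sketch of that idempotent migration pattern, using stdlib `sqlite3` (the helper name and column are illustrative, not the module's actual API):

```python
import sqlite3

def migrate_add_column(conn: sqlite3.Connection, table: str, column_def: str) -> None:
    # Idempotent migration: a "duplicate column" error means the column
    # is already present, so re-running on every startup is harmless.
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column_def}")
    except sqlite3.OperationalError as exc:
        if "duplicate column" not in str(exc):
            raise  # only silence the already-exists case

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY)")
migrate_add_column(conn, "tasks", "resume_after TEXT")
migrate_add_column(conn, "tasks", "resume_after TEXT")  # second run is a no-op
columns = [row[1] for row in conn.execute("PRAGMA table_info(tasks)")]
```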
See specs/database.md for the full schema and behavioral specification.
Classes¶
Database ¶
Async SQLite persistence layer implementing the repository pattern.
All database access in the system goes through this class. It owns the
connection lifecycle, schema creation, migrations, and provides typed
CRUD methods that accept and return domain dataclasses from
:mod:src.models.
The connection uses WAL journal mode and has foreign keys enabled, so
referential integrity is enforced at the database level. Row factory is
set to aiosqlite.Row for dict-like column access.
State transitions go through :meth:transition_task, which validates
against the state machine but always applies the update (logging-only
enforcement) to avoid blocking production on unexpected edge cases.
Source code in src/database.py
Functions¶
update_repo
async
¶
Update repo config fields (e.g. default_branch, url).
Source code in src/database.py
list_active_tasks
async
¶
list_active_tasks(project_id: str | None = None, exclude_statuses: set[TaskStatus] | None = None) -> list[Task]
List non-terminal tasks, optionally filtered by project.
Unlike :meth:list_tasks, this method performs status filtering at the
SQL level so the database only returns actionable rows. This is more
efficient for cross-project overviews where the majority of historical
tasks may be completed.
Parameters¶
project_id:
Optional project filter. When None, tasks from all projects
are returned.
exclude_statuses:
Set of :class:TaskStatus values to exclude. Defaults to
COMPLETED only — FAILED and BLOCKED tasks are kept visible
since they still need attention.
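The SQL-level filtering described above can be sketched like this (synchronous `sqlite3` stand-in for the async implementation; status strings and columns are illustrative):

```python
import sqlite3

def list_active_tasks(conn, project_id=None, exclude_statuses=None):
    # Filtering happens in SQL so only actionable rows come back.
    # Default mirrors the docstring: hide COMPLETED, keep FAILED/BLOCKED.
    excluded = sorted(exclude_statuses or {"COMPLETED"})
    sql = "SELECT id, status FROM tasks WHERE status NOT IN ({})".format(
        ",".join("?" * len(excluded)))
    params = list(excluded)
    if project_id is not None:
        sql += " AND project_id = ?"
        params.append(project_id)
    return conn.execute(sql, params).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT, project_id TEXT, status TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?, ?)", [
    ("t1", "p1", "COMPLETED"),
    ("t2", "p1", "FAILED"),
    ("t3", "p2", "READY"),
])
active = list_active_tasks(conn)           # t2 and t3: FAILED stays visible
p1_active = list_active_tasks(conn, "p1")  # t2 only
```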
Source code in src/database.py
list_active_tasks_all_projects
async
¶
Return all non-completed tasks across every project.
Only COMPLETED tasks are excluded — FAILED and BLOCKED tasks are kept visible since they still need attention. Results are ordered by project_id first (so the caller can group by project) then by priority within each project.
Source code in src/database.py
count_tasks_by_status
async
¶
Return a {status_value: count} mapping for quick summary stats.
Useful for reporting how many tasks were hidden when filtering.
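The mapping is a single `GROUP BY` query; a minimal sketch (table layout assumed):

```python
import sqlite3

def count_tasks_by_status(conn):
    # One aggregate query -> {status_value: count}
    return {status: n for status, n in
            conn.execute("SELECT status, COUNT(*) FROM tasks GROUP BY status")}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT, status TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?)",
                 [("t1", "COMPLETED"), ("t2", "COMPLETED"), ("t3", "READY")])
counts = count_tasks_by_status(conn)  # {"COMPLETED": 2, "READY": 1}
```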
Source code in src/database.py
transition_task
async
¶
Update task status with state-machine validation.
Fetches the current status, checks it against the formal state
machine defined in :mod:src.state_machine, and logs a warning if
the transition is not valid. The update is always applied
regardless of validation outcome (logging-only enforcement).
This deliberate design choice keeps production running when edge cases produce unexpected transitions (e.g. a race between the orchestrator loop and a Discord command). The warnings surface in logs for investigation without blocking task progress.
If new_status equals the current status, no transition validation occurs -- only the extra kwargs are applied (useful for updating metadata without changing state).
Any extra kwargs (e.g. assigned_agent_id, retry_count,
resume_after) are forwarded to :meth:update_task.
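The logging-only enforcement can be sketched as below. The transition set is an illustrative subset, not the actual table from `src.state_machine`, and the sketch is synchronous:

```python
import logging
import sqlite3

# Illustrative subset of the state machine.
VALID_TRANSITIONS = {("READY", "ASSIGNED"), ("ASSIGNED", "IN_PROGRESS"),
                     ("IN_PROGRESS", "COMPLETED")}

def transition_task(conn, task_id, new_status):
    (current,) = conn.execute(
        "SELECT status FROM tasks WHERE id = ?", (task_id,)).fetchone()
    if new_status != current and (current, new_status) not in VALID_TRANSITIONS:
        # Logging-only enforcement: warn, but never block the update.
        logging.warning("invalid transition %s -> %s for %s",
                        current, new_status, task_id)
    conn.execute("UPDATE tasks SET status = ? WHERE id = ?",
                 (new_status, task_id))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO tasks VALUES ('t1', 'COMPLETED')")
transition_task(conn, "t1", "READY")  # invalid, warned, still applied
status = conn.execute("SELECT status FROM tasks WHERE id='t1'").fetchone()[0]
```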
Source code in src/database.py
get_task_updated_at
async
¶
Return the updated_at timestamp for a task, or None.
Source code in src/database.py
get_task_created_at
async
¶
Return the created_at timestamp for a task, or None.
Source code in src/database.py
add_task_context
async
¶
Insert a task_context row and return its generated ID.
Source code in src/database.py
get_task_contexts
async
¶
Return all task_context rows for task_id as dicts.
Source code in src/database.py
get_task_tree
async
¶
Return a nested dict representing the full task hierarchy.
The root task is fetched by root_task_id, then all descendants are
collected recursively via :meth:get_subtasks.
The returned structure looks like::
{
    "task": <Task>,                  # the root Task object
    "children": [                    # list of child sub-trees
        {"task": <Task>, "children": [...]},
        ...
    ],
}
Uses :meth:get_subtasks as the building block and recurses
through all descendants. Returns None if root_task_id
does not exist in the database.
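The recursion can be sketched like this (synchronous, with a minimal schema; the real method returns full `Task` dataclasses rather than bare IDs):

```python
import sqlite3

def get_task_tree(conn, root_task_id):
    row = conn.execute("SELECT id FROM tasks WHERE id = ?",
                       (root_task_id,)).fetchone()
    if row is None:
        return None  # unknown root
    children = conn.execute(
        "SELECT id FROM tasks WHERE parent_task_id = ? ORDER BY id",
        (root_task_id,)).fetchall()
    return {"task": row[0],
            "children": [get_task_tree(conn, c[0]) for c in children]}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, parent_task_id TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?)",
                 [("root", None), ("a", "root"), ("b", "root"), ("a1", "a")])
tree = get_task_tree(conn, "root")
```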
Source code in src/database.py
get_parent_tasks
async
¶
Return top-level tasks for a project (those with no parent).
A "parent task" here means a task whose parent_task_id is NULL --
i.e. it is not a subtask of any other task. Results are ordered by
priority ascending, then creation time ascending, matching
:meth:list_tasks.
Source code in src/database.py
are_dependencies_met
async
¶
Check whether all upstream dependencies of a task are satisfied.
Returns True if every task that task_id depends on has reached
COMPLETED status. Also returns True if the task has no dependencies
at all (vacuous truth). This is the gate that controls the
DEFINED -> READY promotion in the orchestrator loop.
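A sketch of the check, counting unmet upstream rows (the dependency table and column names here are assumptions, not confirmed by the source):

```python
import sqlite3

def are_dependencies_met(conn, task_id):
    # Zero unmet rows also covers the no-dependencies case (vacuous truth).
    (unmet,) = conn.execute(
        """SELECT COUNT(*) FROM task_dependencies d
           JOIN tasks t ON t.id = d.depends_on_task_id
           WHERE d.task_id = ? AND t.status != 'COMPLETED'""",
        (task_id,)).fetchone()
    return unmet == 0

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("CREATE TABLE task_dependencies (task_id TEXT, depends_on_task_id TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?)",
                 [("up", "COMPLETED"), ("mid", "READY"), ("down", "DEFINED")])
conn.executemany("INSERT INTO task_dependencies VALUES (?, ?)",
                 [("down", "up"), ("down", "mid")])
blocked = are_dependencies_met(conn, "down")    # False: "mid" not done yet
conn.execute("UPDATE tasks SET status = 'COMPLETED' WHERE id = 'mid'")
unblocked = are_dependencies_met(conn, "down")  # True once all deps complete
no_deps = are_dependencies_met(conn, "up")      # vacuously True
```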
Source code in src/database.py
get_stuck_defined_tasks
async
¶
Return DEFINED tasks that are truly stuck — blocked by a dependency in a terminal failure state (BLOCKED or FAILED).
A DEFINED task waiting on READY/IN_PROGRESS/DEFINED dependencies is normal and will eventually be promoted once the upstream work completes. Only tasks whose dependency chain contains a BLOCKED or FAILED task are reported.
Source code in src/database.py
get_blocking_dependencies
async
¶
Return (dep_task_id, dep_title, dep_status) for unmet dependencies.
Only returns dependencies whose status is NOT COMPLETED.
Source code in src/database.py
get_dependents
async
¶
Return task IDs that directly depend on task_id (reverse lookup).
Source code in src/database.py
get_dependency_map_for_tasks
async
¶
Batch-fetch dependency data for multiple tasks in two queries.
Returns a mapping of task_id → {"depends_on": [...], "blocks": [...]}.
Each depends_on entry is {"id": ..., "status": ...}.
Each blocks entry is a plain task ID string.
This replaces the previous N+1 pattern of calling get_dependencies()
and get_dependents() per task, collapsing all lookups into two
efficient queries regardless of the number of tasks.
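The two-query shape can be sketched as below (dependency table and column names are assumptions; the real method is async):

```python
import sqlite3

def get_dependency_map_for_tasks(conn, task_ids):
    # Two queries total, regardless of how many tasks are passed in.
    ph = ",".join("?" * len(task_ids))
    out = {tid: {"depends_on": [], "blocks": []} for tid in task_ids}
    # Query 1: what each task depends on, with the dependency's status.
    for tid, dep_id, dep_status in conn.execute(
            f"""SELECT d.task_id, d.depends_on_task_id, t.status
                FROM task_dependencies d
                JOIN tasks t ON t.id = d.depends_on_task_id
                WHERE d.task_id IN ({ph})""", task_ids):
        out[tid]["depends_on"].append({"id": dep_id, "status": dep_status})
    # Query 2: reverse edges -- which tasks each of ours is blocking.
    for dep_id, tid in conn.execute(
            f"""SELECT depends_on_task_id, task_id FROM task_dependencies
                WHERE depends_on_task_id IN ({ph})""", task_ids):
        out[dep_id]["blocks"].append(tid)
    return out

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("CREATE TABLE task_dependencies (task_id TEXT, depends_on_task_id TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?)", [("a", "READY"), ("b", "DEFINED")])
conn.execute("INSERT INTO task_dependencies VALUES ('b', 'a')")
dep_map = get_dependency_map_for_tasks(conn, ["a", "b"])
```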
Source code in src/database.py
remove_dependency
async
¶
Remove a single dependency edge.
Source code in src/database.py
remove_all_dependencies_on
async
¶
Remove all dependency edges pointing to a given task.
Source code in src/database.py
delete_agent
async
¶
Delete an agent and all dependent records.
Cascading order (children before parent):
1. token_ledger -- immutable token-usage rows
2. task_results -- execution-history rows
3. agent_workspaces -- per-project workspace mappings (legacy)
4. workspaces -- release locks (don't delete; workspaces belong to projects)
5. tasks.assigned_agent_id -- NULLify (don't delete the tasks)
6. agents -- the agent record itself
Source code in src/database.py
get_workspace_by_name
async
¶
Find a workspace by name within a project.
Source code in src/database.py
acquire_workspace
async
¶
acquire_workspace(project_id: str, agent_id: str, task_id: str, preferred_workspace_id: str | None = None) -> Workspace | None
Atomically find an unlocked workspace for a project and lock it.
If preferred_workspace_id is given (e.g. a workspace known to contain a merge conflict), attempt to lock that specific workspace first. Falls back to any unlocked workspace if the preferred one is unavailable.
Returns the locked workspace, or None if all workspaces are locked.
Race-safety: Multiple coroutines may call this concurrently. The
UPDATE uses WHERE locked_by_agent_id IS NULL as an optimistic
lock, and we verify rowcount == 1 before returning. If another
coroutine locked the row between our SELECT and UPDATE, we retry
with the next available workspace instead of silently returning a
workspace we don't actually hold.
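The optimistic-lock loop can be sketched like this (synchronous `sqlite3` stand-in; `locked_by_task_id` and the exact schema are assumptions):

```python
import sqlite3

def acquire_workspace(conn, project_id, agent_id, task_id):
    while True:
        row = conn.execute(
            """SELECT id FROM workspaces
               WHERE project_id = ? AND locked_by_agent_id IS NULL LIMIT 1""",
            (project_id,)).fetchone()
        if row is None:
            return None  # every workspace is locked
        cur = conn.execute(
            """UPDATE workspaces
               SET locked_by_agent_id = ?, locked_by_task_id = ?
               WHERE id = ? AND locked_by_agent_id IS NULL""",
            (agent_id, task_id, row[0]))
        if cur.rowcount == 1:  # the WHERE clause matched: we won the race
            return row[0]
        # Another caller locked it between SELECT and UPDATE; retry.

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE workspaces (
    id TEXT PRIMARY KEY, project_id TEXT,
    locked_by_agent_id TEXT, locked_by_task_id TEXT)""")
conn.execute("INSERT INTO workspaces VALUES ('w1', 'p1', NULL, NULL)")
first = acquire_workspace(conn, "p1", "agent-1", "t1")   # locks 'w1'
second = acquire_workspace(conn, "p1", "agent-2", "t2")  # None: all locked
```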
Source code in src/database.py
release_workspace
async
¶
Clear lock columns on a workspace.
Source code in src/database.py
release_workspaces_for_agent
async
¶
Release all workspace locks held by an agent. Returns count released.
Source code in src/database.py
release_workspaces_for_task
async
¶
Release all workspace locks held by a task. Returns count released.
Source code in src/database.py
get_workspace_for_task
async
¶
Find the workspace currently locked by a task.
Source code in src/database.py
get_project_workspace_path
async
¶
Return the workspace_path of the first workspace for a project.
This is a non-locking read used by notes, archive, repo status, and
other commands that need a project directory without acquiring a lock.
Prefers clone workspaces over link workspaces since clones are always
project-specific. Returns None if the project has no workspaces.
Source code in src/database.py
count_available_workspaces
async
¶
Count workspaces for a project that are not currently locked.
Used by the scheduler to skip projects with no available workspaces.
Source code in src/database.py
save_task_result
async
¶
Persist an AgentOutput to the task_results table.
Source code in src/database.py
get_task_result
async
¶
Return the most recent result for a task.
Source code in src/database.py
get_task_results
async
¶
Return all results for a task (retry history).
Source code in src/database.py
delete_project
async
¶
Delete a project and all associated data (tasks, repos, results, ledger).
Source code in src/database.py
assign_task_to_agent
async
¶
Atomically bind a task to an agent, updating both sides.
This is the only method that should be used to start work on a task.
In a single commit it:
1. Transitions the task from READY to ASSIGNED and sets its
assigned_agent_id.
2. Transitions the agent from IDLE to BUSY and sets its
current_task_id.
3. Logs a task_assigned event for the audit trail.
Performing all three writes in one commit prevents inconsistent states where a task thinks it is assigned but the agent does not (or vice versa).
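A sketch of the single-commit shape, using `sqlite3`'s transaction context manager (column names beyond those documented are illustrative):

```python
import sqlite3

def assign_task_to_agent(conn, task_id, agent_id):
    # One transaction: task side, agent side, and the audit event commit
    # together or not at all.
    with conn:
        conn.execute(
            """UPDATE tasks SET status = 'ASSIGNED', assigned_agent_id = ?
               WHERE id = ? AND status = 'READY'""", (agent_id, task_id))
        conn.execute(
            """UPDATE agents SET status = 'BUSY', current_task_id = ?
               WHERE id = ?""", (task_id, agent_id))
        conn.execute(
            "INSERT INTO events (event_type, task_id, agent_id) VALUES (?, ?, ?)",
            ("task_assigned", task_id, agent_id))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, assigned_agent_id TEXT)")
conn.execute("CREATE TABLE agents (id TEXT PRIMARY KEY, status TEXT, current_task_id TEXT)")
conn.execute("CREATE TABLE events (event_type TEXT, task_id TEXT, agent_id TEXT)")
conn.execute("INSERT INTO tasks VALUES ('t1', 'READY', NULL)")
conn.execute("INSERT INTO agents VALUES ('a1', 'IDLE', NULL)")
assign_task_to_agent(conn, "t1", "a1")
task_status = conn.execute("SELECT status FROM tasks").fetchone()[0]
agent_status = conn.execute("SELECT status FROM agents").fetchone()[0]
n_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```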
Source code in src/database.py
archive_task
async
¶
Move a single task from the active tasks table into archived_tasks.
The task must exist and be in a terminal status (COMPLETED, FAILED, or BLOCKED). Returns True if the task was archived, False if not found.
The operation copies the full task row into archived_tasks with the
current timestamp as archived_at, then deletes the task and its
related child rows from the active tables.
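The copy-then-delete step can be sketched as below (minimal two-column schema; the real table has many more columns and related child rows):

```python
import sqlite3

TERMINAL = ("COMPLETED", "FAILED", "BLOCKED")

def archive_task(conn, task_id):
    row = conn.execute(
        "SELECT id FROM tasks WHERE id = ? AND status IN (?, ?, ?)",
        (task_id, *TERMINAL)).fetchone()
    if row is None:
        return False  # missing or not in a terminal status
    with conn:  # copy then delete, atomically
        conn.execute(
            """INSERT INTO archived_tasks
               SELECT *, datetime('now') FROM tasks WHERE id = ?""",
            (task_id,))
        conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
    return True

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("CREATE TABLE archived_tasks (id TEXT, status TEXT, archived_at TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?)",
                 [("done", "COMPLETED"), ("live", "READY")])
archived = archive_task(conn, "done")  # True
refused = archive_task(conn, "live")   # False: not terminal
remaining = [r[0] for r in conn.execute("SELECT id FROM tasks")]
n_archived = conn.execute("SELECT COUNT(*) FROM archived_tasks").fetchone()[0]
```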
Source code in src/database.py
archive_completed_tasks
async
¶
Archive all COMPLETED tasks, optionally filtered by project.
Returns the list of archived task IDs.
Source code in src/database.py
archive_old_terminal_tasks
async
¶
Archive terminal tasks whose updated_at is older than the threshold.
This is the engine behind automatic archiving: the orchestrator calls this once per cycle with the configured statuses and age threshold to silently sweep stale terminal tasks into the archive.
Parameters¶
statuses:
Task status values eligible for archiving (e.g.
["COMPLETED", "FAILED", "BLOCKED"]).
older_than_seconds:
Tasks whose updated_at timestamp is more than this many
seconds in the past will be archived.
Returns the list of archived task IDs.
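The age-threshold selection can be sketched with SQLite's datetime modifiers (only the selection is shown; the archiving itself mirrors `archive_task`):

```python
import sqlite3

def archive_old_terminal_tasks(conn, statuses, older_than_seconds):
    # Select stale terminal tasks by comparing updated_at against
    # now minus the threshold.
    ph = ",".join("?" * len(statuses))
    rows = conn.execute(
        f"""SELECT id FROM tasks
            WHERE status IN ({ph})
              AND updated_at < datetime('now', ?)""",
        (*statuses, f"-{older_than_seconds} seconds")).fetchall()
    return [r[0] for r in rows]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT, status TEXT, updated_at TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, ?, ?)", [
    ("old", "COMPLETED", "2020-01-01 00:00:00"),
    ("new", "COMPLETED", "2999-01-01 00:00:00"),
    ("old-but-live", "READY", "2020-01-01 00:00:00"),
])
stale = archive_old_terminal_tasks(conn, ["COMPLETED", "FAILED", "BLOCKED"], 3600)
```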
Source code in src/database.py
list_archived_tasks
async
¶
Return archived tasks as dicts, newest archived first.
Unlike active tasks which are returned as :class:Task dataclasses,
archived tasks include the extra archived_at field so they are
returned as plain dicts.
Source code in src/database.py
get_archived_task
async
¶
Return a single archived task as a dict, or None if not found.
Source code in src/database.py
restore_archived_task
async
¶
Move an archived task back into the active tasks table.
The task is restored with status DEFINED so it can be re-evaluated by the orchestrator. Returns True if restored, False if the archived task was not found.
Source code in src/database.py
delete_archived_task
async
¶
Permanently delete an archived task. Returns True if found and deleted.
Source code in src/database.py
count_archived_tasks
async
¶
Return the total count of archived tasks.