Orchestrator

Core task and agent lifecycle management.

Orchestrator — the central brain of the agent queue system.

Runs a ~5-second loop that drives the entire task lifecycle: promoting DEFINED tasks whose dependencies are met, scheduling READY tasks onto idle agents, launching agent execution as background asyncio tasks, managing git workspaces (clone/link/init), parsing plan files into chained subtasks, handling PR/approval workflows, and monitoring for stuck tasks.

Design principle: zero LLM calls for orchestration. All scheduling and state-machine logic is purely deterministic. Every token budget goes to actual agent work, not coordination overhead.

Heavy operations (agent execution, git clones) run as background asyncio tasks so the main loop stays responsive and can continue checking heartbeats, promoting tasks, and handling approvals while agents work.
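The loop/background-task split can be sketched as follows. This is a minimal illustration, not the orchestrator's real code; `heavy_work` and `check_heartbeats` are hypothetical stand-ins for agent execution and the cheap per-cycle monitoring work.

```python
import asyncio

# Minimal sketch of the loop/background-task split described above.  The
# helper names (heavy_work, check_heartbeats) are hypothetical stand-ins,
# not the orchestrator's real methods.
async def heavy_work(task_id: str) -> str:
    await asyncio.sleep(0.05)  # stands in for agent execution / git clone
    return task_id

async def check_heartbeats() -> None:
    pass  # cheap monitoring that must never be starved by heavy work

async def main_loop(cycles: int = 3) -> int:
    running: dict[str, asyncio.Task] = {}
    launched: list[asyncio.Task] = []
    for i in range(cycles):
        # Reap finished background work without blocking the loop.
        for tid in [t for t, bg in running.items() if bg.done()]:
            running.pop(tid)
        # Launch heavy work as a background asyncio.Task; the loop keeps ticking.
        tid = f"task-{i}"
        if tid not in running:
            bg = asyncio.create_task(heavy_work(tid))
            running[tid] = bg
            launched.append(bg)
        await check_heartbeats()
        await asyncio.sleep(0.01)  # stands in for the ~5s interval
    return len(await asyncio.gather(*launched))

print(asyncio.run(main_loop()))
```

Because `create_task` returns immediately, each cycle stays fast regardless of how long the background work takes.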

Key method call hierarchy (read these to understand the full lifecycle):

run_one_cycle()                       # Main loop entry (~5s interval)
├── _check_awaiting_approval()        # Poll PR merge status
│   ├── _handle_awaiting_no_pr()      # Auto-complete or remind
│   └── _check_pr_status()            # Merged/closed/open detection
├── _resume_paused_tasks()            # Backoff timer expiry
├── _check_defined_tasks()            # Dependency promotion
├── _check_stuck_defined_tasks()      # Monitoring alerts
├── _schedule()                       # Proportional fair-share assignment
└── _execute_task_safe(action)        # Background asyncio.Task per assignment
    └── _execute_task_safe_inner()    # Timeout + crash recovery wrapper
        └── _execute_task()           # Full pipeline:
            ├── _prepare_workspace()  #   Git branch/clone setup
            ├── adapter.start(ctx)    #   Launch agent process
            ├── adapter.wait()        #   Stream output + rate-limit retries
            ├── _complete_workspace() #   Commit/merge/PR post-completion
            │   ├── _merge_and_push() #     Direct merge path
            │   └── _create_pr_for_task() # PR-based approval path
            ├── _generate_tasks_from_plan()  # Auto-task from plan files
            └── cleanup               #   Release workspace + free agent

Workspace locking lifecycle:

_schedule() assigns task → _prepare_workspace() acquires lock
→ agent runs with exclusive workspace access
→ _complete_workspace() performs git operations
→ cleanup section releases lock via db.release_workspaces_for_task()

If the task times out, crashes, or is admin-stopped, the lock is
released in the error/timeout handler so the workspace isn't stuck.
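This acquire/try/finally shape can be sketched as below, using hypothetical helpers (`acquire_lock`, `run_agent`, `release_lock`); the real code releases via `db.release_workspaces_for_task()` in both the cleanup and error paths.

```python
import asyncio

# Sketch of the locking lifecycle above with hypothetical helper names;
# the point is that release happens in `finally`, so the workspace is
# freed even when the agent crashes or times out.
LOCKS: set[str] = set()

async def acquire_lock(path: str) -> None:
    LOCKS.add(path)               # _prepare_workspace() acquires the lock

async def release_lock(path: str) -> None:
    LOCKS.discard(path)           # db.release_workspaces_for_task()

async def run_agent(ok: bool) -> None:
    if not ok:
        raise RuntimeError("agent crashed")

async def execute(path: str, ok: bool) -> None:
    await acquire_lock(path)
    try:
        await run_agent(ok)       # exclusive workspace access while running
    finally:
        await release_lock(path)  # released even on timeout/crash/stop
```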

Related modules:

  • src/scheduler.py — Pure-function proportional fair-share scheduler. Called by _schedule() with a SchedulerState snapshot; returns AssignAction objects with zero side effects. See that module's docstring for the deficit-based algorithm and min-task guarantee.

  • src/hooks.py — Event-driven and periodic hook engine. Ticked each cycle by run_one_cycle step 7; event hooks fire asynchronously via EventBus. See that module's docstring for the context pipeline, short-circuit checks, and LLM invocation with tool access.

  • src/state_machine.py — VALID_TASK_TRANSITIONS dict defining the legal (status, event) → status transitions. The orchestrator calls db.transition_task() which enforces these transitions.

  • src/adapters/base.py / src/adapters/claude.py — Agent adapter interface (start/wait/stop/is_alive). The orchestrator delegates all LLM interaction to adapters and only processes the resulting AgentOutput.
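The "pure function with zero side effects" contract of the scheduler can be illustrated with toy types. The field names below are assumptions for illustration, not the real dataclasses from src/scheduler.py:

```python
from dataclasses import dataclass

# Illustrative sketch of the pure-scheduler contract: a state snapshot in,
# a list of assignment actions out, no side effects.  Field names are
# assumptions, not src/scheduler.py's real dataclasses.
@dataclass(frozen=True)
class SchedulerState:
    ready_task_ids: tuple[str, ...]
    idle_agent_ids: tuple[str, ...]

@dataclass(frozen=True)
class AssignAction:
    task_id: str
    agent_id: str

def schedule(state: SchedulerState) -> list[AssignAction]:
    # Pair ready tasks with idle agents; the real module layers the
    # deficit-based fair-share algorithm on top of this basic shape.
    return [AssignAction(task_id=t, agent_id=a)
            for t, a in zip(state.ready_task_ids, state.idle_agent_ids)]
```

Because the function only reads the snapshot and returns actions, it can be unit-tested exhaustively without a database or event loop.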

See specs/orchestrator.md for the full behavioral specification.

Classes

Orchestrator

Orchestrator(config: AppConfig, adapter_factory=None)

Coordinates the full task lifecycle across multiple projects and agents.

The orchestrator is deliberately decoupled from Discord: it communicates through injected callbacks (set_notify_callback, set_create_thread_callback) rather than importing Discord directly. This makes it testable in isolation and keeps the transport layer pluggable.

Key internal state:

  • _running_tasks — maps task_id to a background asyncio.Task for each agent execution currently in flight. The main loop checks this dict every cycle to clean up finished work and avoid double-launching.

  • _adapters — maps agent_id to the live adapter instance (e.g. ClaudeAdapter) so we can stop or cancel a running agent from admin commands like stop_task.

  • _paused — when True, the scheduler is skipped entirely (no new work is assigned) but monitoring, approvals, and promotions continue.

Initialize the orchestrator with its configuration and subsystems.

Parameters:

  • config (AppConfig, required) — The application configuration (loaded from YAML).

  • adapter_factory (default: None) — Factory for creating agent adapters (e.g. ClaudeAdapterFactory). When None, the orchestrator can manage state and scheduling but cannot execute tasks.

The constructor wires up all subsystems but does NOT perform any async initialization (DB schema creation, stale-state recovery, hook subscriptions). Call await initialize() before running cycles.
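The construct-then-initialize contract can be sketched with a stand-in class; `FakeOrchestrator` below is hypothetical, and only the call order (construct, `await initialize()`, then cycles, then `shutdown()`) mirrors the docs:

```python
import asyncio

# Sketch of the lifecycle contract stated above.  FakeOrchestrator is a
# stand-in for the real class; the point is the required call order.
class FakeOrchestrator:
    def __init__(self) -> None:
        self.initialized = False   # constructor does no async setup
        self.cycles = 0

    async def initialize(self) -> None:
        self.initialized = True    # DB schema, stale-state recovery, hooks

    async def run_one_cycle(self) -> None:
        assert self.initialized, "call await initialize() first"
        self.cycles += 1

    async def shutdown(self) -> None:
        pass

async def main() -> int:
    orch = FakeOrchestrator()
    await orch.initialize()        # required before the first cycle
    try:
        for _ in range(3):         # main.py loops this every ~5s
            await orch.run_one_cycle()
    finally:
        await orch.shutdown()
    return orch.cycles
```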

Source code in src/orchestrator.py
def __init__(self, config: AppConfig, adapter_factory=None):
    """Initialize the orchestrator with its configuration and subsystems.

    Args:
        config: The application configuration (loaded from YAML).
        adapter_factory: Factory for creating agent adapters (e.g.
            ClaudeAdapterFactory).  When None, the orchestrator can
            manage state and scheduling but cannot execute tasks.

    The constructor wires up all subsystems but does NOT perform any
    async initialization (DB schema creation, stale-state recovery,
    hook subscriptions).  Call ``await initialize()`` before running
    cycles.
    """
    self.config = config
    self.db = Database(config.database_path)
    self.bus = EventBus()
    self.budget = BudgetManager(
        global_budget=config.global_token_budget_daily
    )
    self.git = GitManager()
    self._adapter_factory = adapter_factory
    # Live adapter instances keyed by agent_id.  Stored so we can call
    # adapter.stop() from admin commands (stop_task, timeout recovery).
    self._adapters: dict[str, object] = {}
    # Background asyncio Tasks for in-flight agent executions, keyed by
    # task_id.  Cleaned up each cycle; prevents double-launching.
    self._running_tasks: dict[str, asyncio.Task] = {}
    self._notify: NotifyCallback | None = None
    self._create_thread: CreateThreadCallback | None = None
    # Discord message objects for task-started notifications, keyed by
    # task_id.  Stored so we can delete the message when the task finishes
    # to keep the chat window clean.
    self._task_started_messages: dict[str, Any] = {}
    self._paused: bool = False
    self._restart_requested: bool = False
    # Throttle: approval polling runs at most once per 60s.
    self._last_approval_check: float = 0.0
    # LLM interaction logger — records all LLM API calls (both direct
    # chat provider calls and agent sessions) to JSONL files for cost
    # analysis and prompt optimization.  See ``src/llm_logger.py``.
    self.llm_logger = LLMLogger(
        base_dir=os.path.join(config.data_dir, "logs", "llm"),
        enabled=config.llm_logging.enabled,
        retention_days=config.llm_logging.retention_days,
    )
    self._last_log_cleanup: float = 0.0
    self._last_auto_archive: float = 0.0
    self._config_watcher: ConfigWatcher | None = None
    # Chat provider for LLM-based plan parsing.  Optionally used by
    # ``_generate_tasks_from_plan`` to parse agent-written plan files
    # with an LLM instead of the regex parser, producing higher-quality
    # task splits.  Wrapped with ``LoggedChatProvider`` so plan-parsing
    # token usage appears in analytics under the ``plan_parser`` caller.
    self._chat_provider = None
    if config.auto_task.use_llm_parser:
        try:
            from src.chat_providers import create_chat_provider, LoggedChatProvider
            provider = create_chat_provider(config.chat_provider)
            if provider and self.llm_logger._enabled:
                provider = LoggedChatProvider(
                    provider, self.llm_logger, caller="plan_parser"
                )
            self._chat_provider = provider
        except Exception:
            pass
    # Tracks the last time we sent a reminder for an AWAITING_APPROVAL
    # task that has no PR URL (keyed by task_id).
    self._no_pr_reminded_at: dict[str, float] = {}
    # Tracks the last time a "stuck DEFINED" notification was sent for
    # each task (keyed by task_id) to rate-limit alerts.
    self._stuck_notified_at: dict[str, float] = {}
    self.hooks: HookEngine | None = None
    # Semantic memory manager — optional integration with memsearch.
    # Initialized only when config.memory.enabled is True and the
    # memsearch package is installed.
    self.memory_manager: "MemoryManager | None" = None
    if hasattr(config, "memory") and config.memory.enabled:
        try:
            from src.memory import MemoryManager
            self.memory_manager = MemoryManager(
                config.memory, storage_root=config.data_dir
            )
        except Exception as e:
            logger.warning("Memory manager initialization failed: %s", e)
    # Reference to the command handler, set by the bot after initialization.
    # Used to pass handler references to interactive Discord views (e.g.
    # Retry/Skip buttons on failed task notifications).
    self._command_handler: Any = None
    # Tracks per-project budget warning thresholds already sent so we
    # don't spam the same warning.  Keyed by project_id, value is the
    # highest threshold percentage (e.g. 80, 95) already notified.
    self._budget_warned_at: dict[str, int] = {}

Functions

set_command_handler
set_command_handler(handler: Any) -> None

Store a reference to the command handler for interactive views.

Source code in src/orchestrator.py
def set_command_handler(self, handler: Any) -> None:
    """Store a reference to the command handler for interactive views."""
    self._command_handler = handler
pause
pause() -> None

Pause scheduling — no new tasks are assigned, but monitoring continues.

When paused, run_one_cycle still runs approvals, dependency promotion, stuck-task detection, hooks, and archival — only the scheduler step (_schedule) is skipped. In-flight agent work continues unaffected; this only prevents new assignments.

Source code in src/orchestrator.py
def pause(self) -> None:
    """Pause scheduling — no new tasks are assigned, but monitoring continues.

    When paused, ``run_one_cycle`` still runs approvals, dependency
    promotion, stuck-task detection, hooks, and archival — only the
    scheduler step (``_schedule``) is skipped.  In-flight agent work
    continues unaffected; this only prevents *new* assignments.
    """
    self._paused = True
resume
resume() -> None

Resume scheduling after a pause. New assignments start on the next cycle.

Source code in src/orchestrator.py
def resume(self) -> None:
    """Resume scheduling after a pause.  New assignments start on the next cycle."""
    self._paused = False
set_notify_callback
set_notify_callback(callback: NotifyCallback) -> None

Inject the notification transport (e.g. Discord channel posting).

All orchestrator notifications flow through _notify_channel which delegates to this callback. The callback signature is:

async def callback(message: str, project_id: str | None,
                   *, embed=None, view=None) -> None

This injection pattern keeps the orchestrator testable without a live Discord connection. In production, main.py wires this to the Discord bot's send_notification method.

Source code in src/orchestrator.py
def set_notify_callback(self, callback: NotifyCallback) -> None:
    """Inject the notification transport (e.g. Discord channel posting).

    All orchestrator notifications flow through ``_notify_channel`` which
    delegates to this callback.  The callback signature is::

        async def callback(message: str, project_id: str | None,
                           *, embed=None, view=None) -> None

    This injection pattern keeps the orchestrator testable without a live
    Discord connection.  In production, ``main.py`` wires this to the
    Discord bot's ``send_notification`` method.
    """
    self._notify = callback
set_create_thread_callback
set_create_thread_callback(callback: CreateThreadCallback) -> None

Inject the thread-creation transport for per-task output streaming.

Each task execution creates a Discord thread for streaming agent output in real time. The callback returns two send functions: one for the thread itself and one for posting brief summaries to the parent channel.

Set by main.py during bot initialization. When None, agent output is posted directly to the notifications channel (noisier but functional).

Source code in src/orchestrator.py
def set_create_thread_callback(self, callback: CreateThreadCallback) -> None:
    """Inject the thread-creation transport for per-task output streaming.

    Each task execution creates a Discord thread for streaming agent output
    in real time.  The callback returns two send functions: one for the
    thread itself and one for posting brief summaries to the parent channel.

    Set by ``main.py`` during bot initialization.  When None, agent output
    is posted directly to the notifications channel (noisier but functional).
    """
    self._create_thread = callback
skip_task async
skip_task(task_id: str) -> tuple[str | None, list[Task]]

Skip a BLOCKED or FAILED task to unblock its dependency chain.

This is an admin escape hatch: it marks the task as COMPLETED (even though no work was done) so that downstream dependents whose only remaining unmet dependency was this task can proceed. The method performs a forward walk of the dependency graph to report which tasks will be unblocked, giving the operator visibility into the blast radius before the next cycle promotes them.

Returns (error_string | None, list_of_tasks_that_will_be_unblocked).

Source code in src/orchestrator.py
async def skip_task(self, task_id: str) -> tuple[str | None, list[Task]]:
    """Skip a BLOCKED or FAILED task to unblock its dependency chain.

    This is an admin escape hatch: it marks the task as COMPLETED (even
    though no work was done) so that downstream dependents whose only
    remaining unmet dependency was this task can proceed.  The method
    performs a forward walk of the dependency graph to report which
    tasks will be unblocked, giving the operator visibility into the
    blast radius before the next cycle promotes them.

    Returns (error_string | None, list_of_tasks_that_will_be_unblocked).
    """
    task = await self.db.get_task(task_id)
    if not task:
        return f"Task '{task_id}' not found", []
    if task.status not in (TaskStatus.BLOCKED, TaskStatus.FAILED):
        return (
            f"Task is not BLOCKED or FAILED (status: {task.status.value}). "
            f"Only blocked/failed tasks can be skipped.",
            [],
        )

    await self.db.transition_task(
        task_id,
        TaskStatus.COMPLETED,
        context="skip_task",
    )
    await self.db.log_event(
        "task_skipped",
        project_id=task.project_id,
        task_id=task.id,
        payload=f"skipped from {task.status.value}",
    )

    # Find which downstream tasks will now become unblocked.
    # After we set this task to COMPLETED, any direct dependents
    # whose other deps are also met will be promoted by the
    # next _check_defined_tasks cycle.
    unblocked: list[Task] = []
    dependents = await self.db.get_dependents(task_id)
    for dep_id in dependents:
        dep_task = await self.db.get_task(dep_id)
        if dep_task and dep_task.status == TaskStatus.DEFINED:
            # Check if all deps (including the now-skipped one) are met
            if await self.db.are_dependencies_met(dep_id):
                unblocked.append(dep_task)

    await self._notify_channel(
        f"**Task Skipped:** `{task_id}` — {task.title}\n"
        f"Marked as COMPLETED to unblock dependency chain."
        + (f"\n{len(unblocked)} task(s) will be unblocked in the next cycle."
           if unblocked else ""),
        project_id=task.project_id,
    )

    return None, unblocked
stop_task async
stop_task(task_id: str) -> str | None

Forcibly stop an in-progress task and release its agent.

Sends a stop signal to the running adapter, transitions the task to BLOCKED, resets the agent to IDLE, and checks whether stopping this task orphans any downstream dependency chain (notifying if so).

Returns None on success, or an error string if the task cannot be stopped.

Source code in src/orchestrator.py
async def stop_task(self, task_id: str) -> str | None:
    """Forcibly stop an in-progress task and release its agent.

    Sends a stop signal to the running adapter, transitions the task to
    BLOCKED, resets the agent to IDLE, and checks whether stopping this
    task orphans any downstream dependency chain (notifying if so).

    Returns None on success, or an error string if the task cannot be stopped.
    """
    task = await self.db.get_task(task_id)
    if not task:
        return f"Task '{task_id}' not found"
    if task.status != TaskStatus.IN_PROGRESS:
        return f"Task is not in progress (status: {task.status.value})"

    # Find and stop the adapter
    agent_id = task.assigned_agent_id
    if agent_id and agent_id in self._adapters:
        adapter = self._adapters[agent_id]
        try:
            await adapter.stop()
        except Exception as e:
            logger.error("Error stopping adapter for agent %s: %s", agent_id, e)

    # Cancel the background asyncio Task so the finally block in
    # _resilient_query fires even if the transport close didn't
    # immediately take effect.
    bg_task = self._running_tasks.get(task_id)
    if bg_task and not bg_task.done():
        bg_task.cancel()

    # Clean up sentinel and release workspace lock
    ws = await self.db.get_workspace_for_task(task_id)
    if ws:
        self._remove_sentinel(ws.workspace_path)
    await self.db.release_workspaces_for_task(task_id)
    await self.db.transition_task(task_id, TaskStatus.BLOCKED,
                                  context="stop_task",
                                  assigned_agent_id=None)
    if agent_id:
        await self.db.update_agent(agent_id, state=AgentState.IDLE,
                                   current_task_id=None)
        self._adapters.pop(agent_id, None)

    await self._notify_channel(
        f"**Task Stopped:** `{task_id}` — {task.title}",
        project_id=task.project_id,
    )
    # Delete the task-started message to reduce chat clutter
    started_msg = self._task_started_messages.pop(task_id, None)
    if started_msg is not None:
        try:
            await started_msg.delete()
        except Exception as e:
            logger.debug("Could not delete task-started message for %s: %s",
                         task_id, e)
    # Check if stopping this task blocks a dependency chain
    await self._notify_stuck_chain(task)
    return None
initialize async
initialize() -> None

Bootstrap the orchestrator and all subsystems.

Initialization order matters — later steps depend on earlier ones:

  1. Database — create tables if needed, run migrations. Must be first because every other step queries the DB.
  2. Agent profiles — sync YAML-defined profiles into DB rows so tasks can reference them by ID. Depends on DB being initialized.
  3. Stale state recovery — after a daemon restart, no adapter processes are actually running, so any IN_PROGRESS tasks and BUSY agents left over from the previous run are reset to a schedulable state. Must run before the first scheduling cycle to avoid ghost assignments. See _recover_stale_state.
  4. Hook engine — subscribe to EventBus events and pre-populate last-run timestamps so periodic hooks don't all fire simultaneously on startup. Depends on DB for reading last-run times.

This method must be called (and awaited) before run_one_cycle. Called by main.py during startup, after load_config() but before the Discord bot connects.

Source code in src/orchestrator.py
async def initialize(self) -> None:
    """Bootstrap the orchestrator and all subsystems.

    Initialization order matters — later steps depend on earlier ones:

    1. **Database** — create tables if needed, run migrations.  Must be
       first because every other step queries the DB.
    2. **Agent profiles** — sync YAML-defined profiles into DB rows so
       tasks can reference them by ID.  Depends on DB being initialized.
    3. **Stale state recovery** — after a daemon restart, no adapter
       processes are actually running, so any IN_PROGRESS tasks and BUSY
       agents left over from the previous run are reset to a schedulable
       state.  Must run before the first scheduling cycle to avoid
       ghost assignments.  See ``_recover_stale_state``.
    4. **Hook engine** — subscribe to EventBus events and pre-populate
       last-run timestamps so periodic hooks don't all fire simultaneously
       on startup.  Depends on DB for reading last-run times.

    This method must be called (and awaited) before ``run_one_cycle``.
    Called by ``main.py`` during startup, after ``load_config()`` but
    before the Discord bot connects.
    """
    await self.db.initialize()
    await self._sync_profiles_from_config()
    await self._recover_stale_state()
    if self.config.hook_engine.enabled:
        self.hooks = HookEngine(self.db, self.bus, self.config)
        self.hooks.set_orchestrator(self)
        await self.hooks.initialize()

    # Start config file watcher for hot-reloading
    if self.config._config_path:
        self._config_watcher = ConfigWatcher(
            config_path=self.config._config_path,
            event_bus=self.bus,
            current_config=self.config,
        )
        self.bus.subscribe("config.reloaded", self._on_config_reloaded)
        self._config_watcher.start()
wait_for_running_tasks async
wait_for_running_tasks(timeout: float | None = None) -> None

Wait for all background task executions to finish.

This is primarily useful in tests where run_one_cycle fires off background coroutines and the caller needs to wait for them to complete before inspecting results.

Source code in src/orchestrator.py
async def wait_for_running_tasks(self, timeout: float | None = None) -> None:
    """Wait for all background task executions to finish.

    This is primarily useful in tests where ``run_one_cycle`` fires off
    background coroutines and the caller needs to wait for them to
    complete before inspecting results.
    """
    if not self._running_tasks:
        return
    tasks = list(self._running_tasks.values())
    if timeout is not None:
        await asyncio.wait(tasks, timeout=timeout)
    else:
        await asyncio.gather(*tasks, return_exceptions=True)
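The test pattern this enables can be sketched with a bare dict of asyncio.Tasks standing in for `_running_tasks` (the names below are illustrative):

```python
import asyncio

# Sketch of the test pattern described above: kick off background work,
# then drain it before inspecting results.  fake_agent is hypothetical.
async def fake_agent(results: list[int], n: int) -> None:
    await asyncio.sleep(0.01)
    results.append(n)

async def test_like_flow() -> int:
    results: list[int] = []
    running = {f"t{i}": asyncio.create_task(fake_agent(results, i))
               for i in range(3)}
    # Equivalent of: await orch.wait_for_running_tasks()
    await asyncio.gather(*running.values(), return_exceptions=True)
    return len(results)    # results are now safe to inspect
```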
shutdown async
shutdown() -> None

Gracefully shut down all subsystems in dependency order.

Shutdown order:

  1. Wait for running agent tasks (with a 10s timeout so we don't hang indefinitely if an adapter is stuck).
  2. Shut down the hook engine (cancels any in-flight hook tasks).
  3. Close the memory manager (flushes pending index writes).
  4. Close the database connection.

The order matters: tasks and hooks use the DB, so we must wait for them to finish before closing it.

Source code in src/orchestrator.py
async def shutdown(self) -> None:
    """Gracefully shut down all subsystems in dependency order.

    Shutdown order:
    1. Wait for running agent tasks (with a 10s timeout so we don't
       hang indefinitely if an adapter is stuck).
    2. Shut down the hook engine (cancels any in-flight hook tasks).
    3. Close the memory manager (flushes pending index writes).
    4. Close the database connection.

    The order matters: tasks and hooks use the DB, so we must wait
    for them to finish before closing it.
    """
    await self.wait_for_running_tasks(timeout=10)
    if self._config_watcher:
        await self._config_watcher.stop()
    if self.hooks:
        await self.hooks.shutdown()
    if self.memory_manager:
        try:
            await self.memory_manager.close()
        except Exception as e:
            logger.warning("Memory manager shutdown error: %s", e)
    await self.db.close()
run_one_cycle async
run_one_cycle() -> None

Run one iteration of the orchestrator's main loop.

Called every ~5 seconds by main.py's scheduler loop. Each cycle is designed to complete quickly (typically <1s of DB queries and state checks) — heavy work (agent execution, git operations) is delegated to background asyncio tasks that run concurrently.

The cycle is organized into three phases with numbered steps:

Phase 1 — Promotion cascade (steps 1-4):

  1. Approvals — complete tasks whose PRs were merged so their dependents can be promoted in the same cycle.
  2. Resume paused — bring back rate-limited/token-exhausted tasks whose backoff timers have expired.
  3. Promote DEFINED — check dependency satisfaction and move tasks to READY. This must happen after approvals so freshly-completed parent tasks unblock their children immediately.
  4. Stuck monitoring — rate-limited alerts for DEFINED tasks that have been waiting too long (runs after promotion so we don't false-alarm on tasks that just got promoted).

Phase 2 — Scheduling & launch (steps 5-6):

  5. Schedule — assign READY tasks to idle agents (skipped when the orchestrator is paused).
  6. Launch — fire off background asyncio tasks for each new assignment. These run concurrently with future cycles.

Phase 3 — Housekeeping (steps 7-10):

  7. Hook engine tick — process periodic hooks; event-driven hooks fire asynchronously via the EventBus.
  8. Config hot-reload — periodically re-read non-critical settings from disk (scheduling, archive, monitoring, etc.) without restart.
  9. Log cleanup — prune old LLM interaction logs and flush prompt analytics for long-term cost analysis.
  10. Auto-archive — sweep terminal tasks older than the configured threshold into the archive so they no longer clutter active views.

Ordering invariant: steps 1-3 form a "promotion cascade" where completing an approval can immediately unblock a DEFINED task in the same cycle. Breaking this order would add a 5s delay to dependency chain progression.

Error handling: the entire cycle is wrapped in a try/except so that a failure in one step (e.g., a DB query error) doesn't crash the daemon — it logs the error and retries on the next cycle.

Source code in src/orchestrator.py
async def run_one_cycle(self) -> None:
    """Run one iteration of the orchestrator's main loop.

    Called every ~5 seconds by ``main.py``'s scheduler loop.  Each cycle
    is designed to complete quickly (typically <1s of DB queries and
    state checks) — heavy work (agent execution, git operations) is
    delegated to background asyncio tasks that run concurrently.

    The cycle is organized into three phases with numbered steps:

    **Phase 1 — Promotion cascade** (steps 1-4):

    1. **Approvals** — complete tasks whose PRs were merged so
       their dependents can be promoted in the same cycle.
    2. **Resume paused** — bring back rate-limited/token-exhausted tasks
       whose backoff timers have expired.
    3. **Promote DEFINED** — check dependency satisfaction and move tasks
       to READY.  This must happen after approvals so freshly-completed
       parent tasks unblock their children immediately.
    4. **Stuck monitoring** — rate-limited alerts for DEFINED tasks that
       have been waiting too long (runs after promotion so we don't
       false-alarm on tasks that just got promoted).

    **Phase 2 — Scheduling & launch** (steps 5-6):

    5. **Schedule** — assign READY tasks to idle agents (skipped when
       the orchestrator is paused).
    6. **Launch** — fire off background asyncio tasks for each new
       assignment.  These run concurrently with future cycles.

    **Phase 3 — Housekeeping** (steps 7-10):

    7. **Hook engine tick** — process periodic hooks; event-driven hooks
       fire asynchronously via the EventBus.
    8. **Config hot-reload** — periodically re-read non-critical settings
       from disk (scheduling, archive, monitoring, etc.) without restart.
    9. **Log cleanup** — prune old LLM interaction logs and flush
       prompt analytics for long-term cost analysis.
    10. **Auto-archive** — sweep terminal tasks older than the configured
        threshold into the archive so they no longer clutter active views.

    Ordering invariant: steps 1-3 form a "promotion cascade" where
    completing an approval can immediately unblock a DEFINED task in
    the same cycle.  Breaking this order would add a 5s delay to
    dependency chain progression.

    Error handling: the entire cycle is wrapped in a try/except so
    that a failure in one step (e.g., a DB query error) doesn't crash
    the daemon — it logs the error and retries on the next cycle.
    """
    try:
        # ── Phase 1: Promotion cascade ──────────────────────────────────
        # Steps 1-3 form a "promotion cascade": completing an approval can
        # immediately unblock a DEFINED task in the same cycle.  Breaking
        # this order would add a ~5s delay to dependency chain progression.

        # 1. Poll PR merge/close status for AWAITING_APPROVAL tasks.
        #    Merged PRs → COMPLETED, which may satisfy downstream deps.
        await self._check_awaiting_approval()

        # 2. Promote PAUSED tasks whose backoff timer has expired → READY.
        await self._resume_paused_tasks()

        # 3. Promote DEFINED tasks whose dependencies are all met → READY.
        #    Runs after step 1 so freshly-completed approvals can unblock
        #    dependents within the same cycle.
        await self._check_defined_tasks()

        # 4. Monitoring: detect DEFINED tasks stuck beyond threshold.
        #    Runs after promotion so we don't false-alarm on tasks that
        #    were just promoted in step 3.
        await self._check_stuck_defined_tasks()

        # ── Phase 2: Scheduling & launch ────────────────────────────────

        # 5. Schedule READY tasks onto idle agents (skipped when paused).
        if not self._paused:
            actions = await self._schedule()
        else:
            actions = []

        # 6. Launch assigned tasks as background asyncio coroutines.
        #
        # First, reap completed background tasks from _running_tasks.
        # We must do this BEFORE launching new tasks to free up task IDs
        # (a task that just finished shouldn't block its re-assignment
        # on a retry).  We don't inspect results here — error handling
        # is done inside _execute_task_safe_inner.
        done = [tid for tid, t in self._running_tasks.items() if t.done()]
        for tid in done:
            self._running_tasks.pop(tid)

        for action in actions:
            if action.task_id in self._running_tasks:
                continue  # Already running — skip double-launch
            bg = asyncio.create_task(self._execute_task_safe(action))
            self._running_tasks[action.task_id] = bg

        # ── Phase 3: Housekeeping ───────────────────────────────────────

        # 7. Run hook engine tick (periodic hooks; event hooks fire async).
        if self.hooks:
            await self.hooks.tick()

        # 8. Config hot-reload is handled by ConfigWatcher (background task).

        # 9. Periodic log cleanup and analytics flush (~once per hour).
        now = time.time()
        if now - self._last_log_cleanup >= 3600:
            self._last_log_cleanup = now
            try:
                removed = self.llm_logger.cleanup_old_logs()
                if removed:
                    logger.info("LLM log cleanup: removed %d old directory(ies)", removed)
                # Flush prompt analytics to disk for long-term analysis
                self.llm_logger.flush_analytics()
            except Exception as e:
                logger.error("LLM log cleanup error: %s", e)

        # 10. Auto-archive stale terminal tasks (~once per hour).
        await self._auto_archive_tasks()
    except Exception:
        logger.error("Scheduler cycle error", exc_info=True)

Functions