Orchestrator — the central brain of the agent queue system.
Runs a ~5-second loop that drives the entire task lifecycle: promoting
DEFINED tasks whose dependencies are met, scheduling READY tasks onto idle
agents, launching agent execution as background asyncio tasks, managing git
workspaces (clone/link/init), parsing plan files into chained subtasks,
handling PR/approval workflows, and monitoring for stuck tasks.
Design principle: zero LLM calls for orchestration. All scheduling and
state-machine logic is purely deterministic. Every token budget goes to
actual agent work, not coordination overhead.
Heavy operations (agent execution, git clones) run as background asyncio
tasks so the main loop stays responsive and can continue checking heartbeats,
promoting tasks, and handling approvals while agents work.
Key method call hierarchy (read these to understand the full lifecycle):

run_one_cycle()                          # Main loop entry (~5s interval)
├── _check_awaiting_approval()           # Poll PR merge status
│   ├── _handle_awaiting_no_pr()         # Auto-complete or remind
│   └── _check_pr_status()               # Merged/closed/open detection
├── _resume_paused_tasks()               # Backoff timer expiry
├── _check_defined_tasks()               # Dependency promotion
├── _check_stuck_defined_tasks()         # Monitoring alerts
├── _schedule()                          # Proportional fair-share assignment
└── _execute_task_safe(action)           # Background asyncio.Task per assignment
    └── _execute_task_safe_inner()       # Timeout + crash recovery wrapper
        └── _execute_task()              # Full pipeline:
            ├── _prepare_workspace()     # Git branch/clone setup
            ├── adapter.start(ctx)       # Launch agent process
            ├── adapter.wait()           # Stream output + rate-limit retries
            ├── _complete_workspace()    # Commit/merge/PR post-completion
            │   ├── _merge_and_push()    # Direct merge path
            │   └── _create_pr_for_task()  # PR-based approval path
            ├── _generate_tasks_from_plan()  # Auto-task from plan files
            └── cleanup                  # Release workspace + free agent
Workspace locking lifecycle:
_schedule() assigns task → _prepare_workspace() acquires lock
→ agent runs with exclusive workspace access
→ _complete_workspace() performs git operations
→ cleanup section releases lock via db.release_workspaces_for_task()
If the task times out, crashes, or is admin-stopped, the lock is
released in the error/timeout handler so the workspace isn't stuck.
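The release-in-finally pattern described above can be sketched with stand-in objects. This is illustrative only: `FakeDB`, `acquire_workspace`, and `run_with_workspace` are hypothetical names, not the orchestrator's real API — only `release_workspaces_for_task` appears in the source.

```python
import asyncio

class FakeDB:
    """Stand-in for the real database's workspace-lock calls."""
    def __init__(self) -> None:
        self.locked: set[str] = set()
    async def acquire_workspace(self, task_id: str) -> None:
        self.locked.add(task_id)
    async def release_workspaces_for_task(self, task_id: str) -> None:
        self.locked.discard(task_id)

async def run_with_workspace(db: FakeDB, task_id: str, work) -> None:
    await db.acquire_workspace(task_id)   # _prepare_workspace() acquires the lock
    try:
        await work()                      # agent runs with exclusive access
    finally:
        # Mirrors the cleanup/error handlers: the lock is released even
        # on timeout, crash, or admin stop, so the workspace isn't stuck.
        await db.release_workspaces_for_task(task_id)
```

The key design point is that release lives in a `finally` block, so no exit path can leak the lock.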
Related modules:

- src/scheduler.py — Pure-function proportional fair-share scheduler.
  Called by _schedule() with a SchedulerState snapshot; returns
  AssignAction objects with zero side effects. See that module's
  docstring for the deficit-based algorithm and min-task guarantee.
- src/hooks.py — Event-driven and periodic hook engine. Ticked each
  cycle by run_one_cycle step 7; event hooks fire asynchronously via
  EventBus. See that module's docstring for the context pipeline,
  short-circuit checks, and LLM invocation with tool access.
- src/state_machine.py — VALID_TASK_TRANSITIONS dict defining the
  legal (status, event) → status transitions. The orchestrator calls
  db.transition_task() which enforces these transitions.
- src/adapters/base.py / src/adapters/claude.py — Agent adapter
  interface (start/wait/stop/is_alive). The orchestrator delegates all
  LLM interaction to adapters and only processes the resulting
  AgentOutput.
See specs/orchestrator.md for the full behavioral specification.
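For illustration, a transition table of the kind src/state_machine.py describes has roughly this shape. The statuses and events below are examples chosen to match the lifecycle in this document, not the exact contents of VALID_TASK_TRANSITIONS:

```python
# Illustrative (status, event) -> status table; the real entries live in
# src/state_machine.py as VALID_TASK_TRANSITIONS.
TRANSITIONS: dict[tuple[str, str], str] = {
    ("DEFINED", "deps_met"): "READY",
    ("READY", "assigned"): "IN_PROGRESS",
    ("IN_PROGRESS", "agent_done"): "AWAITING_APPROVAL",
    ("AWAITING_APPROVAL", "pr_merged"): "COMPLETED",
    ("IN_PROGRESS", "stopped"): "BLOCKED",
}

def transition(status: str, event: str) -> str:
    """Return the next status, or raise on an illegal transition."""
    try:
        return TRANSITIONS[(status, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {status!r} + {event!r}") from None
```

A lookup-and-raise helper like this is what lets db.transition_task() enforce legality deterministically, with no LLM involvement.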
Classes
Orchestrator
Orchestrator(config: AppConfig, adapter_factory=None)
Coordinates the full task lifecycle across multiple projects and agents.
The orchestrator is deliberately decoupled from Discord: it communicates
through injected callbacks (set_notify_callback,
set_create_thread_callback) rather than importing Discord directly.
This makes it testable in isolation and keeps the transport layer
pluggable.
Key internal state:

- _running_tasks — maps task_id to a background asyncio.Task
  for each agent execution currently in flight. The main loop checks
  this dict every cycle to clean up finished work and avoid
  double-launching.
- _adapters — maps agent_id to the live adapter instance (e.g.
  ClaudeAdapter) so we can stop or cancel a running agent from
  admin commands like stop_task.
- _paused — when True, the scheduler is skipped entirely (no new
  work is assigned) but monitoring, approvals, and promotions continue.
Initialize the orchestrator with its configuration and subsystems.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| config | AppConfig | The application configuration (loaded from YAML). | required |
| adapter_factory | | Factory for creating agent adapters (e.g. ClaudeAdapterFactory). When None, the orchestrator can manage state and scheduling but cannot execute tasks. | None |
The constructor wires up all subsystems but does NOT perform any
async initialization (DB schema creation, stale-state recovery,
hook subscriptions). Call await initialize() before running
cycles.
Source code in src/orchestrator.py
def __init__(self, config: AppConfig, adapter_factory=None):
    """Initialize the orchestrator with its configuration and subsystems.

    Args:
        config: The application configuration (loaded from YAML).
        adapter_factory: Factory for creating agent adapters (e.g.
            ClaudeAdapterFactory). When None, the orchestrator can
            manage state and scheduling but cannot execute tasks.

    The constructor wires up all subsystems but does NOT perform any
    async initialization (DB schema creation, stale-state recovery,
    hook subscriptions). Call ``await initialize()`` before running
    cycles.
    """
    self.config = config
    self.db = Database(config.database_path)
    self.bus = EventBus()
    self.budget = BudgetManager(
        global_budget=config.global_token_budget_daily
    )
    self.git = GitManager()
    self._adapter_factory = adapter_factory
    # Live adapter instances keyed by agent_id. Stored so we can call
    # adapter.stop() from admin commands (stop_task, timeout recovery).
    self._adapters: dict[str, object] = {}
    # Background asyncio Tasks for in-flight agent executions, keyed by
    # task_id. Cleaned up each cycle; prevents double-launching.
    self._running_tasks: dict[str, asyncio.Task] = {}
    self._notify: NotifyCallback | None = None
    self._create_thread: CreateThreadCallback | None = None
    # Discord message objects for task-started notifications, keyed by
    # task_id. Stored so we can delete the message when the task finishes
    # to keep the chat window clean.
    self._task_started_messages: dict[str, Any] = {}
    self._paused: bool = False
    self._restart_requested: bool = False
    # Throttle: approval polling runs at most once per 60s.
    self._last_approval_check: float = 0.0
    # LLM interaction logger — records all LLM API calls (both direct
    # chat provider calls and agent sessions) to JSONL files for cost
    # analysis and prompt optimization. See ``src/llm_logger.py``.
    self.llm_logger = LLMLogger(
        base_dir=os.path.join(config.data_dir, "logs", "llm"),
        enabled=config.llm_logging.enabled,
        retention_days=config.llm_logging.retention_days,
    )
    self._last_log_cleanup: float = 0.0
    self._last_auto_archive: float = 0.0
    self._config_watcher: ConfigWatcher | None = None
    # Chat provider for LLM-based plan parsing. Optionally used by
    # ``_generate_tasks_from_plan`` to parse agent-written plan files
    # with an LLM instead of the regex parser, producing higher-quality
    # task splits. Wrapped with ``LoggedChatProvider`` so plan-parsing
    # token usage appears in analytics under the ``plan_parser`` caller.
    self._chat_provider = None
    if config.auto_task.use_llm_parser:
        try:
            from src.chat_providers import create_chat_provider, LoggedChatProvider
            provider = create_chat_provider(config.chat_provider)
            if provider and self.llm_logger._enabled:
                provider = LoggedChatProvider(
                    provider, self.llm_logger, caller="plan_parser"
                )
            self._chat_provider = provider
        except Exception:
            pass
    # Tracks the last time we sent a reminder for an AWAITING_APPROVAL
    # task that has no PR URL (keyed by task_id).
    self._no_pr_reminded_at: dict[str, float] = {}
    # Tracks the last time a "stuck DEFINED" notification was sent for
    # each task (keyed by task_id) to rate-limit alerts.
    self._stuck_notified_at: dict[str, float] = {}
    self.hooks: HookEngine | None = None
    # Semantic memory manager — optional integration with memsearch.
    # Initialized only when config.memory.enabled is True and the
    # memsearch package is installed.
    self.memory_manager: "MemoryManager | None" = None
    if hasattr(config, "memory") and config.memory.enabled:
        try:
            from src.memory import MemoryManager
            self.memory_manager = MemoryManager(
                config.memory, storage_root=config.data_dir
            )
        except Exception as e:
            logger.warning("Memory manager initialization failed: %s", e)
    # Reference to the command handler, set by the bot after initialization.
    # Used to pass handler references to interactive Discord views (e.g.
    # Retry/Skip buttons on failed task notifications).
    self._command_handler: Any = None
    # Tracks per-project budget warning thresholds already sent so we
    # don't spam the same warning. Keyed by project_id, value is the
    # highest threshold percentage (e.g. 80, 95) already notified.
    self._budget_warned_at: dict[str, int] = {}
Functions
set_command_handler
set_command_handler(handler: Any) -> None
Store a reference to the command handler for interactive views.
Source code in src/orchestrator.py
def set_command_handler(self, handler: Any) -> None:
    """Store a reference to the command handler for interactive views."""
    self._command_handler = handler
pause
Pause scheduling — no new tasks are assigned, but monitoring continues.
When paused, run_one_cycle still runs approvals, dependency
promotion, stuck-task detection, hooks, and archival — only the
scheduler step (_schedule) is skipped. In-flight agent work
continues unaffected; this only prevents new assignments.
Source code in src/orchestrator.py
def pause(self) -> None:
    """Pause scheduling — no new tasks are assigned, but monitoring continues.

    When paused, ``run_one_cycle`` still runs approvals, dependency
    promotion, stuck-task detection, hooks, and archival — only the
    scheduler step (``_schedule``) is skipped. In-flight agent work
    continues unaffected; this only prevents *new* assignments.
    """
    self._paused = True
resume
Resume scheduling after a pause. New assignments start on the next cycle.
Source code in src/orchestrator.py
def resume(self) -> None:
    """Resume scheduling after a pause. New assignments start on the next cycle."""
    self._paused = False
set_notify_callback
set_notify_callback(callback: NotifyCallback) -> None
Inject the notification transport (e.g. Discord channel posting).
All orchestrator notifications flow through _notify_channel which
delegates to this callback. The callback signature is:

    async def callback(message: str, project_id: str | None,
                       *, embed=None, view=None) -> None
This injection pattern keeps the orchestrator testable without a live
Discord connection. In production, main.py wires this to the
Discord bot's send_notification method.
Source code in src/orchestrator.py
def set_notify_callback(self, callback: NotifyCallback) -> None:
    """Inject the notification transport (e.g. Discord channel posting).

    All orchestrator notifications flow through ``_notify_channel`` which
    delegates to this callback. The callback signature is::

        async def callback(message: str, project_id: str | None,
                           *, embed=None, view=None) -> None

    This injection pattern keeps the orchestrator testable without a live
    Discord connection. In production, ``main.py`` wires this to the
    Discord bot's ``send_notification`` method.
    """
    self._notify = callback
set_create_thread_callback
set_create_thread_callback(callback: CreateThreadCallback) -> None
Inject the thread-creation transport for per-task output streaming.
Each task execution creates a Discord thread for streaming agent output
in real time. The callback returns two send functions: one for the
thread itself and one for posting brief summaries to the parent channel.
Set by main.py during bot initialization. When None, agent output
is posted directly to the notifications channel (noisier but functional).
Source code in src/orchestrator.py
def set_create_thread_callback(self, callback: CreateThreadCallback) -> None:
    """Inject the thread-creation transport for per-task output streaming.

    Each task execution creates a Discord thread for streaming agent output
    in real time. The callback returns two send functions: one for the
    thread itself and one for posting brief summaries to the parent channel.

    Set by ``main.py`` during bot initialization. When None, agent output
    is posted directly to the notifications channel (noisier but functional).
    """
    self._create_thread = callback
|
skip_task
async
skip_task(task_id: str) -> tuple[str | None, list[Task]]
Skip a BLOCKED or FAILED task to unblock its dependency chain.
This is an admin escape hatch: it marks the task as COMPLETED (even
though no work was done) so that downstream dependents whose only
remaining unmet dependency was this task can proceed. The method
performs a forward walk of the dependency graph to report which
tasks will be unblocked, giving the operator visibility into the
blast radius before the next cycle promotes them.
Returns (error_string | None, list_of_tasks_that_will_be_unblocked).
Source code in src/orchestrator.py
async def skip_task(self, task_id: str) -> tuple[str | None, list[Task]]:
    """Skip a BLOCKED or FAILED task to unblock its dependency chain.

    This is an admin escape hatch: it marks the task as COMPLETED (even
    though no work was done) so that downstream dependents whose only
    remaining unmet dependency was this task can proceed. The method
    performs a forward walk of the dependency graph to report which
    tasks will be unblocked, giving the operator visibility into the
    blast radius before the next cycle promotes them.

    Returns (error_string | None, list_of_tasks_that_will_be_unblocked).
    """
    task = await self.db.get_task(task_id)
    if not task:
        return f"Task '{task_id}' not found", []
    if task.status not in (TaskStatus.BLOCKED, TaskStatus.FAILED):
        return (
            f"Task is not BLOCKED or FAILED (status: {task.status.value}). "
            f"Only blocked/failed tasks can be skipped.",
            [],
        )
    await self.db.transition_task(
        task_id,
        TaskStatus.COMPLETED,
        context="skip_task",
    )
    await self.db.log_event(
        "task_skipped",
        project_id=task.project_id,
        task_id=task.id,
        payload=f"skipped from {task.status.value}",
    )
    # Find which downstream tasks will now become unblocked.
    # After we set this task to COMPLETED, any direct dependents
    # whose other deps are also met will be promoted by the
    # next _check_defined_tasks cycle.
    unblocked: list[Task] = []
    dependents = await self.db.get_dependents(task_id)
    for dep_id in dependents:
        dep_task = await self.db.get_task(dep_id)
        if dep_task and dep_task.status == TaskStatus.DEFINED:
            # Check if all deps (including the now-skipped one) are met
            if await self.db.are_dependencies_met(dep_id):
                unblocked.append(dep_task)
    await self._notify_channel(
        f"**Task Skipped:** `{task_id}` — {task.title}\n"
        f"Marked as COMPLETED to unblock dependency chain."
        + (f"\n{len(unblocked)} task(s) will be unblocked in the next cycle."
           if unblocked else ""),
        project_id=task.project_id,
    )
    return None, unblocked
stop_task
async
stop_task(task_id: str) -> str | None
Forcibly stop an in-progress task and release its agent.
Sends a stop signal to the running adapter, transitions the task to
BLOCKED, resets the agent to IDLE, and checks whether stopping this
task orphans any downstream dependency chain (notifying if so).
Returns None on success, or an error string if the task cannot be stopped.
Source code in src/orchestrator.py
async def stop_task(self, task_id: str) -> str | None:
    """Forcibly stop an in-progress task and release its agent.

    Sends a stop signal to the running adapter, transitions the task to
    BLOCKED, resets the agent to IDLE, and checks whether stopping this
    task orphans any downstream dependency chain (notifying if so).

    Returns None on success, or an error string if the task cannot be stopped.
    """
    task = await self.db.get_task(task_id)
    if not task:
        return f"Task '{task_id}' not found"
    if task.status != TaskStatus.IN_PROGRESS:
        return f"Task is not in progress (status: {task.status.value})"
    # Find and stop the adapter
    agent_id = task.assigned_agent_id
    if agent_id and agent_id in self._adapters:
        adapter = self._adapters[agent_id]
        try:
            await adapter.stop()
        except Exception as e:
            logger.error("Error stopping adapter for agent %s: %s", agent_id, e)
    # Cancel the background asyncio Task so the finally block in
    # _resilient_query fires even if the transport close didn't
    # immediately take effect.
    bg_task = self._running_tasks.get(task_id)
    if bg_task and not bg_task.done():
        bg_task.cancel()
    # Clean up sentinel and release workspace lock
    ws = await self.db.get_workspace_for_task(task_id)
    if ws:
        self._remove_sentinel(ws.workspace_path)
    await self.db.release_workspaces_for_task(task_id)
    await self.db.transition_task(task_id, TaskStatus.BLOCKED,
                                  context="stop_task",
                                  assigned_agent_id=None)
    if agent_id:
        await self.db.update_agent(agent_id, state=AgentState.IDLE,
                                   current_task_id=None)
        self._adapters.pop(agent_id, None)
    await self._notify_channel(
        f"**Task Stopped:** `{task_id}` — {task.title}",
        project_id=task.project_id,
    )
    # Delete the task-started message to reduce chat clutter
    started_msg = self._task_started_messages.pop(task_id, None)
    if started_msg is not None:
        try:
            await started_msg.delete()
        except Exception as e:
            logger.debug("Could not delete task-started message for %s: %s",
                         task_id, e)
    # Check if stopping this task blocks a dependency chain
    await self._notify_stuck_chain(task)
    return None
initialize
async
Bootstrap the orchestrator and all subsystems.
Initialization order matters — later steps depend on earlier ones:

1. Database — create tables if needed, run migrations. Must be
   first because every other step queries the DB.
2. Agent profiles — sync YAML-defined profiles into DB rows so
   tasks can reference them by ID. Depends on DB being initialized.
3. Stale state recovery — after a daemon restart, no adapter
   processes are actually running, so any IN_PROGRESS tasks and BUSY
   agents left over from the previous run are reset to a schedulable
   state. Must run before the first scheduling cycle to avoid
   ghost assignments. See _recover_stale_state.
4. Hook engine — subscribe to EventBus events and pre-populate
   last-run timestamps so periodic hooks don't all fire simultaneously
   on startup. Depends on DB for reading last-run times.
This method must be called (and awaited) before run_one_cycle.
Called by main.py during startup, after load_config() but
before the Discord bot connects.
Source code in src/orchestrator.py
async def initialize(self) -> None:
    """Bootstrap the orchestrator and all subsystems.

    Initialization order matters — later steps depend on earlier ones:

    1. **Database** — create tables if needed, run migrations. Must be
       first because every other step queries the DB.
    2. **Agent profiles** — sync YAML-defined profiles into DB rows so
       tasks can reference them by ID. Depends on DB being initialized.
    3. **Stale state recovery** — after a daemon restart, no adapter
       processes are actually running, so any IN_PROGRESS tasks and BUSY
       agents left over from the previous run are reset to a schedulable
       state. Must run before the first scheduling cycle to avoid
       ghost assignments. See ``_recover_stale_state``.
    4. **Hook engine** — subscribe to EventBus events and pre-populate
       last-run timestamps so periodic hooks don't all fire simultaneously
       on startup. Depends on DB for reading last-run times.

    This method must be called (and awaited) before ``run_one_cycle``.
    Called by ``main.py`` during startup, after ``load_config()`` but
    before the Discord bot connects.
    """
    await self.db.initialize()
    await self._sync_profiles_from_config()
    await self._recover_stale_state()
    if self.config.hook_engine.enabled:
        self.hooks = HookEngine(self.db, self.bus, self.config)
        self.hooks.set_orchestrator(self)
        await self.hooks.initialize()
    # Start config file watcher for hot-reloading
    if self.config._config_path:
        self._config_watcher = ConfigWatcher(
            config_path=self.config._config_path,
            event_bus=self.bus,
            current_config=self.config,
        )
        self.bus.subscribe("config.reloaded", self._on_config_reloaded)
        self._config_watcher.start()
wait_for_running_tasks
async
wait_for_running_tasks(timeout: float | None = None) -> None
Wait for all background task executions to finish.
This is primarily useful in tests where run_one_cycle fires off
background coroutines and the caller needs to wait for them to
complete before inspecting results.
Source code in src/orchestrator.py
async def wait_for_running_tasks(self, timeout: float | None = None) -> None:
    """Wait for all background task executions to finish.

    This is primarily useful in tests where ``run_one_cycle`` fires off
    background coroutines and the caller needs to wait for them to
    complete before inspecting results.
    """
    if not self._running_tasks:
        return
    tasks = list(self._running_tasks.values())
    if timeout is not None:
        await asyncio.wait(tasks, timeout=timeout)
    else:
        await asyncio.gather(*tasks, return_exceptions=True)
shutdown
async
Gracefully shut down all subsystems in dependency order.
Shutdown order:
1. Wait for running agent tasks (with a 10s timeout so we don't
hang indefinitely if an adapter is stuck).
2. Shut down the hook engine (cancels any in-flight hook tasks).
3. Close the memory manager (flushes pending index writes).
4. Close the database connection.
The order matters: tasks and hooks use the DB, so we must wait
for them to finish before closing it.
Source code in src/orchestrator.py
async def shutdown(self) -> None:
    """Gracefully shut down all subsystems in dependency order.

    Shutdown order:

    1. Wait for running agent tasks (with a 10s timeout so we don't
       hang indefinitely if an adapter is stuck).
    2. Shut down the hook engine (cancels any in-flight hook tasks).
    3. Close the memory manager (flushes pending index writes).
    4. Close the database connection.

    The order matters: tasks and hooks use the DB, so we must wait
    for them to finish before closing it.
    """
    await self.wait_for_running_tasks(timeout=10)
    if self._config_watcher:
        await self._config_watcher.stop()
    if self.hooks:
        await self.hooks.shutdown()
    if self.memory_manager:
        try:
            await self.memory_manager.close()
        except Exception as e:
            logger.warning("Memory manager shutdown error: %s", e)
    await self.db.close()
run_one_cycle
async
Run one iteration of the orchestrator's main loop.
Called every ~5 seconds by main.py's scheduler loop. Each cycle
is designed to complete quickly (typically <1s of DB queries and
state checks) — heavy work (agent execution, git operations) is
delegated to background asyncio tasks that run concurrently.
The cycle is organized into three phases with numbered steps:

Phase 1 — Promotion cascade (steps 1-4):

1. Approvals — complete tasks whose PRs were merged so
   their dependents can be promoted in the same cycle.
2. Resume paused — bring back rate-limited/token-exhausted tasks
   whose backoff timers have expired.
3. Promote DEFINED — check dependency satisfaction and move tasks
   to READY. This must happen after approvals so freshly-completed
   parent tasks unblock their children immediately.
4. Stuck monitoring — rate-limited alerts for DEFINED tasks that
   have been waiting too long (runs after promotion so we don't
   false-alarm on tasks that just got promoted).

Phase 2 — Scheduling & launch (steps 5-6):

5. Schedule — assign READY tasks to idle agents (skipped when
   the orchestrator is paused).
6. Launch — fire off background asyncio tasks for each new
   assignment. These run concurrently with future cycles.

Phase 3 — Housekeeping (steps 7-10):

7. Hook engine tick — process periodic hooks; event-driven hooks
   fire asynchronously via the EventBus.
8. Config hot-reload — periodically re-read non-critical settings
   from disk (scheduling, archive, monitoring, etc.) without restart.
9. Log cleanup — prune old LLM interaction logs and flush
   prompt analytics for long-term cost analysis.
10. Auto-archive — sweep terminal tasks older than the configured
    threshold into the archive so they no longer clutter active views.
Ordering invariant: steps 1-3 form a "promotion cascade" where
completing an approval can immediately unblock a DEFINED task in
the same cycle. Breaking this order would add a 5s delay to
dependency chain progression.
Error handling: the entire cycle is wrapped in a try/except so
that a failure in one step (e.g., a DB query error) doesn't crash
the daemon — it logs the error and retries on the next cycle.
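Step 6's reap-then-launch ordering can be sketched in isolation. The `running` dict and `launch` helper below are stand-ins for `_running_tasks` and the launch loop (hypothetical names, simplified to string task IDs):

```python
import asyncio

# Stand-in for the orchestrator's _running_tasks registry.
running: dict[str, asyncio.Task] = {}

async def launch(task_ids: list[str], execute) -> None:
    # Reap finished work FIRST so a just-completed task id can be
    # re-assigned on a retry within the same cycle.
    for tid in [t for t, task in running.items() if task.done()]:
        running.pop(tid)
    for tid in task_ids:
        if tid in running:
            continue  # already in flight: skip double-launch
        running[tid] = asyncio.create_task(execute(tid))
```

Reaping before launching is what prevents a stale `done()` entry from blocking a legitimate re-assignment.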
Source code in src/orchestrator.py
async def run_one_cycle(self) -> None:
    """Run one iteration of the orchestrator's main loop.

    Called every ~5 seconds by ``main.py``'s scheduler loop. Each cycle
    is designed to complete quickly (typically <1s of DB queries and
    state checks) — heavy work (agent execution, git operations) is
    delegated to background asyncio tasks that run concurrently.

    The cycle is organized into three phases with numbered steps:

    **Phase 1 — Promotion cascade** (steps 1-4):

    1. **Approvals** — complete tasks whose PRs were merged so
       their dependents can be promoted in the same cycle.
    2. **Resume paused** — bring back rate-limited/token-exhausted tasks
       whose backoff timers have expired.
    3. **Promote DEFINED** — check dependency satisfaction and move tasks
       to READY. This must happen after approvals so freshly-completed
       parent tasks unblock their children immediately.
    4. **Stuck monitoring** — rate-limited alerts for DEFINED tasks that
       have been waiting too long (runs after promotion so we don't
       false-alarm on tasks that just got promoted).

    **Phase 2 — Scheduling & launch** (steps 5-6):

    5. **Schedule** — assign READY tasks to idle agents (skipped when
       the orchestrator is paused).
    6. **Launch** — fire off background asyncio tasks for each new
       assignment. These run concurrently with future cycles.

    **Phase 3 — Housekeeping** (steps 7-10):

    7. **Hook engine tick** — process periodic hooks; event-driven hooks
       fire asynchronously via the EventBus.
    8. **Config hot-reload** — periodically re-read non-critical settings
       from disk (scheduling, archive, monitoring, etc.) without restart.
    9. **Log cleanup** — prune old LLM interaction logs and flush
       prompt analytics for long-term cost analysis.
    10. **Auto-archive** — sweep terminal tasks older than the configured
        threshold into the archive so they no longer clutter active views.

    Ordering invariant: steps 1-3 form a "promotion cascade" where
    completing an approval can immediately unblock a DEFINED task in
    the same cycle. Breaking this order would add a 5s delay to
    dependency chain progression.

    Error handling: the entire cycle is wrapped in a try/except so
    that a failure in one step (e.g., a DB query error) doesn't crash
    the daemon — it logs the error and retries on the next cycle.
    """
    try:
        # ── Phase 1: Promotion cascade ──────────────────────────────────
        # Steps 1-3 form a "promotion cascade": completing an approval can
        # immediately unblock a DEFINED task in the same cycle. Breaking
        # this order would add a ~5s delay to dependency chain progression.

        # 1. Poll PR merge/close status for AWAITING_APPROVAL tasks.
        #    Merged PRs → COMPLETED, which may satisfy downstream deps.
        await self._check_awaiting_approval()
        # 2. Promote PAUSED tasks whose backoff timer has expired → READY.
        await self._resume_paused_tasks()
        # 3. Promote DEFINED tasks whose dependencies are all met → READY.
        #    Runs after step 1 so freshly-completed approvals can unblock
        #    dependents within the same cycle.
        await self._check_defined_tasks()
        # 4. Monitoring: detect DEFINED tasks stuck beyond threshold.
        #    Runs after promotion so we don't false-alarm on tasks that
        #    were just promoted in step 3.
        await self._check_stuck_defined_tasks()

        # ── Phase 2: Scheduling & launch ────────────────────────────────
        # 5. Schedule READY tasks onto idle agents (skipped when paused).
        if not self._paused:
            actions = await self._schedule()
        else:
            actions = []
        # 6. Launch assigned tasks as background asyncio coroutines.
        #
        # First, reap completed background tasks from _running_tasks.
        # We must do this BEFORE launching new tasks to free up task IDs
        # (a task that just finished shouldn't block its re-assignment
        # on a retry). We don't inspect results here — error handling
        # is done inside _execute_task_safe_inner.
        done = [tid for tid, t in self._running_tasks.items() if t.done()]
        for tid in done:
            self._running_tasks.pop(tid)
        for action in actions:
            if action.task_id in self._running_tasks:
                continue  # Already running — skip double-launch
            bg = asyncio.create_task(self._execute_task_safe(action))
            self._running_tasks[action.task_id] = bg

        # ── Phase 3: Housekeeping ───────────────────────────────────────
        # 7. Run hook engine tick (periodic hooks; event hooks fire async).
        if self.hooks:
            await self.hooks.tick()
        # 8. Config hot-reload is handled by ConfigWatcher (background task).
        # 9. Periodic log cleanup and analytics flush (~once per hour).
        now = time.time()
        if now - self._last_log_cleanup >= 3600:
            self._last_log_cleanup = now
            try:
                removed = self.llm_logger.cleanup_old_logs()
                if removed:
                    logger.info("LLM log cleanup: removed %d old directory(ies)", removed)
                # Flush prompt analytics to disk for long-term analysis
                self.llm_logger.flush_analytics()
            except Exception as e:
                logger.error("LLM log cleanup error: %s", e)
        # 10. Auto-archive stale terminal tasks (~once per hour).
        await self._auto_archive_tasks()
    except Exception:
        logger.error("Scheduler cycle error", exc_info=True)
Functions