Agent Queue daemon entry point — process lifecycle and signal handling.
Startup sequence
- Configure structured logging (JSON or human-readable, with correlation IDs)
- Load config from YAML (default: ~/.agent-queue/config.yaml)
- Validate all settings (fail fast on misconfiguration)
- Apply the environment-specific overlay (config.{env}.yaml) if present
- Initialize the Orchestrator (database, event bus, scheduler)
- Start the health check HTTP server (if enabled)
- Start the Discord bot (connects to gateway, registers commands)
- Run the scheduler loop (~5s cycle: promote tasks, assign agents, execute)
- Wait for SIGTERM/SIGINT to trigger graceful shutdown
On shutdown, the bot, health check server, and orchestrator are closed cleanly.
If a restart was requested (via the /restart Discord command), the process
replaces itself using os.execv() — this ensures a fresh Python interpreter
with updated code while the systemd/supervisor unit sees a continuous process.
See specs/main.md for the full specification.
Classes
Functions
run
async
run(config_path: str, profile: str | None = None) -> bool
Run the daemon. Returns True if a restart was requested.
Source code in src/main.py
async def run(config_path: str, profile: str | None = None) -> bool:
    """Run the daemon until a shutdown signal arrives.

    Loads the configuration, initializes the orchestrator, starts the health
    check server and the Discord bot, and runs the scheduler loop until
    SIGTERM or SIGINT is received. The components are then shut down.

    Args:
        config_path: Path handed to ``load_config``.
        profile: Optional profile name handed to ``load_config``.

    Returns:
        The orchestrator's ``_restart_requested`` flag as read at shutdown,
        i.e. True if a restart was requested.
    """
    # Set up logging early from the environment (before any other import
    # logs); the loaded config is not available yet.
    setup_logging(
        level=os.environ.get("AGENT_QUEUE_LOG_LEVEL", "INFO"),
        format="json" if os.environ.get("AGENT_QUEUE_LOG_FORMAT") == "json" else "text",
    )

    config = load_config(config_path, profile=profile)
    logger.info(
        "Starting with env=%s, profile=%s",
        config.env,
        config.profile or "no profile",
    )

    # Re-configure logging now that the settings from the config are known.
    setup_logging(
        level=config.logging.level,
        format=config.logging.format,
        include_source=config.logging.include_source,
    )

    # Ensure the database directory exists. A bare filename has an empty
    # dirname and os.makedirs("") raises FileNotFoundError, so skip that case.
    db_dir = os.path.dirname(config.database_path)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    # The adapter factory takes the orchestrator's llm_logger, so the
    # orchestrator is created first and the factory is attached afterwards.
    orch = Orchestrator(config, adapter_factory=None)
    adapter_factory = AdapterFactory(llm_logger=orch.llm_logger)
    orch._adapter_factory = adapter_factory
    await orch.initialize()

    # Start health check server (if enabled)
    health_server = HealthCheckServer(
        config=config.health_check,
        health_provider=lambda: _health_checks(orch),
    )
    await health_server.start()

    # Start Discord bot
    bot = AgentQueueBot(config, orch)

    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_signal)

    async def run_bot():
        try:
            await bot.start(config.discord.bot_token)
        except asyncio.CancelledError:
            pass

    async def run_scheduler():
        # Wait for the bot to be ready before scheduling
        await bot.wait_until_ready()
        while not shutdown_event.is_set():
            await orch.run_one_cycle()
            # Pause up to 5 seconds between cycles, waking early on shutdown.
            try:
                await asyncio.wait_for(shutdown_event.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                pass

    bot_task = asyncio.create_task(run_bot())
    scheduler_task = asyncio.create_task(run_scheduler())

    try:
        # Wait until shutdown is signaled
        await shutdown_event.wait()
    finally:
        # Shut down bot, health server, and orchestrator
        restart = orch._restart_requested
        await health_server.stop()
        await bot.close()
        bot_task.cancel()
        scheduler_task.cancel()
        # Let both tasks finish unwinding before the orchestrator is shut
        # down, and surface any exception that ended a task earlier instead
        # of leaving it unretrieved.
        outcomes = await asyncio.gather(
            bot_task, scheduler_task, return_exceptions=True
        )
        for task_name, outcome in zip(("bot", "scheduler"), outcomes):
            # CancelledError is a BaseException, so it is not reported here.
            if isinstance(outcome, Exception):
                logger.error(
                    "%s task failed: %s", task_name, outcome, exc_info=outcome
                )
        await orch.shutdown()

    return restart
|