Main

Application entry point and daemon lifecycle.

Agent Queue daemon entry point — process lifecycle and signal handling.

Startup sequence
  1. Configure structured logging (JSON or human-readable, with correlation IDs)
  2. Load config from YAML (default: ~/.agent-queue/config.yaml)
  3. Validate all settings (fail fast on misconfiguration)
  4. Apply the environment-specific overlay (config.{env}.yaml) if present
  5. Initialize the Orchestrator (database, event bus, scheduler)
  6. Start the health check HTTP server (if enabled)
  7. Start the Discord bot (connects to gateway, registers commands)
  8. Run the scheduler loop (~5s cycle: promote tasks, assign agents, execute)
  9. Wait for SIGTERM/SIGINT to trigger graceful shutdown
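
Steps 2 to 4 can be illustrated with a minimal sketch that works on plain dictionaries instead of YAML files. The names merge_overlay and validate are hypothetical, and the recursive-merge semantics are an assumption; the actual behavior of load_config is defined in specs/main.md, not here.

```python
from copy import deepcopy
from typing import Any


def merge_overlay(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict in which overlay values win and nested dicts merge."""
    merged = deepcopy(base)
    for key, value in overlay.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_overlay(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def validate(settings: dict[str, Any]) -> None:
    """Hypothetical fail-fast check: raise on the first bad setting."""
    token = settings.get("discord", {}).get("bot_token")
    if not token:
        raise ValueError("discord.bot_token must be set")


# Stand-ins for the parsed contents of config.yaml and config.{env}.yaml.
base = {
    "discord": {"bot_token": "abc"},
    "logging": {"level": "INFO", "format": "text"},
}
overlay = {"logging": {"level": "DEBUG"}}

validate(base)                           # step 3: fail fast before going further
settings = merge_overlay(base, overlay)  # step 4: overlay only what it names
```

Keys that the overlay does not mention keep their base values, so an environment file only needs to list what differs.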

On shutdown, the bot, health check server, and orchestrator are closed cleanly. If a restart was requested (via the /restart Discord command), the process replaces itself using os.execv(). This starts a fresh Python interpreter that picks up updated code. Because os.execv() replaces the process image without changing the PID, the systemd/supervisor unit sees a continuous process.
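
A minimal sketch of the self-replacement step follows. The helper names restart_argv and exec_self are hypothetical, and re-running with sys.executable plus the original sys.argv is an assumption about how the entry point rebuilds its command line.

```python
import os
import sys


def restart_argv(executable: str, argv: list[str]) -> list[str]:
    """Build the argument vector for re-running the current command line."""
    # By convention the first element names the program being executed.
    return [executable, *argv]


def exec_self() -> None:
    """Replace the current process image; does not return on success."""
    os.execv(sys.executable, restart_argv(sys.executable, sys.argv))
```

A caller could invoke exec_self() after run() returns True. Nothing after a successful os.execv() call executes in the old interpreter, so all cleanup has to be finished before it is called.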

See specs/main.md for the full specification.

Classes

Functions

run async

run(config_path: str, profile: str | None = None) -> bool

Run the daemon. Returns True if a restart was requested.

Source code in src/main.py
async def run(config_path: str, profile: str | None = None) -> bool:
    """Run the daemon. Returns True if a restart was requested."""
    # Set up structured logging early (before any other import logs)
    setup_logging(
        level=os.environ.get("AGENT_QUEUE_LOG_LEVEL", "INFO"),
        format="json" if os.environ.get("AGENT_QUEUE_LOG_FORMAT") == "json" else "text",
    )

    config = load_config(config_path, profile=profile)
    logger.info(
        "Starting with env=%s, profile=%s",
        config.env,
        config.profile or "no profile",
    )

    # Reconfigure logging with the settings from the loaded config
    setup_logging(
        level=config.logging.level,
        format=config.logging.format,
        include_source=config.logging.include_source,
    )

    # Ensure database directory exists
    # dirname() is "" for a bare filename; fall back to the current directory
    os.makedirs(os.path.dirname(config.database_path) or ".", exist_ok=True)

    orch = Orchestrator(config, adapter_factory=None)
    adapter_factory = AdapterFactory(llm_logger=orch.llm_logger)
    orch._adapter_factory = adapter_factory
    await orch.initialize()

    # Start health check server (if enabled)
    health_server = HealthCheckServer(
        config=config.health_check,
        health_provider=lambda: _health_checks(orch),
    )
    await health_server.start()

    # Create the Discord bot (it is started in run_bot below)
    bot = AgentQueueBot(config, orch)

    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_signal)

    async def run_bot():
        try:
            await bot.start(config.discord.bot_token)
        except asyncio.CancelledError:
            pass

    async def run_scheduler():
        # Wait for the bot to be ready before scheduling
        await bot.wait_until_ready()
        while not shutdown_event.is_set():
            await orch.run_one_cycle()
            try:
                await asyncio.wait_for(shutdown_event.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                pass

    bot_task = asyncio.create_task(run_bot())
    scheduler_task = asyncio.create_task(run_scheduler())

    try:
        # Wait until shutdown is signaled
        await shutdown_event.wait()
    finally:
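        # Shut down health server, bot, and orchestrator
        restart = orch._restart_requested
        await health_server.stop()
        await bot.close()
        bot_task.cancel()
        scheduler_task.cancel()
        # Wait for both tasks to finish so no cycle is still running
        await asyncio.gather(bot_task, scheduler_task, return_exceptions=True)
        await orch.shutdown()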

    return restart
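
The scheduler loop above waits on the shutdown event with a timeout instead of sleeping, so a signal ends the pause immediately. The pattern can be tried in isolation with the sketch below; run_cycles, demo, and the 60-second interval are illustrative names and values, not part of src/main.py.

```python
import asyncio


async def run_cycles(stop: asyncio.Event, interval: float, cycle) -> int:
    """Call cycle() repeatedly, pausing up to `interval` seconds between calls."""
    count = 0
    while not stop.is_set():
        await cycle()
        count += 1
        try:
            # Returns as soon as stop is set; otherwise times out after interval.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return count


async def demo() -> int:
    stop = asyncio.Event()

    async def cycle() -> None:
        await asyncio.sleep(0)  # stand-in for real scheduling work

    task = asyncio.create_task(run_cycles(stop, interval=60.0, cycle=cycle))
    await asyncio.sleep(0.05)  # let the first cycle complete
    stop.set()                 # wakes the loop without waiting 60 seconds
    return await asyncio.wait_for(task, timeout=1.0)


cycles = asyncio.run(demo())
```

With a plain asyncio.sleep(interval) in the loop, shutdown would be delayed by up to a full interval unless the task were cancelled mid-sleep.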