Discord integration layer -- connects the orchestrator to Discord via discord.py.
AgentQueueBot extends commands.Bot with:
- LLM-powered chat (via ChatAgent) that lets users interact with the orchestrator
through natural language
- Per-project channel routing so each project's notifications land in the right place
- Thread-based task output streaming (one Discord thread per agent execution)
- Message history with compaction (older messages summarized, recent kept verbatim)
Key design decision: the bot maintains in-memory channel caches
(_project_channels, _channel_to_project) for O(1) message routing.
On startup, these are populated from the database via _resolve_project_channels,
and kept in sync at runtime when channels are created, reassigned, or deleted.
Message flow:
Discord message -> on_message routing -> _build_message_history
-> ChatAgent.chat() -> tool-use loop -> _send_long_message -> Discord reply
See specs/discord/discord.md for the full specification.
Classes
CachedMessage
dataclass
CachedMessage(id: int, author_name: str, is_bot: bool, content: str, created_at: float)
Lightweight representation of a Discord message for local buffering.
AgentQueueBot
AgentQueueBot(config: AppConfig, orchestrator: Orchestrator)
Bases: Bot
Discord bot that bridges user interaction to the AgentQueue orchestrator.
Responsibilities:
- Registers slash commands and an authorization guard on startup
- Resolves per-project Discord channels from the database for fast routing
- Sets orchestrator callbacks for notifications and thread creation
- Handles incoming messages: routes them through ChatAgent for LLM responses,
serializing concurrent requests per channel to avoid duplicate processing
Source code in src/discord/bot.py
def __init__(self, config: AppConfig, orchestrator: Orchestrator):
    """Initialize the bot, its ChatAgent, and all in-memory caches.

    Args:
        config: Application configuration; ``config.database_path`` is also
            used to derive the notes-thread persistence file location.
        orchestrator: The AgentQueue orchestrator this bot bridges to.
    """
    intents = discord.Intents.default()
    # message_content is a privileged intent — required to read message text.
    intents.message_content = True
    super().__init__(command_prefix="!", intents=intents)
    self.config = config
    self.orchestrator = orchestrator
    self.agent = ChatAgent(orchestrator, config, llm_logger=orchestrator.llm_logger)
    # Register a callback so that project deletions (from any caller)
    # automatically purge the bot's in-memory channel caches.
    self.agent.handler._on_project_deleted = self.clear_project_channels
    # Global bot channel; None until resolved (presumably at startup — see
    # module docs on _resolve_project_channels).
    self._channel: discord.TextChannel | None = None
    # Per-project channel cache: project_id -> channel
    self._project_channels: dict[str, discord.TextChannel] = {}
    # Reverse lookup: channel_id -> project_id
    self._channel_to_project: dict[int, str] = {}
    # Message IDs already handled — dedup guard used by on_message.
    self._processed_messages: set[int] = set()
    self._channel_summaries: dict[int, tuple[int, str]] = {}  # channel_id -> (up_to_message_id, summary)
    self._channel_locks: dict[int, asyncio.Lock] = {}  # prevent concurrent LLM calls per channel
    # Local message buffer — caches messages as they arrive via on_message
    self._channel_buffers: dict[int, collections.deque[CachedMessage]] = {}
    self._buffer_last_access: dict[int, float] = {}  # channel_id -> last access timestamp
    self._summarization_tasks: dict[int, asyncio.Task] = {}  # channel_id -> background task
    # Startup timestamp; on_message ignores messages created before it.
    self._boot_time: float | None = None
    self._notes_threads: dict[int, str] = {}  # thread_id -> project_id
    self._notes_toc_messages: dict[int, int] = {}  # thread_id -> toc_message_id
    # Notes-thread mappings are persisted on disk next to the database file.
    self._notes_threads_path = os.path.join(
        os.path.dirname(config.database_path), "notes_threads.json"
    )
    self._load_notes_threads_sync()
    self._guild: discord.Guild | None = None
    # Notes auto-refresh tracking
    self._note_viewers: dict[int, dict[str, int]] = {}  # thread_id -> {filename: msg_id}
    self._note_refresh_timers: dict[str, asyncio.TimerHandle] = {}  # debounce key -> timer
    # Wire up the note-written callback
    self.agent.handler.on_note_written = self._handle_note_written
Functions
update_project_channel
update_project_channel(project_id: str, channel: TextChannel) -> None
Update the cached channel for a project at runtime.
Called after /set-channel or /create-channel commands so the
bot immediately routes to the new channel without requiring a restart.
Also maintains the reverse _channel_to_project mapping.
Source code in src/discord/bot.py
def update_project_channel(
    self, project_id: str, channel: discord.TextChannel
) -> None:
    """Update the cached channel for a project at runtime.

    Invoked after ``/set-channel`` or ``/create-channel`` so routing picks
    up the new channel immediately, with no restart required. Keeps the
    reverse ``_channel_to_project`` index consistent with the forward map.
    """
    previous = self._project_channels.get(project_id)
    # Install the new forward and reverse mappings unconditionally.
    self._project_channels[project_id] = channel
    self._channel_to_project[channel.id] = project_id
    # Drop the reverse entry for the previously assigned channel, if the
    # project actually moved to a different channel.
    if previous is not None and previous.id != channel.id:
        self._channel_to_project.pop(previous.id, None)
clear_project_channels
clear_project_channels(project_id: str) -> None
Remove all cached channels for a project.
Called after project deletion to keep the in-memory cache in sync
with the database and avoid stale routing entries. Cleans up:
- _project_channels
- _notes_threads entries that map to the deleted project
- _channel_summaries and _channel_locks for the project's channels
Source code in src/discord/bot.py
def clear_project_channels(self, project_id: str) -> None:
    """Remove all cached channels for a project.

    Called after project deletion so the in-memory caches stay in sync
    with the database and no stale routing entries remain. Purges:

    - ``_project_channels`` / ``_channel_to_project``
    - ``_notes_threads`` entries mapped to the deleted project (re-saved
      to disk when any were removed)
    - per-channel caches (summaries, locks, buffers, access times) and
      any in-flight summarization task for those channels
    """
    dead_ids: set[int] = set()

    # Forward + reverse channel mappings for the project itself.
    channel = self._project_channels.pop(project_id, None)
    if channel is not None:
        dead_ids.add(channel.id)
        self._channel_to_project.pop(channel.id, None)

    # Notes threads pointing at the deleted project. These are persisted
    # to disk, so re-save only when something actually changed.
    doomed_threads = [
        thread_id for thread_id, pid in self._notes_threads.items()
        if pid == project_id
    ]
    for thread_id in doomed_threads:
        dead_ids.add(thread_id)
        del self._notes_threads[thread_id]
    if doomed_threads:
        self._save_notes_threads_sync()

    # Per-channel caches keyed by Discord channel/thread ID.
    keyed_caches = (
        self._channel_summaries,
        self._channel_locks,
        self._channel_buffers,
        self._buffer_last_access,
    )
    for dead_id in dead_ids:
        for cache in keyed_caches:
            cache.pop(dead_id, None)
        pending = self._summarization_tasks.pop(dead_id, None)
        if pending is not None and not pending.done():
            pending.cancel()
get_project_for_channel
get_project_for_channel(channel_id: int) -> str | None
Return the project_id associated with a Discord channel, or None.
Performs an O(1) lookup against the reverse mapping that covers
both notification and control channels for every project.
Source code in src/discord/bot.py
def get_project_for_channel(self, channel_id: int) -> str | None:
    """Return the project_id associated with a Discord channel, or ``None``.

    O(1) lookup against the reverse mapping covering both notification
    and control channels for every project.
    """
    try:
        return self._channel_to_project[channel_id]
    except KeyError:
        return None
on_message
async
on_message(message: Message) -> None
Route incoming Discord messages to the ChatAgent for LLM processing.
Routing logic (a message is handled if ANY of these match):
1. Posted in the global bot channel (configured in config.yaml)
2. Posted in a per-project channel (O(1) reverse lookup via _channel_to_project)
3. The bot is @mentioned anywhere in the guild
4. Posted in a registered notes thread
For project channels and notes threads, implicit project context is
injected into the prompt so the LLM defaults to the right project
without requiring the user to specify it every time.
Concurrency: a per-channel asyncio.Lock serializes LLM calls to
prevent duplicate or interleaved responses when messages arrive faster
than the LLM can respond.
Source code in src/discord/bot.py
async def on_message(self, message: discord.Message) -> None:
    """Route incoming Discord messages to the ChatAgent for LLM processing.

    Routing logic (a message is handled if ANY of these match):

    1. Posted in the global bot channel (configured in config.yaml)
    2. Posted in a per-project channel (O(1) reverse lookup via
       ``_channel_to_project``)
    3. The bot is @mentioned anywhere in the guild
    4. Posted in a registered notes thread

    For project channels and notes threads, implicit project context is
    injected into the prompt so the LLM defaults to the right project
    without requiring the user to specify it every time.

    Concurrency: a per-channel ``asyncio.Lock`` serializes LLM calls to
    prevent duplicate or interleaved responses when messages arrive faster
    than the LLM can respond.
    """
    # Buffer the message locally before any early-return guards.
    # Bot's own messages arrive back via the gateway, so this captures
    # both user AND bot messages for the local history cache.
    channel_id = message.channel.id
    if self._should_buffer(channel_id, message):
        self._append_to_buffer(channel_id, CachedMessage(
            id=message.id,
            author_name="AgentQueue" if message.author == self.user else message.author.display_name,
            is_bot=message.author == self.user,
            content=message.content,
            created_at=message.created_at.timestamp(),
        ))
    # Ignore own messages
    if message.author == self.user:
        return
    # Authorization guard — silently ignore unauthorized users
    if not self._is_authorized(message.author.id):
        return
    # Dedup guard — prevent processing the same message twice
    if message.id in self._processed_messages:
        return
    self._processed_messages.add(message.id)
    # Keep the set from growing unbounded. NOTE(review): set iteration
    # order is arbitrary, so this retains *some* 100 IDs rather than the
    # most recent 100 — acceptable for a best-effort dedup cache.
    if len(self._processed_messages) > 200:
        self._processed_messages = set(list(self._processed_messages)[-100:])
    # Skip messages created before the bot started (prevents reprocessing after restart)
    if self._boot_time and message.created_at.timestamp() < self._boot_time:
        return
    # Only respond in the global bot channel, per-project channels,
    # when mentioned, or in a notes thread
    is_bot_channel = (
        self._channel
        and message.channel.id == self._channel.id
    )
    # Check if this is a per-project channel (O(1) reverse lookup)
    project_channel_id: str | None = self._channel_to_project.get(message.channel.id)
    is_mentioned = self.user in message.mentions
    notes_project_id = self._notes_threads.get(message.channel.id)
    is_notes_thread = notes_project_id is not None
    if not is_bot_channel and project_channel_id is None and not is_mentioned and not is_notes_thread:
        return
    # Strip the bot mention from the message text
    text = message.content
    if self.user:
        text = text.replace(f"<@{self.user.id}>", "").strip()
    if not text:
        await message.reply("How can I help? Ask me about status, projects, or tasks.")
        return
    # Serialize LLM processing per channel to avoid duplicate/concurrent responses
    lock = self._channel_locks.setdefault(message.channel.id, asyncio.Lock())
    async with lock:
        # Notify on cold model loads (Ollama first-call latency)
        try:
            if not await self.agent.is_model_loaded():
                await message.channel.send(
                    "\u23f3 Loading model, this may take a moment..."
                )
        except Exception:
            pass  # fail-open — skip notification silently
        thinking_msg: discord.Message | None = None
        # BUG FIX: capture the previous active project BEFORE entering the
        # try/finally below. It was previously assigned midway through the
        # try body, so the early `return` in the is_ready guard — or any
        # exception raised before the assignment — made the finally clause
        # crash with UnboundLocalError while restoring it.
        prev_active = self.agent._active_project_id
        async with message.channel.typing():
            try:
                if not self.agent.is_ready:
                    await message.reply(
                        "LLM not configured — I can only respond to slash commands. "
                        "Set `ANTHROPIC_API_KEY` or run `claude login`."
                    )
                    return
                # Prepend project context for project channels and notes threads
                user_text = text
                if project_channel_id and not is_bot_channel:
                    user_text = (
                        f"[Context: this is the channel for project "
                        f"`{project_channel_id}`. Default to using "
                        f"project_id='{project_channel_id}' for all project-scoped "
                        f"commands.]\n{text}"
                    )
                elif is_notes_thread and not is_bot_channel:
                    user_text = (
                        f"[NOTES MODE for project '{notes_project_id}'. "
                        f"BEHAVIOR: The user will type stream-of-consciousness thoughts. "
                        f"1. Call list_notes to see existing notes. "
                        f"2. Categorize input — decide which note it belongs to or create new. "
                        f"3. Use append_note to add to existing, or write_note for new. "
                        f"4. Respond with BRIEF confirmation: which note updated + 1-line summary. "
                        f"5. For browsing/management/comparison requests, use appropriate tools. "
                        f"Default project_id='{notes_project_id}'.]\n{text}"
                    )
                # Set active project from channel context so that git
                # commands (and other project-scoped tools) automatically
                # infer the correct repository without the LLM needing to
                # explicitly pass project_id in every tool call.
                if project_channel_id and not is_bot_channel:
                    self.agent.set_active_project(project_channel_id)
                elif is_notes_thread and not is_bot_channel:
                    self.agent.set_active_project(notes_project_id)
                # Build history from Discord channel
                history = await self._build_message_history(message.channel, before=message)
                # Send a thinking indicator that updates as the agent works.
                # This gives users real-time feedback on what the agent is
                # doing: thinking, calling tools, or composing a reply.
                thinking_msg = await message.reply("💭 Thinking...")
                tool_names_used: list[str] = []

                async def _on_progress(event: str, detail: str | None) -> None:
                    """Update the thinking indicator as the agent progresses.

                    Events:
                        - "thinking" (detail=None): initial LLM call
                        - "thinking" (detail="round N"): subsequent LLM round
                        - "tool_use" (detail=tool_name): a tool is being called
                        - "responding": LLM is producing final text response
                    """
                    nonlocal thinking_msg
                    if thinking_msg is None:
                        return
                    try:
                        if event == "thinking" and not detail:
                            # Initial thinking — already showing "Thinking..."
                            pass
                        elif event == "thinking" and detail:
                            # Subsequent LLM round after tool use
                            steps = " → ".join(f"`{t}`" for t in tool_names_used)
                            await thinking_msg.edit(
                                content=f"💭 Thinking... {steps} → 💭"
                            )
                        elif event == "tool_use" and detail:
                            tool_names_used.append(detail)
                            steps = " → ".join(f"`{t}`" for t in tool_names_used)
                            await thinking_msg.edit(
                                content=f"🔧 Working... {steps}"
                            )
                        elif event == "responding":
                            steps = " → ".join(f"`{t}`" for t in tool_names_used)
                            if steps:
                                await thinking_msg.edit(
                                    content=f"✅ {steps} → composing reply..."
                                )
                            else:
                                await thinking_msg.edit(content="✍️ Composing reply...")
                    except discord.NotFound:
                        thinking_msg = None  # Deleted externally; stop updating
                    except Exception:
                        pass  # Fail-open — don't break the chat flow

                try:
                    response = await self.agent.chat(
                        user_text, message.author.display_name,
                        history=history, on_progress=_on_progress,
                    )
                except Exception as e:
                    import anthropic
                    if isinstance(e, anthropic.AuthenticationError):
                        # Token may have been refreshed — reload and retry once
                        print(f"Auth error — reloading credentials: {e}")
                        if self.agent.reload_credentials():
                            response = await self.agent.chat(
                                user_text, message.author.display_name,
                                history=history, on_progress=_on_progress,
                            )
                        else:
                            response = (
                                "Authentication failed. Run `claude login` "
                                "or set `ANTHROPIC_API_KEY`."
                            )
                    else:
                        raise
                # Delete the thinking indicator and send the actual response
                await self._delete_thinking_msg(thinking_msg)
                thinking_msg = None
                await self._send_long_message(
                    message.channel, response, reply_to=message
                )
            except Exception as e:
                await self._delete_thinking_msg(thinking_msg)
                thinking_msg = None
                print(f"LLM error: {e}\n{traceback.format_exc()}")
                await message.reply(f"**LLM error:** {e}")
            finally:
                # Restore previous active project to avoid leaking
                # channel-specific context across concurrent requests.
                self.agent.set_active_project(prev_active)
Functions