Event-driven and periodic hook engine for automated workflows.
Hooks enable the system to react to task lifecycle events or run on a timer
without human intervention. Each hook follows a pipeline:
trigger -> gather context (shell/file/http/db/git steps)
-> short-circuit check -> render prompt template -> invoke LLM with tools
The LLM invocation uses a full ChatAgent instance with tool access, so hooks
can create tasks, check status, send notifications, etc. -- anything a human
user can do via Discord chat, a hook can do autonomously.
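The pipeline above can be sketched as plain Python. The helper names here are hypothetical stand-ins for illustration, not the actual functions in src/hooks.py:

```python
# Minimal sketch of the hook pipeline (hypothetical helper names).
def run_pipeline(hook, gather_context, should_short_circuit, render, invoke_llm):
    context = gather_context(hook)            # shell/file/http/db/git steps
    if should_short_circuit(hook, context):   # e.g. nothing new to report
        return None                           # skip the (expensive) LLM call
    prompt = render(hook["prompt_template"], context)
    return invoke_llm(prompt)                 # ChatAgent with full tool access

# Example wiring with trivial stand-ins:
result = run_pipeline(
    {"prompt_template": "Summarize: {data}"},
    gather_context=lambda h: {"data": "3 tasks completed"},
    should_short_circuit=lambda h, c: not c["data"],
    render=lambda t, c: t.format(**c),
    invoke_llm=lambda p: f"LLM saw: {p}",
)
print(result)
```

The short-circuit gate sits before prompt rendering so a hook whose context step finds nothing interesting never spends an LLM call.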
Two trigger types are supported:
- Periodic: fires on a timer (interval_seconds), checked on every
  orchestrator tick (~5s). Actual firing granularity is bounded by the
  tick interval: a 10s periodic hook will fire every 10-15s, not exactly
  every 10s.
- Event: fires when a matching EventBus event arrives (e.g.
  task.completed). Events are delivered asynchronously via _on_event,
  which re-queries all enabled hooks for matches.
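For illustration, trigger configs along these lines would satisfy the fields the engine reads (``type`` and ``interval_seconds`` appear in ``tick()``; the event-trigger key name below is an assumption, see ``specs/hooks.md`` for the authoritative schema):

```python
import json

# Periodic: "type" and "interval_seconds" are the fields tick() reads.
periodic_trigger = {"type": "periodic", "interval_seconds": 600}

# Event: the "event" key name here is an assumption for illustration only.
event_trigger = {"type": "event", "event": "task.completed"}

# Hooks store triggers as JSON strings (tick() calls json.loads(hook.trigger)):
stored = json.dumps(periodic_trigger)
assert json.loads(stored)["interval_seconds"] == 600
```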
Concurrency and cooldown interaction:
Hook fires → _last_run_time[hook_id] = now, task added to _running
↓
Subsequent triggers check THREE gates before firing:
1. Concurrency: len(_running) < max_concurrent_hooks (global cap)
2. In-flight: hook_id not in _running (per-hook cap)
3. Cooldown: now - _last_run_time[hook_id] >= cooldown_seconds
↓
All three must pass → hook fires again
↓
When task finishes: tick() reaps it from _running,
freeing the per-hook and global concurrency slots
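A minimal sketch of those three gates as a standalone function (simplified; the real checks live inline in HookEngine.tick()):

```python
def may_fire(hook_id, running, last_run_time, now,
             max_concurrent_hooks, cooldown_seconds):
    # Gate 1: global cap across all in-flight hooks.
    if len(running) >= max_concurrent_hooks:
        return False
    # Gate 2: per-hook cap (one run at a time).
    if hook_id in running:
        return False
    # Gate 3: cooldown window since the hook last started.
    last = last_run_time.get(hook_id, 0)
    return now - last >= cooldown_seconds

now = 1000.0
# Still inside a 60s cooldown (last started 30s ago): blocked by gate 3.
blocked = may_fire("h1", {}, {"h1": now - 30}, now, 5, 60)
# Cooldown elapsed (last started 120s ago): all three gates pass.
allowed = may_fire("h1", {}, {"h1": now - 120}, now, 5, 60)
print(blocked, allowed)
```

Note that cooldown is measured from when the hook last *started*, not finished, since _last_run_time is set at launch.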
Tool access in hook LLM calls:
Each hook invocation creates a fresh ChatAgent instance with the same
tool set that human users have via Discord chat. This means hooks can:
- Create tasks (``/add-task``)
- Query task status
- Send notifications
- Use any registered MCP tools
The ChatAgent is stateless — no conversation history carries between
hook invocations. The rendered prompt is the entire context.
Integration with the orchestrator:
The orchestrator creates the HookEngine at ``initialize()`` and calls
``hooks.tick()`` every cycle (step 7 of ``run_one_cycle``). The engine
holds a back-reference to the orchestrator (via ``set_orchestrator``)
for LLM invocation (ChatAgent creation) and memory search access.
See ``src/orchestrator.py::initialize()`` and ``run_one_cycle()`` for
the integration points.
See ``specs/hooks.md`` for the full specification.
Classes
HookEngine
HookEngine(db: Database, bus: EventBus, config: AppConfig)
Manages hook lifecycle: scheduling, context gathering, LLM invocation.
The engine subscribes to all EventBus events (wildcard *) so it can
match event-driven hooks. Periodic hooks are checked on each tick()
call from the orchestrator loop. Running hooks are tracked as asyncio
Tasks with a configurable concurrency cap.
Source code in src/hooks.py
def __init__(self, db: Database, bus: EventBus, config: AppConfig):
    self.db = db
    self.bus = bus
    self.config = config
    # In-flight hook executions. Keyed by hook_id so we can:
    # (a) enforce "one run at a time per hook" — skip if already in-flight,
    # (b) count towards the global max_concurrent_hooks cap,
    # (c) reap finished tasks and surface exceptions in tick().
    self._running: dict[str, asyncio.Task] = {}
    # Tracks when each hook last started (epoch seconds). Used for both
    # periodic interval checks ("has enough time passed?") and cooldown
    # enforcement ("is the hook still in its cooldown window?").
    # Pre-populated from DB at initialize() to survive daemon restarts.
    self._last_run_time: dict[str, float] = {}
    # FileWatcher for file/folder change monitoring. Created at
    # initialize() if file_watcher_enabled is True. Watches are
    # extracted from hook trigger configs that reference file.changed
    # or folder.changed event types.
    self.file_watcher: FileWatcher | None = None
Functions
initialize
async
Subscribe to EventBus for event-driven hooks and restore state.
Three setup steps:
1. Register a wildcard EventBus subscriber so _on_event receives
every event type. The method then filters for matching hooks.
2. Subscribe to config.reloaded so the hook engine picks up
changes to hook_engine settings at runtime.
3. Pre-populate _last_run_time from the DB so that periodic hooks
don't all fire immediately on daemon startup. Without this, a
restart would cause every periodic hook to trigger simultaneously
(because their in-memory last-run timestamps would default to 0).
Source code in src/hooks.py
async def initialize(self) -> None:
    """Subscribe to EventBus for event-driven hooks and restore state.

    Three setup steps:

    1. Register a wildcard EventBus subscriber so ``_on_event`` receives
       every event type. The method then filters for matching hooks.
    2. Subscribe to ``config.reloaded`` so the hook engine picks up
       changes to ``hook_engine`` settings at runtime.
    3. Pre-populate ``_last_run_time`` from the DB so that periodic hooks
       don't all fire immediately on daemon startup. Without this, a
       restart would cause every periodic hook to trigger simultaneously
       (because their in-memory last-run timestamps would default to 0).
    """
    self.bus.subscribe("*", self._on_event)
    self.bus.subscribe("config.reloaded", self._on_config_reloaded)
    # Pre-populate last run times from DB
    hooks = await self.db.list_hooks(enabled=True)
    for hook in hooks:
        last_run = await self.db.get_last_hook_run(hook.id)
        if last_run:
            self._last_run_time[hook.id] = last_run.started_at
    # Initialize FileWatcher for file/folder change event hooks
    if self.config.hook_engine.file_watcher_enabled:
        self.file_watcher = FileWatcher(
            self.bus,
            debounce_seconds=self.config.hook_engine.file_watcher_debounce_seconds,
            poll_interval=self.config.hook_engine.file_watcher_poll_interval,
        )
        await self._sync_file_watches(hooks)
tick
async
Called every orchestrator cycle (~5s). Manage hook lifecycle.
This method performs two duties each tick:
1. Reap completed hook tasks — scan _running for asyncio
Tasks that have finished. Surface any unhandled exceptions as log
errors (they are otherwise silently swallowed by asyncio). Remove
finished tasks from the dict so their hook_id is eligible to fire
again.
2. Check periodic hook schedules — for each enabled periodic
hook, compare the current time against its interval_seconds and
cooldown_seconds. If both thresholds are met and the global
concurrency cap (max_concurrent_hooks) has room, launch the
hook as a new asyncio task.
Event-driven hooks are NOT checked here — they are triggered
asynchronously via _on_event when the EventBus delivers a
matching event.
Source code in src/hooks.py
async def tick(self) -> None:
    """Called every orchestrator cycle (~5s). Manage hook lifecycle.

    This method performs two duties each tick:

    **1. Reap completed hook tasks** — scan ``_running`` for asyncio
    Tasks that have finished. Surface any unhandled exceptions as log
    errors (they are otherwise silently swallowed by asyncio). Remove
    finished tasks from the dict so their hook_id is eligible to fire
    again.

    **2. Check periodic hook schedules** — for each enabled periodic
    hook, compare the current time against its ``interval_seconds`` and
    ``cooldown_seconds``. If both thresholds are met and the global
    concurrency cap (``max_concurrent_hooks``) has room, launch the
    hook as a new asyncio task.

    Event-driven hooks are NOT checked here — they are triggered
    asynchronously via ``_on_event`` when the EventBus delivers a
    matching event.
    """
    # Phase 1: Reap completed hook tasks and surface exceptions.
    done = [hid for hid, t in self._running.items() if t.done()]
    for hid in done:
        task = self._running.pop(hid)
        if task.done() and not task.cancelled():
            exc = task.exception()
            if exc:
                logger.error("Hook task %s failed: %s", hid, exc)
    # Phase 2: Check periodic hooks that are due to fire.
    hooks = await self.db.list_hooks(enabled=True)
    now = time.time()
    max_concurrent = self.config.hook_engine.max_concurrent_hooks
    for hook in hooks:
        if len(self._running) >= max_concurrent:
            break  # Global concurrency cap reached
        if hook.id in self._running:
            continue  # Already in-flight — skip
        trigger = json.loads(hook.trigger)
        trigger_type = trigger.get("type")
        if trigger_type == "periodic":
            interval = trigger.get("interval_seconds", 3600)
            last = self._last_run_time.get(hook.id, 0)
            if now - last >= interval:
                if self._check_cooldown(hook, now):
                    now_iso = datetime.fromtimestamp(
                        now, tz=timezone.utc
                    ).isoformat()
                    timing_data: dict = {
                        "current_time": now_iso,
                        "current_time_epoch": now,
                    }
                    if last:
                        timing_data["last_run_time"] = datetime.fromtimestamp(
                            last, tz=timezone.utc
                        ).isoformat()
                        timing_data["last_run_time_epoch"] = last
                        timing_data["seconds_since_last_run"] = now - last
                    self._launch_hook(
                        hook, "periodic", event_data=timing_data
                    )
    # Phase 3: Poll file watcher for filesystem changes.
    # The file watcher emits file.changed / folder.changed events on
    # the EventBus, which are then picked up by _on_event like any
    # other event-driven hook.
    if self.file_watcher:
        try:
            await self.file_watcher.check()
        except Exception as e:
            logger.warning("FileWatcher check failed: %s", e)
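The exception-surfacing pattern in Phase 1 can be exercised in isolation. This sketch mimics the reap loop with a deliberately failing task (stand-in names, not HookEngine itself):

```python
import asyncio

async def boom():
    raise RuntimeError("hook failed")

async def main():
    # Mirror of tick()'s reap phase: done tasks are polled for
    # exceptions so failures don't vanish silently in asyncio.
    running = {"h1": asyncio.create_task(boom())}
    await asyncio.sleep(0)  # let the task run and fail
    errors = {}
    for hid in [h for h, t in running.items() if t.done()]:
        task = running.pop(hid)
        if not task.cancelled():
            exc = task.exception()  # retrieving it suppresses the
            if exc:                 # "exception was never retrieved" warning
                errors[hid] = str(exc)
    return errors

print(asyncio.run(main()))
```

Calling ``Task.exception()`` on each finished task is what keeps asyncio from logging "Task exception was never retrieved" later, and lets the engine log the failure with the hook's id attached.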
fire_hook
async
fire_hook(hook_id: str) -> str
Manually trigger a hook, ignoring cooldown.
Used by the /fire-hook Discord command to allow operators to
run a hook on-demand (e.g. for testing or urgent checks). Unlike
the normal periodic/event trigger path, this bypasses cooldown
checks — but it still respects the "already running" guard (a
hook cannot be fired if it's already in-flight).
Returns the hook ID (used as a proxy run identifier).
Source code in src/hooks.py
async def fire_hook(self, hook_id: str) -> str:
    """Manually trigger a hook, ignoring cooldown.

    Used by the ``/fire-hook`` Discord command to allow operators to
    run a hook on-demand (e.g. for testing or urgent checks). Unlike
    the normal periodic/event trigger path, this bypasses cooldown
    checks — but it still respects the "already running" guard (a
    hook cannot be fired if it's already in-flight).

    Returns the hook ID (used as a proxy run identifier).
    """
    hook = await self.db.get_hook(hook_id)
    if not hook:
        raise ValueError(f"Hook '{hook_id}' not found")
    if hook.id in self._running:
        raise ValueError(f"Hook '{hook_id}' is already running")
    self._last_run_time[hook.id] = time.time()
    task = asyncio.create_task(
        self._execute_hook(hook, "manual")
    )
    self._running[hook.id] = task
    return hook.id
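A runnable sketch of fire_hook's error contract, using a stand-in engine rather than the real HookEngine (the hook ids here are made up):

```python
import asyncio

class FakeEngine:
    """Tiny stand-in mimicking fire_hook's guards; not the real class."""

    def __init__(self):
        self._running = {"busy-hook": object()}  # pretend one hook is in-flight
        self._known = {"busy-hook", "daily-digest"}

    async def fire_hook(self, hook_id):
        if hook_id not in self._known:
            raise ValueError(f"Hook '{hook_id}' not found")
        if hook_id in self._running:
            raise ValueError(f"Hook '{hook_id}' is already running")
        return hook_id

async def main():
    engine = FakeEngine()
    ok = await engine.fire_hook("daily-digest")  # idle hook: fires, returns id
    try:
        await engine.fire_hook("busy-hook")      # in-flight hook: rejected
    except ValueError as e:
        err = str(e)
    return ok, err

print(asyncio.run(main()))
```

Callers (like the Discord command handler) should be prepared for ValueError on both the unknown-hook and already-running cases.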
shutdown
async
Cancel all running hook tasks and wait for them to finish.
Uses asyncio.gather(..., return_exceptions=True) to ensure we
don't propagate CancelledError — we just want everything stopped.
Source code in src/hooks.py
async def shutdown(self) -> None:
    """Cancel all running hook tasks and wait for them to finish.

    Uses ``asyncio.gather(..., return_exceptions=True)`` to ensure we
    don't propagate CancelledError — we just want everything stopped.
    """
    for hook_id, task in self._running.items():
        if not task.done():
            task.cancel()
    # Wait for all to finish
    if self._running:
        await asyncio.gather(
            *self._running.values(), return_exceptions=True
        )
    self._running.clear()
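The cancel-then-gather pattern can be demonstrated standalone; with ``return_exceptions=True`` the CancelledError instances come back as results instead of propagating out of shutdown:

```python
import asyncio

async def long_hook():
    await asyncio.sleep(3600)  # a hook that would run for an hour

async def main():
    # Sketch of the shutdown pattern: cancel every in-flight task,
    # then gather with return_exceptions=True to absorb CancelledError.
    running = {"h1": asyncio.create_task(long_hook())}
    await asyncio.sleep(0)  # let the task start and park on its sleep
    for task in running.values():
        if not task.done():
            task.cancel()
    results = await asyncio.gather(*running.values(), return_exceptions=True)
    return [type(r).__name__ for r in results]

print(asyncio.run(main()))
```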
set_orchestrator
set_orchestrator(orchestrator) -> None
Store reference to the orchestrator (for LLM invocation).
The HookEngine needs access to the Orchestrator for two reasons:
1. _invoke_llm creates a ChatAgent which requires an orchestrator
reference to register its tools (task management, status queries).
2. _step_memory_search accesses orchestrator.memory_manager
for semantic search in context steps.
This is a circular reference (orchestrator owns hooks, hooks reference
orchestrator) that is broken at shutdown via hooks.shutdown().
Source code in src/hooks.py
def set_orchestrator(self, orchestrator) -> None:
    """Store reference to the orchestrator (for LLM invocation).

    The HookEngine needs access to the Orchestrator for two reasons:

    1. ``_invoke_llm`` creates a ChatAgent which requires an orchestrator
       reference to register its tools (task management, status queries).
    2. ``_step_memory_search`` accesses ``orchestrator.memory_manager``
       for semantic search in context steps.

    This is a circular reference (orchestrator owns hooks, hooks reference
    orchestrator) that is broken at shutdown via ``hooks.shutdown()``.
    """
    self._orchestrator = orchestrator