Hooks
Session-global middleware that observes and reacts to agent events — gate generations, attach metrics, retry with feedback, finish a turn.
A hook is an async function that fires on a specific agent event. Hooks are middleware: the runtime delivers each AgentEvent to every matching hook before the next step proceeds, and a hook can return a Reaction to steer what happens next — continue, retry with feedback, finish the turn, or fail.
from dreadnode.agents.events import ToolErrorfrom dreadnode.core.hook import hook
@hook(ToolError)async def log_tool_error(event: ToolError) -> None: print(f"tool {event.tool_call.name} failed: {event.error}")The runtime imports hooks/observer.py when the capability loads, registers log_tool_error against ToolError, and calls it for every tool failure on every turn.
Where hooks live
Section titled “Where hooks live”Hooks come from Python files declared in the manifest:
hooks: - hooks/observer.pyIf hooks: is omitted, the runtime auto-discovers any *.py in the hooks/ directory. Set hooks: [] to disable entirely.
The loader collects module-level Hook instances — anything produced by the @hook(...) decorator. Functions without the decorator are ignored.
Hooks are session-global middleware. Unlike tools, they are not filtered by per-agent rules — a capability that ships a @hook(GenerationStep) participates in every turn for every agent as long as the capability is loaded.
To disable a hook without removing the file, gate the capability behind a flag:
flags: observer-enabled: description: Enable the observer hook. default: true
hooks: - hooks/observer.pyCapability-level flags gate the entire capability’s load, which includes its hooks. For finer-grained control, read the flag inside the handler:
import os
@hook(ToolError)async def log_tool_error(event: ToolError) -> None: if os.environ.get("CAPABILITY_FLAG__OBSERVER__ENABLED") != "1": return ...The decorator
Section titled “The decorator”@hook(event_type, *, when=None, scorers=None) returns a Hook instance. The handler must be async def.
| Argument | Purpose |
|---|---|
event_type | An AgentEvent subclass. The hook only fires for events of this exact type (or a subclass). |
when | List of Conditions evaluated in order. The hook body runs only if every condition passes. |
scorers | List of Scorers run after when passes. Each scorer attaches a metric series to event.metrics. |
from dreadnode.agents.events import GenerationStepfrom dreadnode.core.hook import hook
@hook( GenerationStep, when=[quality.above(0.5)], scorers=[safety, toxicity],)async def gated(event: GenerationStep) -> None: # event.metrics["quality"], event.metrics["safety"], # event.metrics["toxicity"] are all populated. ...when predicates can attach metrics as a side effect (ScoringConditions do this), so the body can read event.metrics[...] without re-scoring. Bare conditions just gate execution.
@hook also works on methods. Use it on a class to share state across handlers:
class Observer: def __init__(self) -> None: self.failures: list[str] = []
@hook(ToolError) async def record(self, event: ToolError) -> None: self.failures.append(event.tool_call.name)
observer = Observer() # module-level instance — required for the loader to pick up its hooksCommon event types
Section titled “Common event types”Every hook subscribes to one event type. The runtime emits a fixed catalog; the most useful ones for capability authors:
| Event | When it fires |
|---|---|
AgentStart | New agent run begins. Useful for seeding per-run state. |
AgentEnd | Agent run finishes (success, fail, or stalled). |
AgentStep | Any step — generation, tool call, or react. Subclasses below. |
GenerationStep | Model produced a response (with optional tool calls). |
GenerationError | Model call failed before producing a response. |
ToolStep | A tool call completed (success or surfaced error). |
ToolError | Exception escaped a tool — the agent will see a structured error. |
Heartbeat | Periodic tick during a long step. Useful for cancellation polling. |
CompactionEvent | The runtime compacted the conversation to fit the context window. |
UserInputRequired | Agent paused awaiting human input via ask_user(). |
Subscribing to AgentStep covers all step subclasses except ReactStep — reactions trigger their own steps, and the runtime suppresses the cascade so a hook listening to AgentStep doesn’t fire on its own reaction. Use @hook(ReactStep) explicitly when you need that.
The full event surface lives at dreadnode.agents.events.
Reactions
Section titled “Reactions”A hook can return a Reaction to influence the runtime. Returning None (or having no return) is the no-op — the agent proceeds normally.
| Reaction | Effect |
|---|---|
Continue(...) | Proceed, optionally injecting messages or feedback for the next generation. |
Retry() | Retry the current step. |
RetryWithFeedback | Retry with a feedback string the model sees on the next attempt. |
Finish(reason=...) | End the turn cleanly. The reason appears in the trace. |
Fail(error=...) | End the turn with an error. The error propagates to the caller. |
from dreadnode.agents.events import GenerationStepfrom dreadnode.agents.reactions import Fail, Finishfrom dreadnode.core.hook import hook
@hook(GenerationStep)async def stop_on_keyword(event: GenerationStep) -> Finish | None: last = event.messages[-1] if event.messages else None if last and "DONE" in str(getattr(last, "content", "")): return Finish(reason="agent signalled completion") return NoneState and concurrency
Section titled “State and concurrency”Hooks share the runtime’s event loop with everything else. If two hooks (or the same hook on two events) mutate shared state, guard it.
import asynciofrom collections import defaultdictfrom uuid import UUID
from dreadnode.agents.events import AgentEnd, ToolErrorfrom dreadnode.core.hook import hook
_lock = asyncio.Lock()_failures: dict[UUID, list[str]] = defaultdict(list)
@hook(ToolError)async def collect(event: ToolError) -> None: async with _lock: _failures[event.agent_id].append(event.tool_call.name)
@hook(AgentEnd)async def summarize(event: AgentEnd) -> None: async with _lock: names = _failures.pop(event.agent_id, []) if names: print(f"agent {event.agent_id} failed tools: {names}")Capability reload tears the module down — module-level state does not survive. Persist anything that needs to outlive a reload.
Recursion and self-events
Section titled “Recursion and self-events”When a hook spawns work that itself produces events (an internal subagent run, a follow-up turn), the new events flow back through every registered hook — including the one that started them. Use a ContextVar to mark “this is my own work” and short-circuit:
from contextvars import ContextVar
from dreadnode.agents.events import AgentEndfrom dreadnode.core.hook import hook
# ContextVar propagates to asyncio tasks, so spawned work inherits the flag# and the hook short-circuits before doing more spawning._internal: ContextVar[bool] = ContextVar("_internal", default=False)
@hook(AgentEnd)async def maybe_followup(event: AgentEnd) -> None: if _internal.get(): return _internal.set(True) try: await spawn_followup(event) finally: _internal.set(False)The bundled self-improvement capability uses this pattern to avoid recursing on its own reflector subagent.
Reference
Section titled “Reference”The full hook API — Hook, Condition, Scorer, the event types, and the reaction classes — lives at dreadnode.agents.events and dreadnode.core.hook.