
Hooks

Session-global middleware that observes and reacts to agent events — gate generations, attach metrics, retry with feedback, finish a turn.

A hook is an async function that fires on a specific agent event. Hooks are middleware: the runtime delivers each AgentEvent to every matching hook before the next step proceeds, and a hook can return a Reaction to steer what happens next — continue, retry with feedback, finish the turn, or fail.

hooks/observer.py
from dreadnode.agents.events import ToolError
from dreadnode.core.hook import hook
@hook(ToolError)
async def log_tool_error(event: ToolError) -> None:
    print(f"tool {event.tool_call.name} failed: {event.error}")

The runtime imports hooks/observer.py when the capability loads, registers log_tool_error against ToolError, and calls it for every tool failure on every turn.

Hooks come from Python files declared in the manifest:

hooks:
- hooks/observer.py

If hooks: is omitted, the runtime auto-discovers any *.py in the hooks/ directory. Set hooks: [] to disable entirely.
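
The three manifest cases above can be sketched as a small resolver. This only illustrates the described behavior and is not the runtime's actual loader; `resolve_hook_files` and its parameters are hypothetical names.

```python
from pathlib import Path
from typing import Optional


def resolve_hook_files(capability_dir: Path, hooks: Optional[list[str]]) -> list[Path]:
    """Return the hook files to import for one capability (illustrative only)."""
    if hooks is None:
        # `hooks:` omitted: auto-discover every *.py under hooks/.
        return sorted((capability_dir / "hooks").glob("*.py"))
    # `hooks: []` produces an empty list, which disables hooks entirely;
    # otherwise only the declared paths are returned.
    return [capability_dir / entry for entry in hooks]
```

Passing `None` stands in for a manifest with no `hooks:` key, which keeps "omitted" distinct from "explicitly empty".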

The loader collects module-level Hook instances — anything produced by the @hook(...) decorator. Functions without the decorator are ignored.
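
To make the collection rule concrete, here is a minimal sketch that scans a module's top-level names and keeps only `Hook` instances. The `Hook` class, the `hook` decorator, and `collect_hooks` below are simplified stand-ins written for this example, not the library's implementations.

```python
import types
from typing import Any, Callable


class Hook:
    """Stand-in for the library's Hook type (illustration only)."""

    def __init__(self, event_type: type, func: Callable[..., Any]) -> None:
        self.event_type = event_type
        self.func = func


def hook(event_type: type) -> Callable[[Callable[..., Any]], Hook]:
    """Stand-in decorator: wraps the function in a Hook instance."""
    def wrap(func: Callable[..., Any]) -> Hook:
        return Hook(event_type, func)
    return wrap


def collect_hooks(module: types.ModuleType) -> list[Hook]:
    """Keep only module-level values that are Hook instances."""
    return [value for value in vars(module).values() if isinstance(value, Hook)]


class ToolError:
    """Placeholder event type."""


@hook(ToolError)
async def log_tool_error(event: ToolError) -> None:
    print("tool failed")


async def helper(event: ToolError) -> None:
    """Undecorated, so it stays a plain function and is ignored."""


# Build a throwaway module holding one decorated and one plain function.
demo = types.ModuleType("demo_hooks")
demo.log_tool_error = log_tool_error
demo.helper = helper

found = collect_hooks(demo)
```

In this sketch `found` contains only `log_tool_error`, because the decorator replaced the function with a `Hook` object while `helper` is still an ordinary coroutine function.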

Hooks are session-global middleware. Unlike tools, they are not filtered by per-agent rules — a capability that ships a @hook(GenerationStep) participates in every turn for every agent as long as the capability is loaded.

To disable a hook without removing the file, gate the capability behind a flag:

flags:
  observer-enabled:
    description: Enable the observer hook.
    default: true
hooks:
- hooks/observer.py

Capability-level flags gate the entire capability’s load, which includes its hooks. For finer-grained control, read the flag inside the handler:

import os
@hook(ToolError)
async def log_tool_error(event: ToolError) -> None:
    if os.environ.get("CAPABILITY_FLAG__OBSERVER__ENABLED") != "1":
        return
    ...

@hook(event_type, *, when=None, scorers=None) returns a Hook instance. The handler must be async def.

| Argument | Purpose |
| --- | --- |
| event_type | An AgentEvent subclass. The hook fires for events of this type or any of its subclasses. |
| when | List of Conditions evaluated in order. The hook body runs only if every condition passes. |
| scorers | List of Scorers run after when passes. Each scorer attaches a metric series to event.metrics. |

from dreadnode.agents.events import GenerationStep
from dreadnode.core.hook import hook
# quality, safety, and toxicity are scorers assumed to be defined elsewhere.
@hook(
    GenerationStep,
    when=[quality.above(0.5)],
    scorers=[safety, toxicity],
)
async def gated(event: GenerationStep) -> None:
    # event.metrics["quality"], event.metrics["safety"],
    # event.metrics["toxicity"] are all populated.
    ...

when predicates can attach metrics as a side effect (ScoringConditions do this), so the body can read event.metrics[...] without re-scoring. Bare conditions just gate execution.
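
The ordering described above (conditions in order, then scorers, then the body) can be sketched with plain functions. Everything here is a simplified stand-in: `Event`, `score_above`, and `dispatch` are hypothetical names, and a metric is stored as a single float rather than the metric series the runtime attaches.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Event:
    """Placeholder event: some text plus a metrics mapping."""
    text: str
    metrics: dict[str, float] = field(default_factory=dict)


Condition = Callable[[Event], bool]
Scorer = Callable[[Event], float]
Body = Callable[[Event], Awaitable[None]]


def score_above(name: str, scorer: Scorer, threshold: float) -> Condition:
    """Scoring condition: records the score as a side effect, then gates on it."""
    def check(event: Event) -> bool:
        event.metrics[name] = scorer(event)
        return event.metrics[name] > threshold
    return check


async def dispatch(
    event: Event, when: list[Condition], scorers: dict[str, Scorer], body: Body
) -> bool:
    """Run one hook; return True if its body ran."""
    # 1. Conditions run in order; the first failure skips the hook.
    for condition in when:
        if not condition(event):
            return False
    # 2. Scorers run only after every condition has passed.
    for name, scorer in scorers.items():
        event.metrics[name] = scorer(event)
    # 3. The body sees metrics from both the conditions and the scorers.
    await body(event)
    return True


def quality(event: Event) -> float:
    return min(len(event.text) / 10, 1.0)


def safety(event: Event) -> float:
    return 0.0 if "rm -rf" in event.text else 1.0


seen: list[dict[str, float]] = []


async def body(event: Event) -> None:
    seen.append(dict(event.metrics))
```

Calling `asyncio.run(dispatch(Event("a long enough reply"), [score_above("quality", quality, 0.5)], {"safety": safety}, body))` runs the body with both metrics present. A short text such as `"hi"` fails the quality gate, so the safety scorer never runs.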

@hook also works on methods. Use it on a class to share state across handlers:

class Observer:
    def __init__(self) -> None:
        self.failures: list[str] = []
    @hook(ToolError)
    async def record(self, event: ToolError) -> None:
        self.failures.append(event.tool_call.name)
observer = Observer()  # module-level instance: required for the loader to pick up its hooks

Every hook subscribes to one event type. The runtime emits a fixed catalog; the most useful ones for capability authors:

| Event | When it fires |
| --- | --- |
| AgentStart | New agent run begins. Useful for seeding per-run state. |
| AgentEnd | Agent run finishes (success, fail, or stalled). |
| AgentStep | Any step: generation, tool call, or react. Subclasses below. |
| GenerationStep | Model produced a response (with optional tool calls). |
| GenerationError | Model call failed before producing a response. |
| ToolStep | A tool call completed (success or surfaced error). |
| ToolError | Exception escaped a tool; the agent will see a structured error. |
| Heartbeat | Periodic tick during a long step. Useful for cancellation polling. |
| CompactionEvent | The runtime compacted the conversation to fit the context window. |
| UserInputRequired | Agent paused awaiting human input via ask_user(). |

Subscribing to AgentStep covers all step subclasses except ReactStep — reactions trigger their own steps, and the runtime suppresses the cascade so a hook listening to AgentStep doesn’t fire on its own reaction. Use @hook(ReactStep) explicitly when you need that.
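
The matching rule can be sketched as a small predicate. The classes below are empty placeholders that mirror the event names, and treating `ReactStep` as a subclass of `AgentStep` is an assumption drawn from the wording above. `matches` is a hypothetical helper, not a runtime function.

```python
class AgentEvent: ...
class AgentStep(AgentEvent): ...
class GenerationStep(AgentStep): ...
class ToolStep(AgentStep): ...
class ReactStep(AgentStep): ...


def matches(subscribed: type, event: AgentEvent) -> bool:
    """Decide whether a hook subscribed to `subscribed` fires for `event`."""
    if isinstance(event, ReactStep):
        # Reaction steps reach only hooks that subscribed to ReactStep explicitly.
        return subscribed is ReactStep
    # Everything else follows ordinary subclass matching.
    return isinstance(event, subscribed)
```

Under these assumptions a hook on `AgentStep` fires for `GenerationStep` and `ToolStep` events but not for `ReactStep`, while a hook on `ReactStep` fires only for reaction steps.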

The full event surface lives at dreadnode.agents.events.

A hook can return a Reaction to influence the runtime. Returning None (or having no return) is the no-op — the agent proceeds normally.

| Reaction | Effect |
| --- | --- |
| Continue(...) | Proceed, optionally injecting messages or feedback for the next generation. |
| Retry() | Retry the current step. |
| RetryWithFeedback | Retry with a feedback string the model sees on the next attempt. |
| Finish(reason=...) | End the turn cleanly. The reason appears in the trace. |
| Fail(error=...) | End the turn with an error. The error propagates to the caller. |

from dreadnode.agents.events import GenerationStep
from dreadnode.agents.reactions import Fail, Finish
from dreadnode.core.hook import hook
@hook(GenerationStep)
async def stop_on_keyword(event: GenerationStep) -> Finish | None:
    last = event.messages[-1] if event.messages else None
    if last and "DONE" in str(getattr(last, "content", "")):
        return Finish(reason="agent signalled completion")
    return None
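
To show how a retry-with-feedback reaction might play out over several attempts, here is a toy driver. The `RetryWithFeedback` and `Finish` dataclasses are stand-ins defined for this sketch (the library's constructor signatures may differ), and `run_turn` with its `max_attempts` cap is a hypothetical loop, not the runtime's.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class RetryWithFeedback:
    """Stand-in reaction, not the library class."""
    feedback: str


@dataclass
class Finish:
    """Stand-in reaction, not the library class."""
    reason: str


Reaction = Union[RetryWithFeedback, Finish]


async def require_json(reply: str) -> Optional[Reaction]:
    """Hook-like check: ask for a retry until the reply looks like JSON."""
    if not reply.strip().startswith("{"):
        return RetryWithFeedback(feedback="Respond with a JSON object only.")
    return Finish(reason="valid JSON received")


async def run_turn(replies: list[str], max_attempts: int = 3) -> tuple[str, list[str]]:
    """Toy driver: feeds canned replies through the check and applies its reaction."""
    feedback_log: list[str] = []
    for reply in replies[:max_attempts]:
        reaction = await require_json(reply)
        if isinstance(reaction, RetryWithFeedback):
            # In a real run the model would see this text on its next attempt.
            feedback_log.append(reaction.feedback)
            continue
        if isinstance(reaction, Finish):
            return reaction.reason, feedback_log
    return "attempts exhausted", feedback_log
```

Running `asyncio.run(run_turn(["sure!", '{"ok": true}']))` records one piece of feedback and then finishes on the second reply.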

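Hooks share the runtime’s event loop with everything else, so any await inside a handler lets another handler run before it resumes. If two hooks (or the same hook on two events) mutate shared state across an await, guard it with a lock.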

import asyncio
from collections import defaultdict
from uuid import UUID
from dreadnode.agents.events import AgentEnd, ToolError
from dreadnode.core.hook import hook
_lock = asyncio.Lock()
_failures: dict[UUID, list[str]] = defaultdict(list)
@hook(ToolError)
async def collect(event: ToolError) -> None:
    async with _lock:
        _failures[event.agent_id].append(event.tool_call.name)
@hook(AgentEnd)
async def summarize(event: AgentEnd) -> None:
    async with _lock:
        names = _failures.pop(event.agent_id, [])
    if names:
        print(f"agent {event.agent_id} failed tools: {names}")

Capability reload tears the module down — module-level state does not survive. Persist anything that needs to outlive a reload.
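
One way to persist such state is a small JSON file that is read on load and rewritten on change. This is a minimal sketch using only the standard library; `load_failures`, `save_failures`, and the file location are hypothetical choices, and agent IDs are stored as strings because JSON object keys must be strings.

```python
import json
from pathlib import Path


def load_failures(path: Path) -> dict[str, list[str]]:
    """Read persisted failures, or start empty if nothing was saved yet."""
    if path.exists():
        return json.loads(path.read_text())
    return {}


def save_failures(path: Path, failures: dict[str, list[str]]) -> None:
    """Write to a temporary file, then rename it over the target."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(failures))
    # Renaming keeps readers from ever seeing a half-written file.
    tmp.replace(path)
```

A hook module could call `load_failures` at import time and `save_failures` after each update, so a reload picks up where the previous module instance left off.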

When a hook spawns work that itself produces events (an internal subagent run, a follow-up turn), the new events flow back through every registered hook — including the one that started them. Use a ContextVar to mark “this is my own work” and short-circuit:

from contextvars import ContextVar
from dreadnode.agents.events import AgentEnd
from dreadnode.core.hook import hook
# ContextVar propagates to asyncio tasks, so spawned work inherits the flag
# and the hook short-circuits before doing more spawning.
_internal: ContextVar[bool] = ContextVar("_internal", default=False)
@hook(AgentEnd)
async def maybe_followup(event: AgentEnd) -> None:
    if _internal.get():
        return
    token = _internal.set(True)
    try:
        await spawn_followup(event)  # spawn_followup is your own coroutine
    finally:
        _internal.reset(token)  # restore the value from before this hook ran

The bundled self-improvement capability uses this pattern to avoid recursing on its own reflector subagent.

The full hook API (Hook, Condition, Scorer, the event types, and the reaction classes) lives in dreadnode.core.hook, dreadnode.agents.events, and dreadnode.agents.reactions.