Hooks are functions that run at key points in the agent lifecycle. They let you observe execution for logging and metrics, or intervene by returning reactions that modify agent behavior.
Why use hooks:
  • Observe Execution - Track steps, tool calls, token usage, and errors for logging, metrics, or debugging
  • Intervene Dynamically - Inject feedback, retry failed steps, or terminate runs based on runtime conditions
  • Handle Failures - Automatically retry on rate limits, errors, or stalled execution with exponential backoff
When to use hooks: Use hooks to add observability, handle errors gracefully, or enforce runtime constraints without modifying agent logic. This guide covers hook events, observational vs interventional hooks, reactions, built-in hooks, and custom hook patterns.
import dreadnode as dn
from dreadnode.agent.events import ToolStart

async def log_tool_calls(event: ToolStart):
    print(f"Calling tool: {event.tool_call.name}")

agent = dn.Agent(
    name="example",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    hooks=[log_tool_calls],
)

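# The hook prints each tool call during execution.
# Top-level await works in a notebook or async REPL; in a script, call this from an async function.
result = await agent.run("List all Python files in the current directory")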

Events

Hooks receive events at different points in the agent lifecycle. Events fire in order: AgentStart → (StepStart → GenerationEnd → ToolStart → ToolEnd)* → AgentEnd.
Event | When it fires
AgentStart | Run begins.
StepStart | New step starts. Contains step number.
GenerationEnd | LLM generation completes. Contains message and usage.
ToolStart | Tool execution begins. Contains tool_call.
ToolEnd | Tool execution completes. Contains tool_call, message, stop.
AgentStalled | No tool calls and no stop conditions met.
AgentError | An error occurred. Contains error.
Reacted | A hook returned a reaction. Contains hook_name, reaction.
AgentEnd | Run completes. Contains stop_reason, result.
from dreadnode.agent.events import (
    AgentStart, StepStart, GenerationEnd,
    ToolStart, ToolEnd, AgentStalled,
    AgentError, Reacted, AgentEnd,
)

Observational vs Interventional

Hooks that return None are observational and do not affect execution:
async def track_tokens(event: GenerationEnd):
    if event.usage:
        print(f"Step {event.step}: {event.usage.total_tokens} tokens")
Hooks that return a Reaction intervene in execution:
from dreadnode.agent.reactions import Fail

async def block_dangerous_tools(event: ToolStart) -> Fail | None:
    if event.tool_call.name in ["delete_file", "rm_rf"]:
        return Fail(f"Blocked: {event.tool_call.name}")
    return None

Reactions

Reaction | Effect
Continue(messages) | Continue with modified message list.
Retry(messages) | Restart the current step with optional new messages.
RetryWithFeedback(feedback) | Restart with a feedback message injected.
Fail(error) | Terminate the run with an error.
Finish(reason) | Successfully terminate the run.
When multiple hooks return reactions, priority determines the winner: Finish > Fail > Retry/RetryWithFeedback > Continue.
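The priority rule can be illustrated with a small stand-alone sketch. The classes below are hypothetical stand-ins rather than the dreadnode reaction types, and `pick_winner` is an assumed helper name. How the SDK actually breaks ties between equal-priority reactions is not stated here, so the tie-breaking shown is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-ins for the reaction types; not the dreadnode classes.
@dataclass
class Continue:
    messages: list

@dataclass
class Retry:
    messages: Optional[list] = None

@dataclass
class RetryWithFeedback:
    feedback: str

@dataclass
class Fail:
    error: str

@dataclass
class Finish:
    reason: str

# Higher number wins: Finish > Fail > Retry/RetryWithFeedback > Continue.
PRIORITY = {Finish: 3, Fail: 2, Retry: 1, RetryWithFeedback: 1, Continue: 0}

def pick_winner(reactions):
    """Return the highest-priority reaction, or None if no hook reacted."""
    candidates = [r for r in reactions if r is not None]
    if not candidates:
        return None
    # max() returns the first maximal item, so ties go to the earlier hook
    # (a tie-breaking choice assumed for this sketch).
    return max(candidates, key=lambda r: PRIORITY[type(r)])

# Three hooks reacted and one returned None; Fail outranks the other two.
winner = pick_winner(
    [None, RetryWithFeedback("try again"), Fail("blocked"), Continue([])]
)
```

In this sketch a single Finish or Fail overrides any number of retry requests, which is why a guard hook such as block_dangerous_tools still takes effect alongside retry hooks.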

Built-in Hooks

retry_with_feedback

Injects feedback and retries when a specific event occurs. Essential for handling stalled agents:
from dreadnode.agent.events import AgentStalled
from dreadnode.agent.hooks import retry_with_feedback

agent = dn.Agent(
    ...,
    hooks=[
        retry_with_feedback(
            AgentStalled,
            "Continue working. Use finish_task when done."
        )
    ],
)
You can also pass a callable for custom matching:
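from dreadnode.agent.events import GenerationEnd

def mentions_error(event):
    # Match only generation events whose message text mentions an error.
    return isinstance(event, GenerationEnd) and "error" in event.message.content.lower()

hook = retry_with_feedback(mentions_error, "Please address the error.")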

backoff_on_ratelimit

Automatically retries with exponential backoff on rate limit errors. Use this in any production agent:
from dreadnode.agent.hooks import backoff_on_ratelimit

agent = dn.Agent(
    ...,
    hooks=[backoff_on_ratelimit()],  # Handles RateLimitError, APIError
)
For specific exceptions, use backoff_on_error:
from dreadnode.agent.hooks import backoff_on_error

hook = backoff_on_error(
    (ConnectionError, TimeoutError),
    max_tries=5,
    max_time=120.0,
)
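To make "exponential backoff" concrete, the sketch below computes the waits such a policy might produce under a try limit and a total time budget. `backoff_delays`, `base`, and `factor` are hypothetical names chosen for illustration; only max_tries and max_time appear in the example above, and the SDK's actual schedule (including any jitter) may differ.

```python
def backoff_delays(max_tries: int, max_time: float,
                   base: float = 1.0, factor: float = 2.0):
    """Yield waits between attempts until tries or the time budget run out."""
    elapsed = 0.0
    # max_tries attempts leave room for at most max_tries - 1 waits.
    for attempt in range(max_tries - 1):
        delay = base * factor ** attempt
        if elapsed + delay > max_time:
            break  # the next wait would exceed the overall time budget
        elapsed += delay
        yield delay

# Five tries within a 120-second budget.
delays = list(backoff_delays(max_tries=5, max_time=120.0))

# A tight budget cuts the schedule short before the tries are used up.
short = list(backoff_delays(max_tries=10, max_time=10.0))
```

Production backoff policies commonly add random jitter to each delay so that many clients do not retry in lockstep; it is left out here to keep the schedule deterministic.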

summarize_when_long

Manages context windows by summarizing conversation history when it grows too large:
from dreadnode.agent.hooks import summarize_when_long

agent = dn.Agent(
    ...,
    hooks=[
        summarize_when_long(
            max_tokens=100_000,       # Summarize proactively above this
            min_messages_to_keep=5,   # Always keep recent context
        )
    ],
)
This hook works in two modes:
  • Proactive: Summarizes before each step if tokens exceed threshold
  • Reactive: If a context length error occurs, summarizes and retries
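The proactive mode can be sketched as a threshold check followed by a compaction step. Everything below is a simplified stand-in: `count_tokens`, `summarize`, and `compact` are hypothetical helpers, messages are plain strings, and tokens are approximated by word count. A real hook would use the model's tokenizer and ask a model to write the summary.

```python
def count_tokens(messages: list[str]) -> int:
    """Crude placeholder: one token per whitespace-separated word."""
    return sum(len(m.split()) for m in messages)

def summarize(messages: list[str]) -> str:
    """Placeholder summary; a real hook would ask a model to condense these."""
    return f"[summary of {len(messages)} earlier messages]"

def compact(messages: list[str], max_tokens: int,
            min_messages_to_keep: int) -> list[str]:
    """Replace older messages with a summary once the budget is exceeded."""
    if count_tokens(messages) <= max_tokens:
        return messages  # under the threshold: leave history untouched
    split = len(messages) - min_messages_to_keep
    if split <= 0:
        return messages  # nothing old enough to summarize
    older, recent = messages[:split], messages[split:]
    return [summarize(older), *recent]

history = [f"message number {i}" for i in range(10)]  # 3 words each
compacted = compact(history, max_tokens=20, min_messages_to_keep=5)
```

The reactive mode would run the same compaction after a context length error and then retry the step, rather than checking the threshold up front.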

tool_metrics

Logs tool usage metrics to the platform:
from dreadnode.agent.hooks import tool_metrics

agent = dn.Agent(
    ...,
    hooks=[tool_metrics(detailed=True)],
)
Logs: tool/total_count, tool/total_time, tool/success_rate, and per-tool metrics when detailed=True.
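As a rough illustration of what these aggregates mean, the sketch below derives them from a list of per-call records. `ToolCallRecord`, `aggregate`, and the `per_tool_count` key are hypothetical; only the three tool/... metric names come from the text above, and the hook's real per-tool metric names are not shown here.

```python
from collections import Counter
from dataclasses import dataclass

@dataclass
class ToolCallRecord:
    """Hypothetical record of one finished tool call."""
    name: str
    seconds: float
    ok: bool

def aggregate(records, detailed=False):
    """Compute run-level tool metrics from individual call records."""
    total = len(records)
    metrics = {
        "tool/total_count": total,
        "tool/total_time": sum(r.seconds for r in records),
        # Fraction of calls that succeeded; 0.0 when no tool was called.
        "tool/success_rate": sum(r.ok for r in records) / total if total else 0.0,
    }
    if detailed:
        # Assumed shape for the per-tool breakdown in this sketch.
        metrics["per_tool_count"] = dict(Counter(r.name for r in records))
    return metrics

records = [
    ToolCallRecord("read_file", 0.5, True),
    ToolCallRecord("read_file", 0.25, True),
    ToolCallRecord("ls", 0.25, False),
    ToolCallRecord("ls", 1.0, True),
]
summary = aggregate(records, detailed=True)
```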

Event History

Each event provides the full run history via event.events. Use this for context-aware logic:
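from dreadnode.agent.events import ToolStart
from dreadnode.agent.reactions import Fail

async def detect_loops(event: ToolStart) -> Fail | None:
    # Collect ToolStart events in the history with the same tool name and arguments.
    previous = event.get_events_by_type(ToolStart)
    identical = [
        e for e in previous
        if e.tool_call.name == event.tool_call.name
        and e.tool_call.function.arguments == event.tool_call.function.arguments
    ]

    # More than two identical calls suggests the agent is repeating itself.
    if len(identical) > 2:
        return Fail(f"Stuck in loop calling {event.tool_call.name}")
    return None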
Helper methods:
  • event.get_events_by_type(EventType) - All events of a type
  • event.get_latest_event_by_type(EventType) - Most recent event of a type
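The semantics of the two helpers can be sketched with a stand-alone history object. The classes here are hypothetical stand-ins, not the dreadnode event types. Returning None when no event matches is an assumption of this sketch, since the behavior of the real helper in that case is not described above.

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for two event types.
@dataclass
class StepStart:
    step: int

@dataclass
class ToolStart:
    tool_name: str

@dataclass
class History:
    """Stand-in for the run history an event exposes."""
    events: list = field(default_factory=list)

    def get_events_by_type(self, event_type):
        # Preserve firing order so callers can reason about sequence.
        return [e for e in self.events if isinstance(e, event_type)]

    def get_latest_event_by_type(self, event_type):
        matches = self.get_events_by_type(event_type)
        return matches[-1] if matches else None  # None is assumed here

history = History([StepStart(1), ToolStart("ls"), StepStart(2), ToolStart("cat")])
tool_starts = history.get_events_by_type(ToolStart)
latest_step = history.get_latest_event_by_type(StepStart)
```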

Custom Hooks

Hooks are async functions that receive an event and optionally return a reaction:
from dreadnode.agent.events import AgentEvent, GenerationEnd
from dreadnode.agent.reactions import Finish

async def stop_on_success(event: AgentEvent) -> Finish | None:
    if isinstance(event, GenerationEnd):
        if "task completed" in event.message.content.lower():
            return Finish(reason="Task completed")
    return None
For stateful hooks, use a closure:
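from dreadnode.agent.events import AgentEvent, ToolEnd
from dreadnode.agent.reactions import Finish

def max_tool_calls(limit: int):
    """Stop after a maximum number of tool calls."""
    count = 0  # shared by every invocation of the returned hook

    async def hook(event: AgentEvent) -> Finish | None:
        nonlocal count
        if isinstance(event, ToolEnd):
            count += 1
            if count >= limit:
                return Finish(f"Reached {limit} tool calls")
        return None

    return hook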

# Use the custom hook
agent = dn.Agent(
    name="limited",
    model="gpt-4o-mini",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    hooks=[max_tool_calls(50)],
)
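Because a hook is just an async function, a closure-based hook like the one above can be exercised without running an agent. The sketch below feeds it fake events. `ToolEnd` and `Finish` here are hypothetical stand-ins defined locally so the example runs without the SDK, and `drive` is an assumed helper name.

```python
import asyncio
from dataclasses import dataclass

# Local stand-ins so the sketch runs without the SDK installed.
@dataclass
class ToolEnd:
    tool_name: str

@dataclass
class Finish:
    reason: str

def max_tool_calls(limit: int):
    """Same closure pattern as above, using the stand-in types."""
    count = 0

    async def hook(event):
        nonlocal count
        if isinstance(event, ToolEnd):
            count += 1
            if count >= limit:
                return Finish(f"Reached {limit} tool calls")
        return None

    return hook

async def drive(hook, events):
    """Call the hook once per event and collect what it returns."""
    return [await hook(e) for e in events]

# With a limit of 3, the first two calls pass and the third triggers Finish.
results = asyncio.run(drive(max_tool_calls(3), [ToolEnd("ls")] * 4))
```

The counter lives in the closure, so in this sketch a hook instance that is reused keeps its earlier count. Calling max_tool_calls again creates a fresh counter.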
See the SDK reference for complete hook and event documentation.