Long-running agents face context window limits as conversation history grows. The SDK provides automatic summarization, session persistence, and execution controls to manage extended agent runs without hitting token limits or losing conversation state.

Why handle long conversations:
  • Context Management - Automatically summarize history when approaching token limits while preserving recent messages
  • Session Persistence - Continue multi-turn conversations across runs using threads without losing context
  • Execution Control - Limit steps, tokens, cost, or time to prevent runaway executions
When to use these techniques:
  • Use summarization for tasks requiring many tool calls or large outputs
  • Use session persistence for multi-stage workflows
  • Use TaskAgent for goal-oriented work requiring explicit completion
This guide covers automatic summarization, execution limits, session persistence, handling stalls, and production best practices.

Context Window Management

Automatic Summarization

The summarize_when_long hook automatically compresses conversation history when it grows too large:
import dreadnode as dn
from dreadnode.agent.hooks import summarize_when_long

agent = dn.Agent(
    name="long-runner",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    hooks=[
        summarize_when_long(
            max_tokens=80_000,        # Summarize when context exceeds this
            min_messages_to_keep=5,   # Always keep last 5 messages
        ),
    ],
)
This hook operates in two modes:
  1. Proactive: Before each step, checks if the last generation exceeded max_tokens. If so, summarizes older messages before continuing.
  2. Reactive: If a context length error occurs, summarizes the history and retries the step.
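To build intuition for when the proactive check would fire, here is a rough character-based heuristic. This is not the SDK's token counter (`approx_tokens` and `over_threshold` are hypothetical helpers, and ~4 characters per token is only a common rule of thumb for English text):

```python
# Rough heuristic: ~4 characters per token for English text. The SDK does
# its own accounting; this sketch is only for sanity-checking thresholds.
def approx_tokens(text: str) -> int:
    return max(1, len(text) // 4)

def over_threshold(messages: list[str], max_tokens: int) -> bool:
    # Sum the estimated tokens across the whole history.
    return sum(approx_tokens(m) for m in messages) > max_tokens

history = ["x" * 2_000] * 10  # ten ~500-token messages, ~5,000 tokens total
print(over_threshold(history, max_tokens=80_000))  # False: well under the limit
print(over_threshold(history, max_tokens=4_000))   # True: would trigger summarization
```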

Summarization Options

summarize_when_long(
    model="gpt-4o-mini",       # Use a cheaper model for summarization
    max_tokens=100_000,        # Token threshold
    min_messages_to_keep=5,    # Preserve recent context
    guidance="Focus on technical findings and code locations.",  # Guide the summary
)
The guidance parameter helps the summarizer preserve information relevant to your task.

Limiting Agent Execution

Use max_steps to limit think-act cycles, or stop conditions for finer control:
import dreadnode as dn
from dreadnode.agent.stop import generation_count, token_usage, elapsed_time

agent = dn.Agent(
    name="long-runner",
    model="gpt-4o-mini",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    max_steps=50,
    stop_conditions=[
        generation_count(30),          # LLM inference calls
        token_usage(100_000),          # Total tokens
        elapsed_time(max_seconds=600), # Wall-clock time
    ],
)
For goal-oriented tasks, use TaskAgent which continues until the agent explicitly calls finish_task or give_up_on_task:
import dreadnode as dn
from dreadnode.agent import TaskAgent

agent = TaskAgent(
    name="thorough-analyzer",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    max_steps=100,
)
See Stop Conditions for the complete reference.

Session Persistence

Continuing Conversations

Use explicit threads to continue conversations across runs:
import dreadnode as dn
from dreadnode.agent import Thread

agent = dn.Agent(
    name="assistant",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
)

# Create a thread for the conversation
thread = Thread()

# First run
await agent.run("Analyze the authentication module.", thread=thread)

# Continue the same conversation in a second run
await agent.run("Now check for SQL injection in the same code.", thread=thread)

# The thread maintains the full conversation history
print(f"Total messages in thread: {len(thread.messages)}")
Note: Thread serialization with model_dump_json() may fail due to circular references. For session persistence, consider storing conversation history separately or using database-backed state management.
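Since the Thread object itself may not serialize cleanly, one workaround is to persist the conversation history as plain data. A minimal sketch, assuming each message can be reduced to a role/content dict (plain dicts stand in for the SDK's message type here; `save_history` and `load_history` are hypothetical helpers):

```python
import json
from pathlib import Path

def save_history(messages: list[dict], path: str) -> None:
    # Write only JSON-safe fields, avoiding the Thread object entirely.
    Path(path).write_text(json.dumps(messages, indent=2))

def load_history(path: str) -> list[dict]:
    return json.loads(Path(path).read_text())

history = [
    {"role": "user", "content": "Analyze the authentication module."},
    {"role": "assistant", "content": "Reviewed the module; found 3 entry points."},
]
save_history(history, "session.json")
restored = load_history("session.json")  # round-trips the same list of dicts
```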

Breaking Work Into Sessions

For very long tasks, break work into resumable sessions:
import dreadnode as dn
from dreadnode.agent import Thread, TaskAgent
from dreadnode.agent.stop import generation_count

agent = TaskAgent(
    name="researcher",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    stop_conditions=[generation_count(20)],  # Work in chunks
)

thread = Thread()

# Work in sessions
for session in range(5):
    result = await agent.run(
        "Continue analyzing the codebase. Use finish_task when done.",
        thread=thread,
    )

    print(f"Session {session + 1}: {result.stop_reason}")

    if result.stop_reason == "finished":
        print("Task completed!")
        break

    # Check if actually stuck vs just hit generation limit
    if result.stop_reason == "generation_count":
        continue  # Keep going in next session

Handling Agent Stalling

When an agent stops calling tools without completing its task, it “stalls”. Handle this with the retry_with_feedback hook:
import dreadnode as dn
from dreadnode.agent.events import AgentStalled
from dreadnode.agent.hooks import retry_with_feedback
from dreadnode.agent.stop import never

agent = dn.Agent(
    name="continuous-worker",
    model="gpt-4o-mini",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    stop_conditions=[never()],  # Never stop automatically
    hooks=[
        retry_with_feedback(
            AgentStalled,
            "You must continue working. Use tools to make progress, "
            "or call finish_task when the task is complete."
        ),
    ],
)
The never() stop condition ensures that if the agent stops calling tools, it triggers AgentStalled rather than finishing successfully.

TaskAgent for Goal-Oriented Work

TaskAgent is pre-configured for tasks that require explicit completion:
import dreadnode as dn
from dreadnode.agent import TaskAgent

agent = TaskAgent(
    name="investigator",
    model="gpt-4o",
    tools=[
        dn.agent.tools.fs.Filesystem(path=".", variant="read"),
        # TaskAgent automatically adds:
        # - finish_task
        # - give_up_on_task
        # - update_todo
    ],
    max_steps=100,
)

result = await agent.run("Find all hardcoded credentials in the codebase.")

if result.stop_reason == "finished":
    # Agent called finish_task or give_up_on_task
    print("Task completed")
elif result.stop_reason == "max_steps_reached":
    print("Hit step limit without completing")

Combining Strategies

For production workloads, combine multiple strategies:
import dreadnode as dn
from dreadnode.agent import TaskAgent
from dreadnode.agent.hooks import (
    backoff_on_ratelimit,
    summarize_when_long,
)
from dreadnode.agent.stop import (
    estimated_cost,
    elapsed_time,
)

agent = TaskAgent(
    name="production-agent",
    model="gpt-4o",
    tools=[
        dn.agent.tools.fs.Filesystem(path="/project", variant="read"),
    ],
    max_steps=100,
    hooks=[
        # Handle context growth
        summarize_when_long(max_tokens=80_000, min_messages_to_keep=10),
        # Handle API errors
        backoff_on_ratelimit(),
    ],
    stop_conditions=[
        # Cost and time limits as safety nets
        estimated_cost(10.0),
        elapsed_time(max_seconds=1800),  # 30 minutes
    ],
)

Monitoring Long Runs

Track progress during long-running agents:
import dreadnode as dn
from dreadnode.agent.events import StepStart, GenerationEnd

agent = dn.Agent(
    name="monitored-agent",
    model="gpt-4o",
    max_steps=50,
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
)

async with agent.stream("Complex task...") as events:
    async for event in events:
        if isinstance(event, StepStart):
            print(f"Step {event.step}...")

        if isinstance(event, GenerationEnd):
            total_tokens = event.total_usage.total_tokens
            cost = event.estimated_cost or 0
            print(f"  Tokens: {total_tokens}, Est. cost: ${cost:.4f}")
For custom progress hooks:
import dreadnode as dn
from dreadnode.agent.events import AgentEvent, StepStart
from dreadnode.agent.reactions import Reaction

def progress_logger():
    """Log progress at each step."""

    async def hook(event: AgentEvent) -> Reaction | None:
        if isinstance(event, StepStart):
            total_tokens = event.total_usage.total_tokens
            print(f"Step {event.step} | Total tokens: {total_tokens}")
        return None

    return hook

agent = dn.Agent(
    name="custom-monitor",
    model="gpt-4o",
    max_steps=20,
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    hooks=[progress_logger()],
)

Best Practices

For Long-Running Analysis Tasks

Use TaskAgent with context management and safety limits:
import dreadnode as dn
from dreadnode.agent import TaskAgent
from dreadnode.agent.hooks import summarize_when_long, backoff_on_ratelimit
from dreadnode.agent.stop import estimated_cost, elapsed_time

agent = TaskAgent(
    name="long-analyzer",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    max_steps=200,
    hooks=[
        summarize_when_long(
            max_tokens=80_000,
            min_messages_to_keep=10,
            model="gpt-4o-mini",  # Cheaper model for summaries
        ),
        backoff_on_ratelimit(),
    ],
    stop_conditions=[
        estimated_cost(5.0),  # Stop at $5
        elapsed_time(max_seconds=3600),  # 1 hour max
    ],
)

For Iterative Workflows

Break work into explicit sessions with thread persistence:
import dreadnode as dn
from dreadnode.agent import Thread, TaskAgent
from dreadnode.agent.stop import generation_count

agent = TaskAgent(
    name="iterative-worker",
    model="gpt-4o-mini",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    stop_conditions=[generation_count(15)],  # Resume after 15 LLM calls
)

thread = Thread()
max_sessions = 10

for session in range(max_sessions):
    result = await agent.run(
        "Continue your analysis. Call finish_task when complete.",
        thread=thread,
    )

    if result.stop_reason == "finished":
        print(f"Completed in {session + 1} sessions")
        break

    # Log progress after each session
    print(f"Session {session + 1}: {len(thread.messages)} messages")

Thread Management

DO:
  • Use Thread() objects to maintain conversation history across multiple agent.run() calls
  • Monitor thread size with len(thread.messages)
  • Use summarization hooks to manage context window
DON’T:
  • Try to serialize threads with model_dump_json() (causes circular reference errors)
  • Reuse the same thread across different agents or tasks
  • Ignore context window limits without summarization
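The summarization idea behind these recommendations can be sketched in plain Python. Strings stand in for the SDK's message objects, and `trim_history` is a hypothetical helper, not part of the SDK:

```python
# Minimal sketch of what a summarization hook does: replace older history
# with a single summary entry while keeping the most recent messages verbatim.
def trim_history(messages: list[str], keep_last: int = 5) -> list[str]:
    if len(messages) <= keep_last:
        return messages
    summary = f"[summary of {len(messages) - keep_last} earlier messages]"
    return [summary] + messages[-keep_last:]

msgs = [f"message {i}" for i in range(12)]
trimmed = trim_history(msgs)  # 1 summary entry + the last 5 messages
```

In the real SDK, the summary entry would be produced by an LLM call (as summarize_when_long does with its model and guidance parameters) rather than a placeholder string.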