The Agent class is stateless—it’s a reusable orchestrator. All conversation history (messages and events) is stored in a separate Thread object, enabling flexible state management across runs. Why use threads:
  • Persist Conversations - Continue multi-turn conversations across runs by passing the same thread to multiple agent calls
  • Multi-Agent Collaboration - Share context between specialized agents by passing a common thread
  • Fork and Explore - Create independent branches from a thread to test different approaches in parallel
When to use threads: Use explicit threads for multi-turn conversations, multi-agent workflows, or when you need to inspect conversation history. Use the agent’s default internal thread for simple, single-run tasks. This guide covers thread properties, default threads, explicit thread management, forking, usage tracking, and best practices.
import dreadnode as dn
from dreadnode.agent import Thread

# Create a thread to manage state explicitly
thread = Thread()

agent = dn.Agent(
    name="assistant",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
)

# Pass thread to preserve state across runs
result1 = await agent.run("What is 2 + 2?", thread=thread)
print(f"First answer: {result1.messages[-1].content}")

result2 = await agent.run("Multiply that by 10.", thread=thread)  # Has context of previous answer
print(f"Second answer: {result2.messages[-1].content}")

# View all messages in the thread
print(f"\nTotal messages in thread: {len(thread.messages)}")
for i, msg in enumerate(thread.messages):
    role = msg.role
    if hasattr(msg, 'content') and isinstance(msg.content, str):
        content = msg.content[:100]
    else:
        content = "[non-text content]"
    print(f"{i+1}. {role}: {content}")

Thread Properties

Property       Type               Description
messages       list[Message]      All messages in the conversation.
events         list[AgentEvent]   All events that occurred during runs.
total_usage    Usage              Aggregated token usage across all generations.
last_usage     Usage | None       Token usage from the most recent generation.

Default Thread

For convenience, every agent has an internal thread. When you call run() or stream() without specifying a thread, the agent uses this internal one:
agent = dn.Agent(
    name="summarizer",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
)

print(len(agent.thread.messages))  # 0

await agent.run("Summarize the README file.")

print(len(agent.thread.messages))  # 5 (system, user, assistant with tool calls, tool result, assistant response)
Reset the internal thread with agent.reset():
# Returns the old thread and creates a fresh one
previous_thread = agent.reset()

print(len(previous_thread.messages))  # 5
print(len(agent.thread.messages))     # 0

Explicit Thread Management

Pass a Thread to run() or stream() for explicit control over conversation state.

Persisting Conversations

Resume conversations by reusing a thread:
from dreadnode.agent import Thread

agent = dn.Agent(
    name="assistant",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
)
thread = Thread()

# First interaction
await agent.run("What files are in this directory?", thread=thread)

# Later - agent has no memory, but thread preserves context
await agent.run("Which of those files is largest?", thread=thread)

Multi-Agent Collaboration

Share a thread between specialized agents:
from dreadnode.agent import Thread

# Specialized agents
finder = dn.Agent(
    name="finder",
    model="gpt-4o",
    instructions="You find files and information.",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
)

writer = dn.Agent(
    name="writer",
    model="gpt-4o",
    instructions="You write and modify files.",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="write")],
)

# Shared context
shared_thread = Thread()

# First agent finds context
await finder.run("Find the main Python file.", thread=shared_thread)

# Second agent uses that context
await writer.run("Add a docstring to the file you found.", thread=shared_thread)

Thread Forking

Create an independent copy of a thread with fork():
from dreadnode.agent import Thread

thread = Thread()

await agent.run("Analyze the codebase.", thread=thread)

# Create a branch to explore an alternative path
branch = thread.fork()

# Original thread continues one direction
await agent.run("Focus on security issues.", thread=thread)

# Forked thread explores another
await agent.run("Focus on performance issues.", thread=branch)
Forking creates a deep copy - changes to one thread don’t affect the other.
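The deep-copy semantics can be illustrated with plain Python. This is a conceptual sketch using a stand-in `SimpleThread` class, not the dreadnode `Thread` API:

```python
import copy
from dataclasses import dataclass, field

@dataclass
class SimpleThread:
    # Stand-in for a conversation thread: just a list of message dicts
    messages: list = field(default_factory=list)

    def fork(self) -> "SimpleThread":
        # A fork is a deep copy: nested objects are duplicated too
        return copy.deepcopy(self)

original = SimpleThread(messages=[{"role": "user", "content": "Analyze the codebase."}])
branch = original.fork()

# Appending to one thread leaves the other untouched
original.messages.append({"role": "user", "content": "Focus on security."})
branch.messages.append({"role": "user", "content": "Focus on performance."})

print(len(original.messages), len(branch.messages))  # 2 2
print(original.messages[1] == branch.messages[1])    # False
```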

Parallel Exploration

Fork threads to explore multiple approaches concurrently:
import asyncio

async def explore_approach(agent, thread, focus):
    branch = thread.fork()
    return await agent.run(f"Investigate {focus}", thread=branch)

# Run multiple explorations in parallel
results = await asyncio.gather(
    explore_approach(agent, thread, "the authentication system"),
    explore_approach(agent, thread, "the database layer"),
    explore_approach(agent, thread, "the API endpoints"),
)

Accessing Thread State After a Run

After a run completes, access the thread’s state directly and use the result for run-specific metrics:
from dreadnode.agent import Thread
from dreadnode.agent.events import ToolEnd

thread = Thread()
result = await agent.run("Analyze the code.", thread=thread)

# Run-specific metrics from result
print(f"Run took {result.steps} steps")
print(f"Run tokens: {result.usage.total_tokens}")

# Thread state after the run
print(f"Total tokens across all runs: {thread.total_usage.total_tokens}")
print(f"Total messages in thread: {len(thread.messages)}")

# Analyze events from the thread
tool_events = [e for e in thread.events if isinstance(e, ToolEnd)]
print(f"Total tool calls in thread: {len(tool_events)}")
The thread maintains cumulative state across multiple runs, while each result contains metrics for that specific run.

Usage Tracking

Track token usage across runs:
from dreadnode.agent import Thread

thread = Thread()

await agent.run("First task", thread=thread)
print(f"After run 1: {thread.total_usage.total_tokens} tokens")

await agent.run("Second task", thread=thread)
print(f"After run 2: {thread.total_usage.total_tokens} tokens")

# Most recent generation's usage
if thread.last_usage:
    print(f"Last generation: {thread.last_usage.total_tokens} tokens")

Best Practices

When to Use Explicit Threads

Use the default thread (agent.thread) when:
  • Single agent, single task execution
  • No need to preserve state between separate operations
  • Quick prototyping or simple scripts
Use explicit threads when:
  • Continuing conversations across multiple runs
  • Multiple agents need to share context
  • You need to inspect or fork conversation state
  • Building production applications with session management
from dreadnode.agent import Thread

# Default thread - simple case
agent = dn.Agent(
    name="helper",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")]
)
await agent.run("Quick task")

# Explicit thread - production case
thread = Thread()
result1 = await agent.run("Start analysis", thread=thread)
# Later in your application...
result2 = await agent.run("Continue from where you left off", thread=thread)

Thread Lifecycle Management

Create a new thread for each independent conversation or session:
from dreadnode.agent import Thread

# Session-based threads
user_sessions = {}

def get_user_thread(user_id: str) -> Thread:
    if user_id not in user_sessions:
        user_sessions[user_id] = Thread()
    return user_sessions[user_id]

# Each user gets their own conversation
thread = get_user_thread("user123")
await agent.run("User message", thread=thread)
Reset threads when starting fresh:
# Clear thread for new conversation
thread = Thread()  # Create fresh thread
# Or reset agent's internal thread
agent.reset()

Threads with Streaming

Threads work seamlessly with streaming - events are added to the thread as they occur:
from dreadnode.agent import Thread
from dreadnode.agent.events import StepStart, GenerationEnd

thread = Thread()

async with agent.stream("Complex task", thread=thread) as events:
    async for event in events:
        if isinstance(event, StepStart):
            print(f"Step {event.step}...")
        elif isinstance(event, GenerationEnd):
            print(f"Tokens so far: {thread.total_usage.total_tokens}")

# After streaming completes, thread has full history
print(f"Final message count: {len(thread.messages)}")

Error Handling

When a run fails, the thread retains all messages and events up to the failure:
from dreadnode.agent import Thread

thread = Thread()

try:
    result = await agent.run("Risky operation", thread=thread)
except Exception:
    # Thread still has partial conversation
    print(f"Failed after {len(thread.messages)} messages")
    print(f"Last message: {thread.messages[-1].content}")

    # Can retry with same thread to preserve context
    result = await agent.run("Try a safer approach", thread=thread)

Memory and Context Window

Threads grow unbounded. For long-running conversations, use the summarize_when_long hook:
from dreadnode.agent.hooks import summarize_when_long
from dreadnode.agent import Thread

agent = dn.Agent(
    name="long-runner",
    model="gpt-4o",
    tools=[dn.agent.tools.fs.Filesystem(path=".", variant="read")],
    hooks=[
        summarize_when_long(
            max_tokens=80_000,      # Trigger summarization above this
            min_messages_to_keep=5, # Keep recent messages
        )
    ],
)

thread = Thread()

# Simulate multiple interactions
await agent.run("Analyze the main.py file", thread=thread)
await agent.run("Now check utils.py", thread=thread)
await agent.run("Compare the two files", thread=thread)

# Thread is automatically summarized if it exceeds max_tokens
print(f"Messages: {len(thread.messages)}, Tokens: {thread.total_usage.total_tokens}")
The hook works in two modes:
  • Proactive: Summarizes before each step if tokens exceed threshold
  • Reactive: If a context length error occurs, summarizes and retries
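The proactive trigger can be sketched in plain Python. This is a conceptual illustration of the check described above, not the actual hook implementation; `count_tokens` and `summarize` are hypothetical helpers passed in for the demonstration:

```python
def maybe_summarize(messages, max_tokens, min_messages_to_keep, count_tokens, summarize):
    """Proactive check: compress older messages once the token budget is exceeded."""
    if count_tokens(messages) <= max_tokens:
        return messages  # Under budget: leave the history untouched

    # Keep the most recent messages verbatim...
    recent = messages[-min_messages_to_keep:]
    # ...and collapse everything older into a single summary message
    older = messages[:-min_messages_to_keep]
    summary = {"role": "system", "content": summarize(older)}
    return [summary, *recent]

# Toy demonstration: count one "token" per message, with a budget of 4
history = [{"role": "user", "content": f"msg {i}"} for i in range(10)]
compacted = maybe_summarize(
    history,
    max_tokens=4,
    min_messages_to_keep=3,
    count_tokens=len,
    summarize=lambda msgs: f"[summary of {len(msgs)} messages]",
)
print(len(compacted))           # 4: one summary + three recent messages
print(compacted[0]["content"])  # [summary of 7 messages]
```

The reactive mode applies the same compression, but only after catching a context length error and before retrying the generation.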
See Handle Long Conversations for context window management strategies.

Common Pitfalls

Do not share threads across unrelated tasks:
# Bad - thread mixes different contexts
thread = Thread()
await agent.run("Analyze auth.py", thread=thread)
await agent.run("Write a poem", thread=thread)  # Confused context

# Good - separate threads for separate tasks
thread1 = Thread()
await agent.run("Analyze auth.py", thread=thread1)

thread2 = Thread()
await agent.run("Write a poem", thread=thread2)
Don’t try to serialize threads with model_dump_json():
# Bad - causes circular reference errors
thread = Thread()
json_str = thread.model_dump_json()  # Fails!

# Good - store conversation data separately if needed
messages_data = [{"role": m.role, "content": m.content} for m in thread.messages]
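If persistence is needed, the extracted message data serializes cleanly with the standard library. A minimal sketch using a hand-built list of dicts (the shape produced by the extraction above), since `Thread` itself cannot be dumped directly:

```python
import json

# Plain dicts extracted from a conversation serialize without issue
messages_data = [
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": "4"},
]

json_str = json.dumps(messages_data)

# Round-trip back to Python for later inspection or replay
restored = json.loads(json_str)
assert restored == messages_data
print(restored[1]["content"])  # 4
```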
Do check thread state before making assumptions about it:
# Good - defensive checks
if len(thread.messages) > 0:
    last_message = thread.messages[-1].content

if thread.last_usage:
    tokens = thread.last_usage.total_tokens