
Local search

Drive `optimize_anything` and `DreadnodeAgentAdapter` from the SDK for in-process prompt and agent optimization.

`optimize_anything` is the SDK surface for running a GEPA-backed search in your own Python code. Reach for it when what you’re optimizing isn’t a published capability — a prompt you’re still iterating on, an agent wired up in a notebook, a scorer you rewrite between runs.

```python
import asyncio

import dreadnode as dn
from dreadnode.optimization import EngineConfig, OptimizationConfig


def score(candidate: str, example: dict[str, str]) -> float:
    return 1.0 if example["expected"] in candidate else 0.0


async def main() -> None:
    optimization = dn.optimize_anything(
        seed_candidate="Answer the question directly.",
        evaluator=score,
        dataset=[
            {"question": "What is Dreadnode?", "expected": "Dreadnode"},
            {"question": "What is GEPA?", "expected": "GEPA"},
        ],
        valset=[
            {"question": "Name the SDK.", "expected": "Dreadnode"},
        ],
        objective="Improve a short answer prompt for factual responses.",
        config=OptimizationConfig(engine=EngineConfig(max_metric_calls=50)),
    )
    result = await optimization.run()
    print(result.best_score, result.best_candidate)


asyncio.run(main())
```

When you pass `seed_candidate` + `evaluator`, the evaluator receives the candidate as its first argument and the dataset row as its second. The returned float is the score the optimizer maximizes. The adapter path replaces this contract — see agent instruction optimization below.

| Driver | Best fit |
| --- | --- |
| `seed_candidate` + `evaluator` | You’re optimizing a plain string (prompt, template) with a pure function. |
| `adapter=DreadnodeAgentAdapter` | The candidate is an agent’s instructions, scored through the evaluation stack. |
| `adapter=CapabilityEnvAdapter` | The candidate is a capability and scoring needs a live task sandbox (CTF flag, service state, files). |
| `Study` + `Sampler` (custom loop) | You need full control over the search; see custom search loops. |

`DreadnodeAgentAdapter` turns an agent into a candidate. Each trial produces a new instruction block, which the adapter clones onto the agent and evaluates through a standard `Evaluation` against the dataset and scorers.

```python
import asyncio

import dreadnode as dn
from dreadnode.optimization import DreadnodeAgentAdapter


async def main() -> None:
    agent = dn.Agent(
        name="support-agent",
        model="openai/gpt-4o-mini",
        instructions="Answer support questions clearly.",
    )
    adapter = DreadnodeAgentAdapter(
        agent=agent,
        dataset=[
            {"goal": "Explain password reset flow"},
            {"goal": "Describe billing cycle"},
        ],
        scorers=[dn.scorers.contains("step-by-step")],
        goal_field="goal",
    )
    optimization = dn.optimize_anything(
        adapter=adapter,
        objective="Improve agent instructions for support quality.",
    )
    result = await optimization.run()
    print(result.best_candidate)


asyncio.run(main())
```

Use the adapter when the candidate is structured (an agent, a capability, a multi-field configuration) and scoring has to run through the evaluation pipeline rather than a standalone function. `dn capability improve` uses the same adapter under the hood, so when you’re iterating on a local capability directory, reach for capability improvement instead of wiring this up by hand.

`CapabilityEnvAdapter` is the env-scoring sibling of `DreadnodeAgentAdapter`. Each trial provisions a fresh task environment, runs the candidate capability’s agent against it, and calls your scorers while the sandbox is still alive — so a scorer can shell into the env through the `current_task_environment` contextvar to read a flag file, check a service, or grep the filesystem.

```python
import asyncio
import re

import dreadnode as dn
from dreadnode.capabilities.capability import Capability
from dreadnode.core.environment import current_task_environment
from dreadnode.core.metric import Metric
from dreadnode.core.scorer import scorer
from dreadnode.optimization import CapabilityEnvAdapter, optimize_anything
from dreadnode.optimization.config import EngineConfig, OptimizationConfig

dn.configure()

FLAG = re.compile(r"FLAG\{[^}]+\}")


@scorer(name="flag")
async def flag_scorer(agent_output: str) -> Metric:
    if FLAG.search(str(agent_output)):
        return Metric(value=1.0)
    # Fall back to shelling into the live sandbox to look for the flag.
    env = current_task_environment.get()
    if env is not None:
        _code, out = await env.execute(
            "cat /flag* 2>/dev/null; grep -rh 'FLAG{' / 2>/dev/null | head -1",
            timeout_sec=15,
        )
        if FLAG.search(out):
            return Metric(value=1.0)
    return Metric(value=0.0)


async def main() -> None:
    adapter = CapabilityEnvAdapter(
        capability=Capability("dreadnode/web-security", storage=dn.storage),
        model="anthropic/claude-sonnet-4-6",
        agent_name="web-security",
        task_ref="xbow/xben-071-24",
        timeout_sec=1800,
        dataset=[{"goal": "capture the flag"}],
        scorers=[flag_scorer],
        score_name="flag",
        parallel_rows=1,
    )
    optimization = optimize_anything(
        adapter=adapter,
        trainset=adapter.dataset,
        config=OptimizationConfig(engine=EngineConfig(max_metric_calls=3)),
        objective="Maximise flag-capture on the target task.",
    )
    result = await optimization.console()


asyncio.run(main())
```

Dataset rows take a `goal` (the agent prompt fallback) and can optionally override `task_ref` or pass `inputs` to the environment template. `parallel_rows` on the adapter fans rows across concurrent sandboxes inside one candidate evaluation; `concurrency` on `optimize_anything` runs candidates in parallel. Peak concurrent sandboxes is `concurrency × parallel_rows`.
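A quick arithmetic sketch of that fan-out math (the helper name here is ours, not part of the SDK):

```python
def peak_sandboxes(concurrency: int, parallel_rows: int) -> int:
    """Worst-case number of sandboxes alive at once: candidates
    evaluated in parallel times rows fanned out per candidate."""
    return concurrency * parallel_rows


# Four candidates in flight, two rows each -> up to 8 live sandboxes.
print(peak_sandboxes(4, 2))  # 8
```

Size your budget against this product, not either knob alone, since each sandbox is a full task environment.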

The full walkthrough — scorer patterns, train/val split, scaling the fan-out, and moving hosted — lives in the task-environment optimization guide.

A completed run isn’t a shippable candidate on its own. Read the result before deciding:

  • `result.best_candidate` — the winning prompt or instruction block.
  • `result.best_score` — the best score observed during search.
  • `result.best_scores` — per-metric view when the evaluator emits more than one metric.
  • `result.history` — the trial records the backend collected. For GEPA this is every evaluated trial, which tells you whether the run plateaued early or was still finding new bests when the budget ran out.
  • Validation behavior — if you passed `valset`, check whether the win held. Training-only wins are usually overfitting.
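One way to check for an early plateau is to scan the history for the last trial that set a new best. A minimal sketch, assuming each record exposes its score as shown (the exact field shape in `result.history` may differ):

```python
def trials_since_last_best(history: list[dict[str, float]]) -> int:
    """Index of the last trial that improved on the running best score
    (-1 for an empty history). A late index means the search was still
    finding new bests when the budget ran out."""
    best = float("-inf")
    last = -1
    for i, trial in enumerate(history):
        if trial["score"] > best:
            best = trial["score"]
            last = i
    return last


print(trials_since_last_best([{"score": 0.2}, {"score": 0.6}, {"score": 0.5}]))  # 1
```

If the last improvement landed in the final stretch of the budget, a rerun with a higher `max_metric_calls` is usually worth the cost.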