Local evaluations

Run dataset-driven evaluations in your own Python process with Evaluation and @dn.evaluation — no sandboxes, no task archives.

```python
import dreadnode as dn
from dreadnode.scorers import contains

@dn.evaluation(
    dataset=[
        {"question": "What is Dreadnode?"},
        {"question": "What does an evaluation produce?"},
    ],
    scorers=[contains("Answer:")],
    assert_scores=["contains"],
    concurrency=4,
)
async def answer(question: str) -> str:
    return f"Answer: {question}"

result = await answer.run()
print(result.pass_rate, len(result.samples))
```

Local evaluations execute a task function over a dataset, stream events, and return an EvalResult. They run in your own Python process — no sandboxes, no published tasks, no task archive uploads.

Reach for local evaluations when you’re iterating on prompts, scorers, or agent logic during development. For production-grade benchmarks with provisioned task environments and deterministic verification, see hosted evaluations.

The main pieces:

  • Evaluation — orchestrates execution of a task against a dataset
  • @dn.evaluation — wraps a task function into an Evaluation
  • EvalEvent — EvalStart, EvalSample, and EvalEnd stream progress
  • Sample — per-row input, output, metrics, and errors
  • EvalResult — aggregate metrics, pass/fail stats, stop reason

The decorator above is the shortest path when the task already exists as a Python function and the dataset is small enough to define inline.

Use the Evaluation(...) constructor when you want file-backed datasets, preprocessing, or a task you’re passing around separately. dataset_file accepts JSONL, CSV, JSON, or YAML. Use preprocessor to normalize rows before scoring, and dataset_input_mapping to align dataset keys with task params.

```python
from pathlib import Path

import dreadnode as dn
from dreadnode.evaluations import Evaluation

def normalize(rows: list[dict[str, str]]) -> list[dict[str, str]]:
    return [{"prompt": row["prompt"].strip()} for row in rows if row["prompt"].strip()]

evaluation = Evaluation(
    task="my_project.tasks.generate_answer",
    dataset_file=Path("data/eval.jsonl"),
    dataset_input_mapping={"prompt": "question"},
    preprocessor=normalize,
    concurrency=8,
)
result = await evaluation.run()
```
Useful execution options on either form:

  • concurrency — how many samples run in parallel
  • iterations — reruns each dataset row multiple times
  • scorers — reusable metrics attached to each sample
  • assert_scores — turns selected score names into pass/fail gates
  • max_errors and max_consecutive_errors — circuit breakers for unstable tasks
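To make the assert_scores behavior concrete, here is a sketch of the pass/fail semantics — not the library's internals; the helper name and scores below are hypothetical:

```python
# Sketch only: illustrates how asserted score names gate each sample.
# Not dreadnode's actual implementation.
def sample_passes(scores: dict[str, float], asserted: list[str]) -> bool:
    # A sample passes only if every asserted score is truthy/nonzero.
    return all(scores.get(name, 0.0) > 0.0 for name in asserted)

scores_per_sample = [
    {"contains": 1.0, "length": 0.4},  # asserted score passes
    {"contains": 0.0, "length": 0.9},  # asserted score fails
]
passed = [sample_passes(s, ["contains"]) for s in scores_per_sample]
pass_rate = sum(passed) / len(passed)
print(pass_rate)  # 0.5
```

Unasserted scores (like "length" above) are still recorded as metrics; only the asserted names affect pass/fail.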

If you already have a Dataset or LocalDataset, convert it to records first:

```python
rows = my_dataset.to_pandas().to_dict(orient="records")
evaluation = Evaluation(task="my_project.tasks.generate_answer", dataset=rows)
```
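Records are just plain dicts, one per row — the same shape dataset_file produces from JSONL. A minimal sketch of that loading, using in-memory JSONL text rather than a real file:

```python
import io
import json

# Sketch: parse JSONL text into the list-of-dicts shape Evaluation expects.
jsonl = io.StringIO(
    '{"question": "What is Dreadnode?"}\n'
    '{"question": "What does an evaluation produce?"}\n'
)
rows = [json.loads(line) for line in jsonl if line.strip()]
print(len(rows))  # 2
```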

EvalResult gives you both a summary and the underlying samples:

```python
print(result.passed_count, result.failed_count, result.pass_rate)
print(result.metrics_summary)

df = result.to_dataframe()
result.to_jsonl("out/eval-results.jsonl")
```

Each Sample includes the original input, the output, metric series, assertion results, and any execution error.
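A sketch of failure triage over that per-sample data — the records below are hand-written stand-ins carrying the fields described above, not objects returned by the library:

```python
# Hypothetical sample records mirroring the fields described above.
records = [
    {"input": {"question": "q1"}, "output": "Answer: q1", "passed": True, "error": None},
    {"input": {"question": "q2"}, "output": None, "passed": False, "error": "timeout"},
]

# Pull out the failures and inspect what went wrong.
failures = [r for r in records if not r["passed"]]
for r in failures:
    print(r["input"], r["error"])
```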

```python
from dreadnode.evaluations import EvalEnd, EvalSample, EvalStart

async with evaluation.stream() as events:
    async for event in events:
        if isinstance(event, EvalStart):
            print("starting", event.dataset_size)
        elif isinstance(event, EvalSample):
            print("sample", event.sample_index, event.passed, event.scores)
        elif isinstance(event, EvalEnd):
            print("done", event.pass_rate, event.stop_reason)
```

Stream when you want progress reporting, live UI updates, or early-termination logic around a long-running evaluation.
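The early-termination part is ordinary asyncio: stop consuming the stream once a condition trips. A self-contained sketch with a fake event stream — the failure budget and the fake generator are illustrative, not dreadnode APIs:

```python
import asyncio

async def fake_events():
    # Stand-in for evaluation.stream(); yields per-sample pass/fail flags.
    for passed in [True, False, True, False, True]:
        yield passed

async def consume(events, max_failures: int = 2) -> int:
    failures = 0
    async for passed in events:
        if not passed:
            failures += 1
        if failures >= max_failures:
            # With a real stream, breaking out (and exiting the async
            # context manager) abandons the remaining samples.
            break
    return failures

failures = asyncio.run(consume(fake_events()))
print(failures)  # 2
```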