Skip to main content
An Eval provides a systematic framework for testing and benchmarking a dreadnode.task. You can run a task against a dataset across a matrix of different parameters, evaluate each output using one or more scorers, and aggregate the results for analysis. This allows you to move from manual spot-checking to automated, reproducible benchmarking, which is essential for ensuring the quality of your AI systems.

Your First Evaluation

Let’s start with a simple example to see the feature in action. You’ll define a task that summarizes text and then create an Eval to run it against a small dataset. The most direct way to create an Eval is by using the .as_eval() method on an existing task.
import dreadnode as dn

@dn.task
async def summarize(text: str) -> str:
    # In a real-world scenario, this would call an LLM.
    # We'll use a simple truncation to keep the example focused.
    summary = " ".join(text.split()[:5]) + "..."
    return summary

# A dataset is just a list of dictionaries.
# The keys should match the parameter names of your task.
dataset = [
    {"text": "Dreadnode provides a modular framework for evaluating task outputs."},
    {"text": "By decoupling evaluation from execution, you can easily define and compose checks."},
]

# Create the evaluation by attaching the dataset to the task.
basic_eval = summarize.as_eval(
    dataset=dataset,
)

# Run the evaluation and inspect the result.
result = await basic_eval.run()
print(result)
This runs the summarize task once for each item in the dataset. The await basic_eval.run() call returns an EvalResult object, which contains all the Sample data from the run. By default, with no scorers, the pass_rate will be 1.0 as long as no unexpected errors occur.

Defining a Pass/Fail Test

An evaluation is most useful when you define what “success” means. You do this by adding scorers to measure the output and using assert_scores to define your pass/fail criteria. Let’s build on the previous example. You’ll add a scorer to check if the summary is within a specific length range and then assert that this condition must be met.
import dreadnode as dn
from dreadnode import scorers

@dn.task
async def summarize(text: str) -> str:
    summary = " ".join(text.split()[:5]) + "..."
    return summary

dataset = [
    {"text": "Dreadnode provides a modular framework for evaluating task outputs."},
    {"text": "By decoupling evaluation from execution, you can easily define and compose checks."},
    {"text": "This text is too short."}, # This sample will produce a summary that fails the check.
]

# 1. Define a scorer to check the output length.
#    The `name` you provide here is how you'll reference it in assertions.
length_check = scorers.length_in_range(min_length=20, max_length=50, name="valid_length")

# 2. Create the Eval using `.as_eval()` and attach the scorer.
# 3. Use `assert_scores` to turn the scorer into a pass/fail condition.
eval_with_assertion = summarize.as_eval(
    dataset=dataset,
    scorers=[length_check],
    assert_scores=["valid_length"], # This string must match the scorer's `name`.
)

result = await eval_with_assertion.run()

print(f"Pass Rate: {result.pass_rate:.2%}")
print(f"Passed samples: {result.passed_count}")
print(f"Failed samples: {result.failed_count}")
Now, the evaluation actively checks each summary. The third sample produces a summary that is too short, causing the valid_length scorer to return a low value. Because you’ve listed "valid_length" in assert_scores, this sample is marked as “failed,” and the overall pass_rate is no longer 100%.
Running an evaluation on a large dataset can be time-consuming. You can execute samples in parallel by setting the concurrency parameter: my_eval.as_eval(..., concurrency=10).

Benchmarking Models with Scenarios

To compare the performance of different configurations, you can use the parameters argument. An Eval creates a distinct Scenario for every unique combination of the parameters you provide. This is ideal for systematically testing different models, prompts, or settings. Let’s modify the summarization task to accept a model and then use an Eval to benchmark two different summarization strategies.
import dreadnode as dn
from dreadnode import scorers

@dn.task
async def summarize(text: str, model: str) -> str:
    if model == "truncate_short":
        return " ".join(text.split()[:3]) + "..."
    elif model == "truncate_long":
        return " ".join(text.split()[:8]) + "..."
    return "Invalid model specified."

dataset = [
    {"text": "Dreadnode provides a modular framework for evaluating task outputs, solving the problem of embedding messy and repetitive validation logic."},
    {"text": "By decoupling evaluation from execution, you can easily define, compose, and reuse powerful checks for quality, safety, and correctness."},
]

# The `parameters` dict defines the axes of your experiment.
# An Eval will run a full scenario for `model='truncate_short'`
# and another for `model='truncate_long'`.
benchmark_eval = summarize.as_eval(
    dataset=dataset,
    parameters={
        "model": ["truncate_short", "truncate_long"]
    },
    scorers=[scorers.length_in_range(min_length=20, max_length=50, name="valid_length")],
    assert_scores=["valid_length"],
)

result = await benchmark_eval.run()

# The result object now contains results for each scenario.
for scenario in result.scenarios:
    model_name = scenario.params.get("model")
    print(f"--- Results for model: {model_name} ---")
    print(f"Pass Rate: {scenario.pass_rate:.2%}\n")
The EvalResult now contains a ScenarioResult for each model. This allows you to inspect and compare their performance independently. You’ll see that the truncate_short model fails the length check, while the truncate_long model passes, giving you a clear, data-driven reason to prefer one over the other.

Scoring Against Ground Truth

The most common use case for an Eval is to check the correctness of a task’s output against a known, “ground truth” answer. To do this, you first add the expected answer to your dataset. Then, you use dn.DatasetField within your scorer to reference it. The dn.DatasetField object is a special marker that tells the scorer: “Instead of a fixed value, get the value from this column in the dataset for the specific sample you’re currently evaluating.” Here’s how you can test if the summarize task produces an output that is semantically similar to a reference summary.
import dreadnode as dn
from dreadnode import scorers

@dn.task
async def summarize(text: str) -> str:
    # This task will now produce a more realistic, generative-style summary.
    return "Modular evaluation is possible by decoupling execution from validation."

# Add the "ground_truth_summary" column to your dataset.
dataset = [
    {
        "text": "By decoupling evaluation from execution, you can easily define, compose, and reuse powerful checks for quality, safety, and correctness across your entire application.",
        "ground_truth_summary": "Decoupling evaluation from execution enables reusable quality checks."
    },
]

# 1. The scorer references the `ground_truth_summary` column from the dataset.
# 2. We use a threshold to define "passing" as a similarity score greater than 0.7.
# 3. Finally, we give the composed scorer a clear name for use in assertions.
similarity_check = (
    scorers.similarity_with_sentence_transformers(
        reference=dn.DatasetField("ground_truth_summary")
    ) > 0.7
) >> "is_correct"


correctness_eval = summarize.as_eval(
    dataset=dataset,
    scorers=[similarity_check],
    assert_scores=["is_correct"],
)

result = await correctness_eval.run()

print(f"Pass Rate: {result.pass_rate:.2%}")
In this example, for each row in the dataset, the similarity_check scorer dynamically compares the task’s output against the value in that row’s ground_truth_summary column. This pattern is the foundation for building robust regression tests and quality assurance suites for your AI systems.

Analyzing and Exporting Results

Running an evaluation produces an EvalResult object, which contains the complete data from every sample, scenario, and iteration. While you can inspect its properties directly for a high-level summary, the most effective way to analyze your results is to convert them to a pandas DataFrame. The .to_dataframe() method flattens the entire result set into a table, making it easy to filter, sort, and inspect your data. Each row represents a single Sample, and columns are automatically created for parameters, inputs, outputs, and the average value of each metric. Let’s run an evaluation and then use its DataFrame to find exactly which samples failed and why.
import dreadnode as dn
from dreadnode import scorers
import pandas as pd

# To display all columns in the DataFrame
pd.set_option('display.max_columns', None)

@dn.task
async def get_capital(country: str) -> str:
    capitals = {"France": "Paris", "Japan": "Tokyo"}
    return capitals.get(country, "I don't know.")

dataset = [
    {"country": "France", "expected_capital": "Paris"},
    {"country": "Japan", "expected_capital": "Tokyo"},
    {"country": "Germany", "expected_capital": "Berlin"}, # This will fail
]

# A scorer that checks for the correct answer from the dataset.
correctness_check = scorers.equals(dn.DatasetField("expected_capital")) >> "is_correct"

capital_eval = get_capital.as_eval(
    dataset=dataset,
    dataset_input_mapping=["country"], # Map 'country' column to task's 'country' param
    scorers=[correctness_check],
    assert_scores=["is_correct"],
)

# 1. Run the evaluation to get the result object.
result = await capital_eval.run()

# 2. Convert the result to a DataFrame for analysis.
results_df = result.to_dataframe()

# 3. Filter the DataFrame to find only the failed samples.
failed_samples_df = results_df[results_df["passed"] == False]

print("--- Failed Samples ---")
print(failed_samples_df[["input_country", "output", "metric_is_correct"]])
The output clearly shows the sample for “Germany” failed because the output was “I don’t know.”, which did not match the expected_capital from the dataset. This workflow—run, convert to DataFrame, filter for failures—is a highly effective pattern for debugging your tasks and understanding their failure modes.
The EvalResult object contains nested data for scenarios and iterations. For most analyses, calling .to_dataframe() is the recommended approach as it provides a simple, flat view of every sample, which is ideal for tools like pandas.

Live Monitoring in the Console

For long-running evaluations, you may want to see progress in real-time rather than waiting for the final result. You can do this by calling .console() instead of .run(). This will render a live dashboard in your terminal that updates as each sample completes.
# This uses the same `capital_eval` object from the previous example.

# Instead of .run(), call .console() to see live progress.
# It still returns the same EvalResult object when finished.
result = await capital_eval.console()
When you run this code, you will see a TUI that includes:
  • Progress Bars: Overall progress and progress for the current scenario.
  • Event Log: A timestamped log of key events, such as a sample failing.
  • Summary Statistics: A live-updating summary of the pass/fail rate.
Using .console() is perfect for interactive development and for monitoring large benchmark runs.

Advanced Patterns

Once you are comfortable with the basics, you can use these advanced features to build more resilient and sophisticated evaluation pipelines.

Loading Datasets from Files

For larger evaluations, defining your dataset in-memory isn’t practical. You can load a dataset directly from a file by providing a string or pathlib.Path object. Supported formats include .jsonl, .csv, .json, and .yaml. Let’s assume you have a file named dataset.jsonl with the following content:
dataset.jsonl
{"country": "France", "expected_capital": "Paris"}
{"country": "Japan", "expected_capital": "Tokyo"}
You can then reference this file directly in your Eval.
import dreadnode as dn
from dreadnode import scorers

@dn.task
async def get_capital(country: str) -> str:
    capitals = {"France": "Paris", "Japan": "Tokyo"}
    return capitals.get(country, "I don't know.")

correctness_check = scorers.equals(dn.DatasetField("expected_capital")) >> "is_correct"

# Simply pass the file path to the `dataset` argument.
file_based_eval = get_capital.as_eval(
    dataset="dataset.jsonl",
    dataset_input_mapping=["country"],
    scorers=[correctness_check],
    assert_scores=["is_correct"],
)

result = await file_based_eval.run()
print(f"Pass Rate: {result.pass_rate:.2%}")

Customizing Input Mapping

The system can automatically map dataset columns to task parameters if their names match. However, if your dataset columns have different names than your task’s parameters, you must provide an explicit mapping using dataset_input_mapping. Here’s how you would map a dataset with a location column to the task’s country parameter.
import dreadnode as dn
from dreadnode import scorers

@dn.task
async def get_capital(country: str) -> str: # Task expects `country`
    capitals = {"France": "Paris"}
    return capitals.get(country, "I don't know.")

# Dataset uses `location` instead of `country`.
dataset_with_mismatched_keys = [
    {"location": "France", "expected_capital": "Paris"}
]

# Use a dict to map `dataset_key: task_parameter_name`.
mapping_eval = get_capital.as_eval(
    dataset=dataset_with_mismatched_keys,
    dataset_input_mapping={"location": "country"},
    scorers=[scorers.equals(dn.DatasetField("expected_capital")) >> "is_correct"],
)

result = await mapping_eval.run()
print(result.samples[0].input)

Building Resilient Evaluations

When working with large datasets or non-deterministic tasks, some samples may fail due to transient issues or bad data. You can configure your Eval to tolerate a certain number of failures without stopping the entire run.
  • max_errors: The total number of sample errors to tolerate before stopping.
  • max_consecutive_errors: The number of consecutive sample errors to tolerate before stopping.
@dn.task
async def flaky_task(value: int) -> int:
    if value == 2:
        raise ValueError("This value causes an error!")
    return value * 2

dataset = [{"value": 1}, {"value": 2}, {"value": 3}, {"value": 4}]

# This evaluation will stop after the first error.
# result = await flaky_task.as_eval(dataset=dataset).run() # This would raise an error and stop.

# This evaluation will tolerate up to 5 total errors and continue running.
resilient_eval = flaky_task.as_eval(
    dataset=dataset,
    max_errors=5,
)

result = await resilient_eval.run()
print(f"Total samples processed: {len(result.samples)}")
print(f"Samples with errors: {len([s for s in result.samples if s.error])}")

Programmatic Event Streaming

The .console() method is a convenient wrapper around a lower-level event stream. If you need to build custom logic or UIs based on evaluation events, you can consume this stream directly using async with eval.stream(). This is useful for advanced cases like sending real-time alerts or implementing custom early-stopping logic.
# Uses the `capital_eval` from a previous example.
async with capital_eval.stream() as stream:
    async for event in stream:
        if isinstance(event, dn.eval.SampleComplete):
            if event.sample.failed:
                print(f"Detected a failure on sample {event.sample.index}!")
        elif isinstance(event, dn.eval.EvalEnd):
            print(f"Evaluation finished with stop reason: {event.stop_reason}")