dreadnode

Top-level Python API for the Dreadnode SDK.

TraceBackend = Literal['local', 'remote']

Controls remote OTLP streaming.

  • "local" — local JSONL only. No OTLP streaming.
  • "remote" — local JSONL and OTLP streaming.
  • None (default) — Auto-detect: stream if credentials exist.

Local JSONL is always populated regardless of this setting.
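
The selection rule above can be sketched as a small pure function. This is illustrative only; resolve_backend and has_credentials are hypothetical names, not SDK API:

```python
# Hypothetical sketch of the TraceBackend selection rule; the SDK's
# internals may differ.
from typing import Literal, Optional

TraceBackend = Literal["local", "remote"]

def resolve_backend(setting: Optional[TraceBackend], has_credentials: bool) -> bool:
    """Return True when OTLP streaming should be enabled."""
    if setting == "local":
        return False
    if setting == "remote":
        return True
    # None: auto-detect -- stream only when credentials exist.
    return has_credentials
```

Local JSONL output is unconditional either way; only the streaming side is toggled.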

Audio(
data: AudioDataType,
sample_rate: int | None = None,
caption: str | None = None,
format: str | None = None,
)

Audio media type for Dreadnode logging.

Supports:

  • Local file paths (str or Path)
  • Numpy arrays with sample rate
  • Raw bytes

Initialize an Audio object.

Parameters:

  • data (AudioDataType) –The audio data, which can be:
    • A path to a local audio file (str or Path)
    • A numpy array (requires sample_rate)
    • Raw bytes
  • sample_rate (int | None, default: None ) –Required when using numpy arrays
  • caption (str | None, default: None ) –Optional caption for the audio
  • format (str | None, default: None ) –Optional format to use (default is wav for numpy arrays)
to_serializable() -> tuple[t.Any, dict[str, t.Any]]

Serialize the audio data to bytes and return with metadata. Returns: A tuple of (audio_bytes, metadata_dict)
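
For array-like input, serialization to WAV bytes works roughly like this stdlib-only sketch. samples_to_wav is a hypothetical helper shown to clarify the (audio_bytes, metadata_dict) shape, not the SDK's implementation:

```python
# Illustrative sketch of encoding raw 16-bit mono PCM samples as WAV
# bytes plus a metadata dict, roughly the shape to_serializable() returns.
import io
import struct
import wave

def samples_to_wav(samples: list[int], sample_rate: int) -> tuple[bytes, dict]:
    """Encode 16-bit mono PCM samples as WAV bytes with metadata."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as w:
        w.setnchannels(1)           # mono
        w.setsampwidth(2)           # 16-bit samples
        w.setframerate(sample_rate)
        w.writeframes(struct.pack(f"<{len(samples)}h", *samples))
    data = buf.getvalue()
    return data, {"sample_rate": sample_rate, "format": "wav", "size": len(data)}
```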

Code(text: str, language: str = '')

Hint type for code-formatted text.

This is a subclass of Text with format set to “code”.

Example

log_output("code_snippet", Code("print('Hello, World!')", language="python"))
CurrentRun(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the current task span from the current context (backwards compat alias).

CurrentTask(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the current task span from the current context.

CurrentTrial(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the current trial during an optimization study.

Dataset(
name: str,
storage: Storage | None = None,
version: str | None = None,
)

Published dataset loader backed by local storage manifests.

DatasetField(
name: str,
*,
default: Any | Unset = UNSET,
required: bool = True,
)

A Context marker for a value from the full dataset sample row for the current evaluation task.

Dreadnode()

The core Dreadnode SDK class.

A default instance is created and can be used directly with dreadnode.*. Otherwise, create your own instance with Dreadnode().configure().

can_sync: bool

Whether remote sync is possible (has credentials).

session: Profile

Deprecated alias for profile.

build_package(path: str | Path) -> BuildResult

Build a local repository into an OCI image.

Parameters:

  • path (str | Path) –Path to a dataset, model, or environment package project.

Returns:

  • BuildResult –BuildResult with success status and OCI image.
change_workspace(workspace: str | UUID) -> Workspace

Change the current workspace within the current organization.

This re-resolves the workspace and updates the storage paths accordingly. The organization remains unchanged.

Parameters:

  • workspace (str | UUID) –The workspace name, key, or uuid.UUID to switch to.

Returns:

  • Workspace –The resolved Workspace object.

Raises:

  • RuntimeError –If not configured or workspace not found.
configure(
*,
server: str | None = None,
api_key: str | None = None,
organization: str | UUID | None = None,
workspace: str | UUID | None = None,
project: str | UUID | None = None,
cache: Path | str | None = None,
storage_provider: StorageProvider | None = None,
trace_backend: TraceBackend | None = None,
console: ConsoleOptions | bool | None = None,
otel_scope: str = "dreadnode",
) -> Dreadnode

Configure the Dreadnode SDK.

Credential resolution follows profile precedence: explicit args > environment variables > saved profile defaults.

Parameters:

  • server (str | None, default: None ) –Platform API URL.
  • api_key (str | None, default: None ) –API key for authentication.
  • organization (str | UUID | None, default: None ) –Organization key/UUID override.
  • workspace (str | UUID | None, default: None ) –Workspace key/UUID override.
  • project (str | UUID | None, default: None ) –Project key/UUID override.
  • cache (Path | str | None, default: None ) –Local cache directory (default: ~/.dreadnode).
  • storage_provider (StorageProvider | None, default: None ) –Remote storage provider (s3, r2, minio). Auto-detected if not specified.
  • trace_backend (TraceBackend | None, default: None ) –Controls remote OTLP streaming.
  • console (ConsoleOptions | bool | None, default: None ) –Log span information to the console.
  • otel_scope (str, default: 'dreadnode' ) –The OpenTelemetry scope name.

Returns:

  • Dreadnode –Configured Dreadnode SDK instance.
continue_task(task_context: TaskContext) -> TaskSpan[t.Any]

Continue a task from captured context on a remote host.

Parameters:

  • task_context (TaskContext) –The TaskContext captured from get_task_context().

Returns:

  • TaskSpan[Any] –A TaskSpan object that can be used as a context manager.
evaluation(
func: Callable[..., Any] | None = None,
/,
*,
dataset: Any | None = None,
dataset_file: str | None = None,
name: str | None = None,
description: str = "",
tags: list[str] | None = None,
concurrency: int = 1,
iterations: int = 1,
max_errors: int | None = None,
max_consecutive_errors: int = 10,
dataset_input_mapping: list[str]
| dict[str, str]
| None = None,
parameters: dict[str, list[Any]] | None = None,
scorers: ScorersLike[Any] | None = None,
assert_scores: list[str] | Literal[True] | None = None,
) -> t.Any

Decorator to create an Evaluation from a function. See evaluation() for details.

get_current_run() -> TaskSpan[t.Any] | None

Get the current task span (backwards compatibility alias).

get_current_task() -> TaskSpan[t.Any] | None

Get the current task span.

get_task_context() -> TaskContext

Capture the current task context for transfer to another host, thread, or process.

Use continue_task() to continue the task anywhere else.

Returns:

  • TaskContext –TaskContext containing task state and trace propagation headers.

Raises:

  • RuntimeError –If called outside of an active task.
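
The capture/continue pattern can be illustrated with a minimal contextvars sketch. All names here are hypothetical; the real TaskContext also carries trace-propagation headers for distributed tracing:

```python
# Minimal sketch of capturing task state for transfer to another host,
# thread, or process, then restoring it there.
import contextvars

current_task_id = contextvars.ContextVar("current_task_id", default=None)

def get_task_context() -> dict:
    """Capture the active task's state; raises outside of a task."""
    task_id = current_task_id.get()
    if task_id is None:
        raise RuntimeError("no active task")
    return {"task_id": task_id}

def continue_task(ctx: dict) -> None:
    """Restore captured task state in the current context."""
    current_task_id.set(ctx["task_id"])
```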
get_tracer(*, is_span_tracer: bool = True) -> Tracer

Get an OpenTelemetry Tracer instance.

Parameters:

  • is_span_tracer (bool, default: True ) –Whether the tracer is for creating spans.

Returns:

  • Tracer –An OpenTelemetry Tracer.
link_objects(
origin: Any,
link: Any,
attributes: AnyDict | None = None,
) -> None

Associate two runtime objects with each other.

This is useful for linking any two objects which are related to each other, such as a model and its training data, or an input prompt and the resulting output.

Example

with dreadnode.run("my_run"):
model = SomeModel()
data = SomeData()
dreadnode.link_objects(model, data)

Parameters:

  • origin (Any) –The origin object to link from.
  • link (Any) –The linked object to link to.
  • attributes (AnyDict | None, default: None ) –Additional attributes to attach to the link.
list_agents(org: str | None = None) -> list[PackageInfo]

List agents in a workspace.

Parameters:

  • org (str | None, default: None ) –Organization key. Uses configured org if not provided.

Returns:

  • list[PackageInfo] –List of agent PackageInfo.
list_projects(
org: str | None = None, workspace: str | None = None
) -> list[Project]

List projects in a workspace.

Parameters:

  • org (str | None, default: None ) –Organization key. Uses configured org if not provided.
  • workspace (str | None, default: None ) –Workspace key. Uses configured workspace if not provided.

Returns:

  • list[Project] –List of projects.
list_registry(
project_type: PackageType, *, org: str | None = None
) -> list[PackageInfo]

List packages available in the registry.

Currently lists packages from local storage. Remote registry support will be added when the API endpoint is available.

Parameters:

  • project_type (PackageType) –Type of package to list (datasets, models, tools, agents, environments).
  • org (str | None, default: None ) –Organization to filter by.

Returns:

  • list[PackageInfo] –List of PackageInfo objects.
list_workspaces(org: str | None = None) -> list[Workspace]

List workspaces the user has access to.

Parameters:

  • org (str | None, default: None ) –Organization key. Uses configured org if not provided.

Returns:

  • list[Workspace] –List of workspaces.
load_capability(capability: str | Path) -> Capability

Load a capability from an explicit path or from the configured capability search paths.

Returns a high-level Capability object that exposes the serialized capability manifest plus resolved agents, tools, skills, and MCP server definitions.

Parameters:

  • capability (str | Path) –Capability directory path or capability name.

Returns:

  • Capability –Capability ready to attach to an agent or server runtime.

Raises:

  • FileNotFoundError –If no capability with the requested name can be found.
load_dataset(
path: str | Path,
config: str | None = None,
*,
dataset_name: str | None = None,
split: str | None = None,
format: Literal[
"parquet", "arrow", "feather"
] = "parquet",
version: str | None = None,
**kwargs: Any,
) -> t.Any

Load a dataset from HuggingFace Hub or a local dataset source directory.

Parameters:

  • path (str | Path) –HuggingFace dataset path (e.g., “squad”, “imdb”, “glue”) or a local directory containing dataset.yaml.
  • config (str | None, default: None ) –Dataset configuration name (e.g., “cola” for glue dataset).
  • dataset_name (str | None, default: None ) –Name to store the dataset as locally. Defaults to the path.
  • split (str | None, default: None ) –Dataset split to load (e.g., “train”, “test”, “train[:100]”).
  • format (Literal['parquet', 'arrow', 'feather'], default: 'parquet' ) –Storage format (parquet, arrow, feather).
  • version (str | None, default: None ) –Version string for the stored dataset.
  • **kwargs (Any, default: {} ) –Additional arguments passed to HuggingFace’s load_dataset.

Returns:

  • Any –LocalDataset instance with the loaded data.

Example

import dreadnode as dn
dn.configure(...)
ds = dn.load_dataset("glue", "cola", split="train[:100]")

load_model(
path: str | Path,
*,
model_name: str | None = None,
task: str | None = None,
format: Literal[
"safetensors", "pytorch"
] = "safetensors",
version: str | None = None,
**kwargs: Any,
) -> t.Any

Load a model from HuggingFace Hub or a local model source directory.

Parameters:

  • path (str | Path) –HuggingFace model path (e.g., “bert-base-uncased”, “gpt2”) or a local directory containing model.yaml.
  • model_name (str | None, default: None ) –Name to store the model as locally. Defaults to the path.
  • task (str | None, default: None ) –Task type for the model (e.g., “classification”, “generation”).
  • format (Literal['safetensors', 'pytorch'], default: 'safetensors' ) –Storage format (safetensors or pytorch).
  • version (str | None, default: None ) –Version string for the stored model.
  • **kwargs (Any, default: {} ) –Additional arguments passed to from_pretrained.

Returns:

  • Any –LocalModel instance with the loaded model.

Example

import dreadnode as dn
dn.configure(...)
model = dn.load_model("bert-base-uncased", task="classification")

load_package(
uri: str | Path | None = None,
type: PackageType | None = None,
) -> t.Any

Load a package (dataset, model, or agent) from the server.

Downloads and installs the package if not already installed, then loads it via entry points. Artifacts are fetched from CAS on demand.

Parameters:

  • uri (str | Path | None, default: None ) –Package URI (e.g., “dataset://org/name”, “model://org/name”).
  • type (PackageType | None, default: None ) –Package type hint if not specified in URI.

Returns:

  • Any –The loaded package object (Dataset, Model, or Agent).
log_artifact(
local_uri: str | Path, *, name: str | None = None
) -> None

Log a file or directory artifact to the current run.

This stores the artifact in the workspace CAS and uploads it to remote storage. Artifact metadata is recorded in artifacts.jsonl for tracking.

Examples:

Log a single file:

with dreadnode.run("my_run"):
# Save a file
with open("results.json", "w") as f:
json.dump(results, f)
# Log it as an artifact
dreadnode.log_artifact("results.json")

Log a directory:

with dreadnode.run("my_run"):
# Create a directory with model files
os.makedirs("model_output", exist_ok=True)
save_model("model_output/model.pkl")
save_config("model_output/config.yaml")
# Log the entire directory as an artifact
dreadnode.log_artifact("model_output")

Parameters:

  • local_uri (str | Path) –The local path to the file or directory to upload.
  • name (str | None, default: None ) –Optional name for the artifact (defaults to filename).
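
Content-addressed storage (CAS) means the artifact is stored under the digest of its bytes, so identical files deduplicate naturally. A minimal sketch of the idea, assuming a sha256 layout; the SDK's actual on-disk format is not documented here:

```python
# Sketch of content-addressed storage: a file is stored under the hex
# digest of its contents, so re-logging identical data is a no-op.
import hashlib
from pathlib import Path

def store_in_cas(path: Path, cas_root: Path) -> str:
    data = path.read_bytes()
    digest = hashlib.sha256(data).hexdigest()
    target = cas_root / digest[:2] / digest  # shard by digest prefix
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(data)
    return digest
```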
log_input(
name: str,
value: Any,
*,
label: str | None = None,
attributes: AnyDict | None = None,
) -> None

Log a single input to the current span.

Inputs can be any runtime object; they are serialized, stored, and tracked in the Dreadnode UI.

Parameters:

  • name (str) –The name of the input.
  • value (Any) –The input value to log.
  • label (str | None, default: None ) –Optional display label.
  • attributes (AnyDict | None, default: None ) –Optional additional attributes.

Example

@dreadnode.task
async def my_task(x: int) -> int:
dreadnode.log_input("input_name", x)
return x * 2
log_inputs(**inputs: Any) -> None

Log multiple inputs to the current span.

See log_input() for more details.

log_metric(
name: str,
value: float | bool,
*,
step: int = 0,
origin: Any | None = None,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
attributes: AnyDict | None = None,
) -> Metric
log_metric(
name: str,
value: Metric,
*,
origin: Any | None = None,
aggregation: MetricAggMode | None = None,
) -> Metric
log_metric(
name: str,
value: float | bool | Metric,
*,
step: int = 0,
origin: Any | None = None,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
attributes: AnyDict | None = None,
) -> Metric

Log a single metric to the current task or run.

Metrics are some measurement or recorded value related to the task or run. They can be used to track performance, resource usage, or other quantitative data.

Examples:

With a raw value:

with dreadnode.run("my_run"):
dreadnode.log_metric("accuracy", 0.95, step=10)
dreadnode.log_metric("loss", 0.05, step=10, aggregation="min")

With a Metric object:

with dreadnode.run("my_run"):
metric = Metric(0.95, step=10, timestamp=datetime.now(timezone.utc))
dreadnode.log_metric("accuracy", metric)

Parameters:

  • name (str) –The name of the metric.
  • value (float | bool | Metric) –The value of the metric, either as a raw float/bool or a Metric object.
  • step (int, default: 0 ) –The step of the metric.
  • origin (Any | None, default: None ) –The origin of the metric - any object which was logged as an input or output anywhere in the run.
  • timestamp (datetime | None, default: None ) –The timestamp of the metric - defaults to the current time.
  • aggregation (MetricAggMode | None, default: None ) –The aggregation to use for the metric. Helpful when you want to let the library take care of translating your raw values into better representations.
    • direct: do not modify the value at all (default)
    • min: the lowest observed value reported for this metric
    • max: the highest observed value reported for this metric
    • avg: the average of all reported values for this metric
    • sum: the cumulative sum of all reported values for this metric
    • count: increment every time this metric is logged - disregard value
  • attributes (AnyDict | None, default: None ) –A dictionary of additional attributes to attach to the metric.

Returns:

  • Metric –The logged metric object.
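
The aggregation modes above can be summarized with a pure-Python sketch. This is illustrative of the semantics only, not the SDK's implementation:

```python
# Given the previously reported raw values and a new one, return the
# value that each aggregation mode would store.
def aggregate(mode: str, history: list[float], value: float) -> float:
    values = history + [value]
    if mode == "direct":
        return value                      # keep the raw value as-is
    if mode == "min":
        return min(values)                # lowest observed value
    if mode == "max":
        return max(values)                # highest observed value
    if mode == "avg":
        return sum(values) / len(values)  # running average
    if mode == "sum":
        return sum(values)                # cumulative sum
    if mode == "count":
        return float(len(values))         # report count, value ignored
    raise ValueError(f"unknown aggregation mode: {mode}")
```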
log_metrics(
metrics: dict[str, float | bool],
*,
step: int = 0,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
attributes: AnyDict | None = None,
origin: Any | None = None,
) -> list[Metric]
log_metrics(
metrics: list[MetricDict],
*,
step: int = 0,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
attributes: AnyDict | None = None,
origin: Any | None = None,
) -> list[Metric]
log_metrics(
metrics: MetricsLike,
*,
step: int = 0,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
attributes: AnyDict | None = None,
origin: Any | None = None,
) -> list[Metric]

Log multiple metrics to the current task or run.

Examples:

Log metrics from a dictionary:

dreadnode.log_metrics(
{
"accuracy": 0.95,
"loss": 0.05,
"f1_score": 0.92
},
step=10
)

Log metrics from a list of MetricDicts:

dreadnode.log_metrics(
[
{"name": "accuracy", "value": 0.95},
{"name": "loss", "value": 0.05, "aggregation": "min"}
],
step=10
)

Parameters:

  • metrics (MetricsLike) –Either a dictionary of name/value pairs or a list of MetricDicts to log.
  • step (int, default: 0 ) –Default step value for metrics if not supplied.
  • timestamp (datetime | None, default: None ) –Default timestamp for metrics if not supplied.
  • aggregation (MetricAggMode | None, default: None ) –Default aggregation for metrics if not supplied.
  • attributes (AnyDict | None, default: None ) –Default attributes for metrics if not supplied.
  • origin (Any | None, default: None ) –The origin of the metrics - any object which was logged as an input or output anywhere in the run.

Returns:

  • list[Metric] –List of logged Metric objects.
log_output(
name: str,
value: Any,
*,
label: str | None = None,
attributes: AnyDict | None = None,
) -> None

Log a single output to the current span.

Outputs can be any runtime object; they are serialized, stored, and tracked in the Dreadnode UI.

Parameters:

  • name (str) –The name of the output.
  • value (Any) –The value of the output.
  • label (str | None, default: None ) –An optional label for the output, useful for filtering in the UI.
  • attributes (AnyDict | None, default: None ) –Additional attributes to attach to the output.

Example

@dreadnode.task
async def my_task(x: int) -> int:
result = x * 2
dreadnode.log_output("result", result)
return result
log_outputs(**outputs: Any) -> None

Log multiple outputs to the current span.

See log_output() for more details.

log_param(key: str, value: JsonValue) -> None

Log a single parameter to the current run.

Parameters are key-value pairs that are associated with the run and can be used to track configuration values, hyperparameters, or other metadata.

Example

with dreadnode.run("my_run"):
dreadnode.log_param("param_name", "param_value")

Parameters:

  • key (str) –The name of the parameter.
  • value (JsonValue) –The value of the parameter.
log_params(**params: JsonValue) -> None

Log multiple parameters to the current run.

Parameters are key-value pairs that are associated with the run and can be used to track configuration values, hyperparameters, or other metadata.

Example

with dreadnode.run("my_run"):
dreadnode.log_params(
param1="value1",
param2="value2"
)

Parameters:

  • **params (JsonValue, default: {} ) –The parameters to log. Each parameter is a key-value pair.
log_sample(
label: str,
input: Any,
output: Any,
metrics: MetricsLike | None = None,
*,
step: int = 0,
) -> None

Convenience method to log an input/output pair with metrics as an ephemeral task.

This is useful for logging a single sample of input and output data along with any metrics that were computed during the process.

log_samples(
name: str,
samples: list[
tuple[Any, Any] | tuple[Any, Any, MetricsLike]
],
) -> None

Log multiple input/output samples as ephemeral tasks.

This is useful for logging a batch of input/output pairs with metrics in a single run.

Example

dreadnode.log_samples(
"my_samples",
[
(input1, output1, {"accuracy": 0.95}),
(input2, output2, {"accuracy": 0.90}),
]
)

Parameters:

  • name (str) –The name of the task to create for each sample.
  • samples (list[tuple[Any, Any] | tuple[Any, Any, MetricsLike]]) –A list of tuples containing (input, output, metrics [optional]).
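
The two accepted tuple shapes can be normalized into a uniform triple, as in this sketch (normalize_sample is a hypothetical helper for illustration):

```python
# Normalize (input, output) or (input, output, metrics) tuples into a
# uniform (input, output, metrics) triple.
from typing import Any

def normalize_sample(sample: tuple) -> tuple[Any, Any, dict]:
    if len(sample) == 2:
        inp, out = sample
        return inp, out, {}
    if len(sample) == 3:
        inp, out, metrics = sample
        return inp, out, dict(metrics)
    raise ValueError("expected (input, output) or (input, output, metrics)")
```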
login(
server: str,
api_key: str,
organization: str | UUID,
*,
workspace: str | UUID | None = None,
project: str | UUID | None = None,
cache: Path | str | None = None,
set_default_workspace: bool = True,
set_default_project: bool = True,
) -> Organization

Login to a Dreadnode server and save credentials to profile.

Authenticates with the server, resolves the organization, and saves the profile to ~/.dreadnode/config.yaml for future use.

Parameters:

  • server (str) –The Dreadnode server URL.
  • api_key (str) –The Dreadnode API key.
  • organization (str | UUID) –Organization key or ID to login to.
  • workspace (str | UUID | None, default: None ) –Default workspace to use.
  • project (str | UUID | None, default: None ) –Default project to use.
  • cache (Path | str | None, default: None ) –Local cache directory (default: ~/.dreadnode).
  • set_default_workspace (bool, default: True ) –Save workspace as default in profile.
  • set_default_project (bool, default: True ) –Save project as default in profile.

Returns:

  • Organization –The resolved Organization.

Raises:

  • RuntimeError –If authentication fails or organization not found.
optimize_anything(
*,
evaluator: Callable[..., Any] | None = None,
seed_candidate: str | dict[str, str] | None = None,
dataset: list[Any] | None = None,
trainset: list[Any] | None = None,
valset: list[Any] | None = None,
objective: str | None = None,
background: str | None = None,
name: str | None = None,
description: str = "",
tags: list[str] | None = None,
config: OptimizationConfig | None = None,
backend: str | OptimizationBackend[Any] = "gepa",
adapter: OptimizationAdapter[Any] | None = None,
) -> t.Any

Create an optimize_anything executor. See optimize_anything() for details.

pull_package(
packages: list[str], *, upgrade: bool = False
) -> PullResult

Download packages from the registry.

Parameters:

  • packages (list[str]) –Package names to install.
  • upgrade (bool, default: False ) –Upgrade if already installed.

Returns:

  • PullResult –PullResult with status.
push_capability(
capability: str | Path,
*,
name: str | None = None,
skip_upload: bool = False,
force: bool = False,
publish: bool = False,
) -> CapabilityPushResult

Build and push a capability directory to the OCI registry.

Before pushing, compares the local build SHA-256 against the remote. If the version already exists with the same content, the push is skipped. If the version exists with different content, an error is raised unless force=True.

Parameters:

  • capability (str | Path) –Capability directory path or resolvable local capability name.
  • name (str | None, default: None ) –Optional OCI repository name override. Bare names are prefixed with the active organization when available.
  • skip_upload (bool, default: False ) –Skip uploading to remote and only validate/build locally.
  • force (bool, default: False ) –Push even if the version already exists with different content.
  • publish (bool, default: False ) –Ensure the capability is public after upload or skip.

Returns:

  • CapabilityPushResult –Push result with status and details.
push_dataset(
dataset: str | Path,
*,
name: str | None = None,
skip_upload: bool = False,
publish: bool = False,
) -> PushResult

Build and push a dataset source directory to the OCI registry.

push_environment(
environment: str | Path,
*,
name: str | None = None,
skip_upload: bool = False,
force: bool = False,
publish: bool = False,
) -> PushResult

Build and push an environment directory with task.yaml to the OCI registry.

Before pushing, compares the local build SHA-256 against the remote. If the task already exists with the same content, the push is skipped unless force=True.

Parameters:

  • environment (str | Path) –Task directory path containing task.yaml.
  • name (str | None, default: None ) –Optional OCI repository name override. Bare names are prefixed with the active organization when available.
  • skip_upload (bool, default: False ) –Skip uploading to remote and only build locally.
  • force (bool, default: False ) –Push even if the remote SHA matches.
  • publish (bool, default: False ) –Ensure the task is public after upload or skip.

Returns:

  • PushResult –Push result with success status and details.
push_model(
model: str | Path,
*,
name: str | None = None,
skip_upload: bool = False,
publish: bool = False,
) -> PushResult

Build and push a model source directory to the OCI registry.

push_package(
path: str | Path, *, skip_upload: bool = False
) -> PushResult

Build and push a local package to the Dreadnode OCI Registry.

Handles artifact upload to CAS (for datasets/models) and OCI image push automatically.

Parameters:

  • path (str | Path) –Path to a dataset, model, or environment package project.
  • skip_upload (bool, default: False ) –Skip uploading to remote (local only).

Returns:

  • PushResult –PushResult with status and details.
push_update() -> None

Push any pending run data to the server before run completion.

This is useful for ensuring that the UI is up to date with the latest data. Data is automatically pushed periodically, but you can call this method to force a push.

Example

with dreadnode.run("my_run"):
dreadnode.log_params(...)
dreadnode.log_metric(...)
dreadnode.push_update()
# do more work
run(
name: str | None = None,
*,
tags: Sequence[str] | None = None,
params: AnyDict | None = None,
project: str | None = None,
name_prefix: str | None = None,
attributes: AnyDict | None = None,
_tracer: Tracer | None = None,
) -> TaskSpan[t.Any]

Create a new top-level task span.

This sets up trace infrastructure and creates a task span that can contain agents, evaluations, studies, or other work.

Example

with dreadnode.run("my_experiment"):
# Run an agent, evaluation, or other work
await agent.run("do something")

Parameters:

  • name (str | None, default: None ) –The name of the task. If not provided, a random name will be generated.
  • tags (Sequence[str] | None, default: None ) –A list of tags to attach to the task.
  • params (AnyDict | None, default: None ) –A dictionary of parameters to attach to the task.
  • project (str | None, default: None ) –The project name to associate with. If not provided, the project passed to configure() will be used, or a default project will be used.
  • attributes (AnyDict | None, default: None ) –Additional attributes to attach to the span.

Returns:

  • TaskSpan[Any] –A TaskSpan object that can be used as a context manager.
scorer(
func: Callable[..., Any] | None = None,
*,
name: str | None = None,
assert_: bool = False,
attributes: AnyDict | None = None,
) -> t.Any

Create a scorer decorator. See scorer() for details.

serve(
host: str | None = None, port: int | None = None
) -> None

Start the agent server.

This starts a FastAPI server that provides REST + WebSocket endpoints for agent communication.

Parameters:

  • host (str | None, default: None ) –Host to bind to. Defaults to DREADNODE_RUNTIME_HOST (legacy: DREADNODE_SERVER_HOST) or 127.0.0.1.
  • port (int | None, default: None ) –Port to bind to. Defaults to DREADNODE_RUNTIME_PORT (legacy: DREADNODE_SERVER_PORT) or 8787.

Example

import dreadnode as dn
dn.configure()
dn.serve(port=8787)
set_capability_visibility(
org: str, name: str, *, is_public: bool
) -> None

Update capability visibility for all versions of a capability name.

set_dataset_visibility(
org: str, name: str, *, is_public: bool
) -> None

Update dataset visibility for all versions of a dataset name.

set_model_visibility(
org: str, name: str, *, is_public: bool
) -> None

Update model visibility for all versions of a model name.

set_task_visibility(
org: str, name: str, *, is_public: bool
) -> None

Update task visibility for all versions of a task name.

shutdown() -> None

Shut down any associated OpenTelemetry components and flush any pending spans.

It is not required to call this method, as the SDK will automatically flush and shutdown when the process exits.

However, if you want to ensure that all spans are flushed before exiting, you can call this method manually.

span(
name: str,
*,
tags: Sequence[str] | None = None,
attributes: AnyDict | None = None,
) -> Span

Create a new OpenTelemetry span.

Spans are more lightweight than tasks, but still let you track work being performed and view it in the UI. You cannot log parameters, inputs, or outputs to spans.

Example

with dreadnode.span("my_span") as span:
# do some work here
pass

Parameters:

  • name (str) –The name of the span.
  • tags (Sequence[str] | None, default: None ) –A list of tags to attach to the span.
  • attributes (AnyDict | None, default: None ) –A dictionary of attributes to attach to the span.

Returns:

  • Span –A Span object.
study(
func: Callable[..., Any] | None = None,
/,
*,
name: str | None = None,
search_strategy: Any | None = None,
dataset: Any | None = None,
dataset_file: str | None = None,
objectives: ScorersLike[Any] | None = None,
directions: list[Direction] | None = None,
constraints: ScorersLike[Any] | None = None,
max_trials: int = 100,
concurrency: int = 1,
stop_conditions: list[Any] | None = None,
) -> t.Any

Decorator to create a Study from a task factory. See study() for details.

sync_capabilities(
directory: str | Path,
*,
force: bool = False,
publish: bool = False,
on_progress: Callable[[str, str, str | None], None]
| None = None,
) -> CapabilitySyncResult

Sync capabilities from a directory to the platform.

Discovers all capabilities (directories containing capability.yaml), compares each against the latest remote version by SHA-256, and pushes only those that have changed. Optionally publishes them to the public catalog.

To push a single capability, use push_capability() instead.

Parameters:

  • directory (str | Path) –Root directory containing capability subdirectories.
  • force (bool, default: False ) –Upload even when the remote SHA matches.
  • publish (bool, default: False ) –Ensure is_public=True after upload or skip.

Returns:

  • CapabilitySyncResult –CapabilitySyncResult with uploaded/skipped/failed details.
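
The discovery step can be sketched with pathlib (illustrative only; discover_capabilities is a hypothetical helper):

```python
# Find every subdirectory under a root that contains a capability.yaml
# manifest -- the discovery step of a sync.
from pathlib import Path

def discover_capabilities(root: Path) -> list[Path]:
    return sorted(p.parent for p in root.rglob("capability.yaml"))
```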
sync_environments(
directory: str | Path,
*,
force: bool = False,
publish: bool = False,
max_workers: int = 8,
on_progress: Callable[[str, str, str | None], None]
| None = None,
on_status: Callable[[str], None] | None = None,
) -> EnvironmentSyncResult

Sync task environments from a directory to the platform.

Discovers all subdirectories containing task.yaml, compares each against the exact remote version by OCI layer SHA-256, and pushes only those that have changed.

Parameters:

  • directory (str | Path) –Root directory containing task subdirectories.
  • force (bool, default: False ) –Upload even when the remote SHA matches.
  • publish (bool, default: False ) –Ensure is_public=True after upload or skip.
  • max_workers (int, default: 8 ) –Maximum parallel build/upload threads.
  • on_progress (Callable[[str, str, str | None], None] | None, default: None ) –Optional callback (name, status, error) for each task.

Returns:

  • EnvironmentSyncResult –EnvironmentSyncResult with uploaded/skipped/failed details.
tag(*tag: str) -> None

Add one or many tags to the current span.

Example

with dreadnode.run("my_run"):
dreadnode.tag("my_tag")

Parameters:

  • tag (str, default: () ) –The tag(s) to attach.
task(
func: Callable[P, Awaitable[R]]
| Callable[P, R]
| None = None,
/,
*,
scorers: ScorersLike[Any] | None = None,
name: str | None = None,
label: str | None = None,
log_inputs: Sequence[str]
| bool
| Inherited = INHERITED,
log_output: bool | Inherited = INHERITED,
log_execution_metrics: bool = False,
tags: Sequence[str] | None = None,
attributes: AnyDict | None = None,
entrypoint: bool = False,
) -> TaskDecorator | ScoredTaskDecorator[R] | Task[P, R]

Create a new task from a function. See task() for details.

task_and_run(
name: str,
*,
task_name: str | None = None,
task_type: SpanType = "task",
project: str | None = None,
tags: Sequence[str] | None = None,
params: AnyDict | None = None,
inputs: AnyDict | None = None,
label: str | None = None,
_tracer: Tracer | None = None,
) -> t.Iterator[TaskSpan[t.Any]]

Create a task span, setting up trace infrastructure if needed.

If no trace context exists, this sets up exporters and creates the span as a top-level span. The span type (evaluation, study, agent, etc.) becomes the root of the trace.

Parameters:

  • name (str) –Name for the task span.
  • task_name (str | None, default: None ) –Optional separate name for the task span. If not provided, uses name.
  • task_type (SpanType, default: 'task' ) –The type of span to create (task, evaluation, study, agent, etc.).
  • project (str | None, default: None ) –Project for trace storage.
  • tags (Sequence[str] | None, default: None ) –Tags to attach to the span.
  • params (AnyDict | None, default: None ) –Parameters to log.
  • inputs (AnyDict | None, default: None ) –Inputs to log.
  • label (str | None, default: None ) –Display label for the span.
task_env(
task_ref: str,
*,
inputs: dict[str, Any] | None = None,
secret_ids: list[str] | None = None,
project_id: str | None = None,
timeout_sec: int | None = None,
) -> TaskEnvironment

Construct a TaskEnvironment bound to this profile’s org/workspace.

The environment is not provisioned until setup() (or async with) is called. Pulls api_client/organization/workspace from the active profile.

Example

import dreadnode as dn

async with dn.task_env("acme/[email protected]", inputs={"host": "x"}) as env:
    await env.execute("curl -sS $web_url/login")
task_span(
name: str,
*,
type: SpanType = "task",
label: str | None = None,
tags: Sequence[str] | None = None,
attributes: AnyDict | None = None,
_tracer: Tracer | None = None,
) -> TaskSpan[t.Any]

Create a task span without an explicit associated function.

This is useful for creating tasks on the fly without having to define a function.

Example

async with dreadnode.task_span("my_task") as task:
    # do some work here
    pass

Parameters:

  • name (str) –The name of the task.
  • type (SpanType, default: 'task' ) –The type of span (task, evaluation, etc.).
  • label (str | None, default: None ) –The label of the task - useful for filtering in the UI.
  • tags (Sequence[str] | None, default: None ) –A list of tags to attach to the task span.
  • attributes (AnyDict | None, default: None ) –A dictionary of attributes to attach to the task span.

Returns:

  • TaskSpan[Any] –A TaskSpan object.
train(
config: str | Path | dict[str, Any],
*,
prompts: list[str] | None = None,
reward_fn: Callable[[list[str], list[str]], list[float]]
| None = None,
scorers: ScorersLike[Any] | None = None,
) -> t.Any

Train a model using a YAML configuration file.

This is the main entry point for training LLMs with GRPO, SFT, DPO, PPO, or other training methods supported by the Ray training framework.

Example YAML config (grpo.yaml):

```yaml
trainer: grpo
model_name: Qwen/Qwen2.5-1.5B-Instruct
max_steps: 100
num_prompts_per_step: 4
num_generations_per_prompt: 4
learning_rate: 1e-6
temperature: 0.7

# Dataset - supports dreadnode datasets, huggingface, jsonl, or inline
dataset:
  type: dreadnode  # or huggingface, jsonl, list
  name: my-dataset  # dreadnode dataset name
  prompt_field: question

# Reward - supports dreadnode scorers or built-in types
reward:
  type: scorer  # Use dreadnode scorer
  # or type: correctness, length, contains
```

Usage

```python
import dreadnode as dn

# Train from YAML config
result = dn.train("config/grpo.yaml")

# Train with dreadnode dataset and scorers
@dn.scorer
def correctness(completion: str) -> float:
    return 1.0 if "answer" in completion else 0.0

result = dn.train(
    {"trainer": "grpo", "model_name": "..."},
    prompts=dn.load("my-dataset").to_prompts("question"),
    scorers=[correctness],
)

# Train with custom prompts and reward function
result = dn.train(
    "config/grpo.yaml",
    prompts=["What is 2+2?", "What is 3*4?"],
    reward_fn=my_reward_fn,
)
```

Parameters:

  • config (str | Path | dict[str, Any]) –Path to YAML config file, or dict with config values.
  • prompts (list[str] | None, default: None ) –Optional list of prompts (overrides dataset in config).
  • reward_fn (Callable[[list[str], list[str]], list[float]] | None, default: None ) –Optional reward function (overrides reward/scorers).
  • scorers (ScorersLike[Any] | None, default: None ) –Optional dreadnode Scorers to use as reward (converted to reward_fn).

Returns:

  • Any –Training result (trainer-specific).

Adapter that evaluates agent instruction candidates with Evaluation.

apply_candidate(candidate: dict[str, str]) -> Agent

Clone the agent and apply an instruction-only candidate.

evaluate(
batch: list[dict[str, Any]],
candidate: dict[str, str],
*,
capture_traces: bool = False,
) -> OptimizationEvaluationBatch

Evaluate one batch of examples and return per-example scores.

evaluate_candidate(
candidate: dict[str, str],
example: dict[str, Any] | None = None,
) -> OptimizationEvaluation

Evaluate one candidate in a GEPA-compatible (score, side_info) shape.

make_reflective_dataset(
candidate: dict[str, str],
eval_batch: OptimizationEvaluationBatch,
components_to_update: list[str],
) -> dict[str, list[dict[str, t.Any]]]

Build component-scoped reflective data for GEPA.

seed_candidate() -> dict[str, str]

Return the current instruction candidate for this agent.

EnvVar(
name: str,
*,
default: Any | Unset = UNSET,
required: bool = True,
)

A Context marker for an environment variable.

Evaluation of a task against a dataset.

Attributes:

  • task (Task[..., Out] | str) –The task to evaluate.
  • dataset (Any | None) –The dataset to use for the evaluation.
  • dataset_file (FilePath | str | None) –File path of a JSONL, CSV, JSON, or YAML dataset.
  • name (str) –The name of the evaluation.
  • dataset_input_mapping (list[str] | dict[str, str] | None) –Mapping from dataset keys to task parameter names.
  • preprocessor (InputDatasetProcessor | None) –Optional preprocessor for the dataset.
  • scorers (ScorersLike[Out]) –Scorers to evaluate task output.
  • assert_scores (list[str] | Literal[True]) –Scores to assert are truthy.
  • trace (bool) –Whether to produce trace contexts.
max_consecutive_errors: int | None = Config(default=10)

Maximum consecutive errors before stopping the evaluation.

max_errors: int | None = Config(default=None)

Maximum total errors before stopping the evaluation.

console() -> EvalResult[In, Out]

Run the evaluation with a live display in the console.

with_(
*,
name: str | None = None,
description: str | None = None,
tags: list[str] | None = None,
label: str | None = None,
task: Task[..., Out] | str | None = None,
dataset: Any | None = None,
concurrency: int | None = None,
iterations: int | None = None,
max_errors: int | None = None,
max_consecutive_errors: int | None = None,
parameters: dict[str, list[Any]] | None = None,
scorers: ScorersLike[Out] | None = None,
assert_scores: list[str] | Literal[True] | None = None,
append: bool = False,
) -> te.Self

Create a modified clone of the evaluation.

Image(
data: ImageDataOrPathType,
mode: str | None = None,
caption: str | None = None,
format: str | None = None,
)

Image media type for Dreadnode logging.

This class maintains a high-fidelity float32 numpy array as the canonical representation, ensuring no precision loss during use in transforms, scorers, and optimization routines.

Initialize an Image object.

Parameters:

  • data (ImageDataOrPathType) –The image data, which can be:
    • A file path (str or Path)
    • A base64-encoded string (starting with “data:image/”)
    • Raw bytes of an image file
    • A numpy array (HWC or HW format)
    • A Pillow Image object
  • mode (str | None, default: None ) –Optional mode for the image (RGB, L, etc.)
  • caption (str | None, default: None ) –Optional caption for the image
  • format (str | None, default: None ) –Optional format to use when saving (png, jpg, etc.)
canonical_array: ndarray[Any, dtype[float32]]

Get the canonical high-fidelity representation.

Returns:

  • ndarray[Any, dtype[float32]] –float32 numpy array in [0,1] range, HWC format
mode: str

Get the image mode (L, RGB, RGBA, etc.).

shape: tuple[int, ...]

Get the shape of the canonical array.

resize(
height: int, width: int, *, resample: int | None = None
) -> Image

Resize the image to the specified size.

Parameters:

  • height (int) –The desired height of the image.
  • width (int) –The desired width of the image.
  • resample (int | None, default: None ) –Resampling filter to use (see PIL.Image for options).

Returns:

  • Image –New Image object with resized image
show() -> None

Displays the image using the default image viewer.

to_base64() -> str

Returns the image as a base64 encoded string.

to_numpy(
dtype: Any = np.float32,
) -> np.ndarray[t.Any, t.Any]

Returns the image as a NumPy array with specified dtype.

Parameters:

  • dtype (Any, default: float32 ) –Target dtype. Common options:
    • np.float32/np.float64: Values in [0.0, 1.0] (recommended)
    • np.uint8: Values in [0, 255]

Returns:

  • ndarray[Any, Any] –NumPy array in HWC format (or HW for grayscale)
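The mapping between the canonical [0.0, 1.0] float range and uint8 [0, 255] pixel values can be illustrated in plain Python (a sketch of the value-range conversion only; the SDK operates on NumPy arrays, and these helper names are hypothetical):

```python
def float_to_uint8(values: list[float]) -> list[int]:
    # Map canonical [0.0, 1.0] floats to [0, 255] pixel values, clamping out-of-range inputs.
    return [max(0, min(255, round(v * 255))) for v in values]

def uint8_to_float(values: list[int]) -> list[float]:
    # Inverse mapping back to the canonical [0.0, 1.0] range.
    return [v / 255 for v in values]

print(float_to_uint8([0.0, 0.5, 1.0]))  # [0, 128, 255]
```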
to_pil() -> PILImage

Returns the image as a Pillow Image object.

to_serializable() -> tuple[bytes, dict[str, t.Any]]

Convert the image to bytes and return with metadata.

Returns:

  • tuple[bytes, dict[str, Any]] –Tuple of (image_bytes, metadata_dict)
Markdown(text: str)

Hint type for markdown-formatted text.

This is a subclass of Text with format set to “markdown”.

Example

log_output("report", Markdown("..."))

Any reported value regarding the state of a run, task, and optionally an object (input/output).

Attributes:

  • value (float) –The value of the metric, e.g. 0.5, 1.0, 2.0, etc.
  • step (int) –A step value indicating when this metric was reported.
  • timestamp (datetime) –The timestamp when the metric was reported.
  • attributes (JsonDict) –A dictionary of attributes to attach to the metric.
apply_aggregation(
agg: MetricAggMode, others: list[Metric]
) -> Metric

Apply an aggregation mode to the metric. This will modify the metric in place.

Parameters:

  • agg (MetricAggMode) –The aggregation to apply. One of “sum”, “min”, “max”, or “count”.
  • others (list[Metric]) –A list of other metrics to apply the aggregation to.

Returns:

  • Metric –self
from_many(
values: Sequence[tuple[str, float, float]],
step: int = 0,
**attributes: JsonValue,
) -> Metric

Create a composite metric from individual values and weights.

This is useful for creating a metric that is the weighted average of multiple values. The values should be a sequence of tuples, where each tuple contains the name of the metric, the value of the metric, and the weight of the metric.

The individual values will be reported in the attributes of the metric.

Parameters:

  • values (Sequence[tuple[str, float, float]]) –A sequence of tuples containing the name, value, and weight of each metric.
  • step (int, default: 0 ) –The step value to attach to the metric.
  • **attributes (JsonValue, default: {} ) –Additional attributes to attach to the metric.

Returns:

  • Metric –A composite Metric
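The composite value described above is presumably a weighted average over the (name, value, weight) tuples. A minimal sketch of that arithmetic (an assumption about the combination rule, not SDK code):

```python
def composite_value(values: list[tuple[str, float, float]]) -> float:
    # Weighted average over (name, value, weight) tuples.
    total_weight = sum(w for _, _, w in values)
    if total_weight == 0:
        return 0.0
    return sum(v * w for _, v, w in values) / total_weight

# accuracy counts twice as much as speed: (1.0*2 + 0.5*1) / 3
print(composite_value([("accuracy", 1.0, 2.0), ("speed", 0.5, 1.0)]))  # ~0.833
```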

A series of metric values with aggregation computed on read.

This replaces dict[str, list[Metric]] for metric storage. Raw values are always preserved, and any aggregation can be computed at query time.

Attributes:

  • values (list[float]) –The raw metric values in order of logging.
  • steps (list[int | None]) –Optional step indices for each value.
  • timestamps (list[datetime]) –Timestamps for each value.
value: float | None

Convenience property for single-value series (same as last).

append(
value: float,
step: int | None = None,
timestamp: datetime | None = None,
) -> None

Append a value to the series.

at_step(step: int) -> float | None

Get the value at a specific step.

count() -> int

Get the number of values.

first() -> float | None

Get the first value in the series.

last() -> float | None

Get the last value in the series.

max() -> float | None

Get the maximum value.

mean() -> float | None

Compute the mean of all values.

min() -> float | None

Get the minimum value.

sum() -> float

Get the sum of all values.

to_metric(aggregation: MetricAggMode = 'avg') -> Metric

Convert to a single Metric using the specified aggregation.

values_at_steps(steps: Sequence[int]) -> list[float | None]

Get values at multiple steps.
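The aggregation-on-read design above can be illustrated with a minimal stand-in: raw values are stored untouched, and every aggregation is derived at query time. This sketch is not the SDK class; names and fields are assumptions based on the attributes listed above.

```python
from dataclasses import dataclass, field

@dataclass
class SeriesSketch:
    values: list[float] = field(default_factory=list)
    steps: list[int | None] = field(default_factory=list)

    def append(self, value: float, step: int | None = None) -> None:
        # Raw values are always preserved in logging order.
        self.values.append(value)
        self.steps.append(step)

    def last(self) -> float | None:
        return self.values[-1] if self.values else None

    def mean(self) -> float | None:
        # Computed on read -- nothing is pre-aggregated.
        return sum(self.values) / len(self.values) if self.values else None

    def at_step(self, step: int) -> float | None:
        for s, v in zip(self.steps, self.values):
            if s == step:
                return v
        return None

s = SeriesSketch()
s.append(0.2, step=0)
s.append(0.8, step=1)
print(s.mean(), s.last(), s.at_step(0))  # 0.5 0.8 0.2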

Object3D(
data: Object3DDataType,
caption: str | None = None,
format: str | None = None,
)

3D object media type for Dreadnode logging.

Supports:

  • Local file paths to 3D models (.obj, .glb, .gltf, etc.)
  • Raw bytes with metadata

Initialize a 3D Object.

Parameters:

  • data (Object3DDataType) –The 3D object data, which can be:
    • A path to a local 3D model file (str or Path)
    • Raw bytes of a 3D model file
  • caption (str | None, default: None ) –Optional caption for the 3D object
  • format (str | None, default: None ) –Optional format override (obj, glb, etc.)
to_serializable() -> tuple[bytes, dict[str, t.Any]]

Convert the 3D object to bytes and return with metadata.

Returns:

  • tuple[bytes, dict[str, Any]] –A tuple of (object_bytes, metadata_dict)

Dreadnode-native optimize_anything executor.

effective_dataset: list[Any] | None

Return the trainset if provided, otherwise dataset.

optimization_id: UUID

Stable identifier for this optimization run.

console() -> OptimizationResult[CandidateT]

Run the optimization with a live console adapter.

Top-level configuration for Dreadnode optimize_anything runs.

OptimizationResult(
backend: str,
seed_candidate: CandidateT | None = None,
best_candidate: CandidateT | None = None,
best_score: float | None = None,
best_scores: dict[str, float] = dict(),
objective: str | None = None,
train_size: int = 0,
val_size: int = 0,
pareto_frontier: list[CandidateT] = list(),
history: list[Any] = list(),
metadata: dict[str, Any] = dict(),
raw_result: Any = None,
)

Result of a Dreadnode optimize_anything run.

frontier_size: int

Return the number of candidates currently on the Pareto frontier.

to_dict() -> dict[str, t.Any]

Return a JSON-serializable result dictionary.

ParentTask(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the parent of the current task span from the current context.

Scorer(
func: ScorerCallable[T],
*,
name: str | None = None,
assert_: bool = False,
attributes: JsonDict | None = None,
catch: bool = False,
step: int = 0,
auto_increment_step: bool = False,
log_all: bool = True,
bound_obj: Any | Unset = UNSET,
config: dict[str, ConfigInfo] | None = None,
context: dict[str, Context] | None = None,
wraps: Callable[..., Any] | None = None,
)

A stateful, configurable, and composable wrapper for a scoring function.

A Scorer is a specialized Component that evaluates an object and produces a Metric. It inherits the configuration and context-awareness of a Component, allowing scorers to be defined with dn.Config and dn.Context parameters.

Attributes:

  • name –The name of the scorer.
  • attributes –A dictionary of attributes to attach to each generated metric.
  • catch –Whether to catch exceptions during scoring and log a warning instead.
  • step –An optional step value to attach to generated metrics.
  • auto_increment_step –Whether to automatically increment the step after each scoring.
  • log_all –Whether to log all sub-metrics from nested compositions.
  • bound_obj –An optional object to bind the scorer to, overriding the caller-provided object.

Examples:

@dn.scorer(name="length_scorer", catch=True)
async def length_scorer(text: str) -> float:
    return len(text) / 100.0  # Normalize length to [0.0, 1.0]

above(
threshold: float, *, name: str | None = None
) -> ScoringCondition[T]

Create a ScoringCondition that passes if score > threshold.

The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.

Parameters:

  • threshold (float) –The value the score must exceed.
  • name (str | None, default: None ) –Optional name for the condition.

Returns:

  • ScoringCondition[T] –A ScoringCondition that passes if score > threshold.

Examples:

@hook(GenerationStep, when=[quality.above(0.5)])
async def high_quality_only(event):
    # event.metrics["quality"] is available
    ...
as_condition(
*, name: str | None = None
) -> ScoringCondition[T]

Create a ScoringCondition that always passes but attaches the metric.

Use this when you want to record the score without gating. The metric will be attached to the event for logging/telemetry.

Parameters:

  • name (str | None, default: None ) –Optional name for the condition.

Returns:

  • ScoringCondition[T] –A ScoringCondition that always passes.

Examples:

@hook(GenerationStep, when=[
    quality.above(0.5),     # Gates on quality
    safety.as_condition(),  # Just records safety metric
])
async def observe(event):
    # Both metrics available: event.metrics["quality"], event.metrics["safety"]
    ...
as_scorer(
func: Callable[[OuterT], T], *, name: str | None = None
) -> Scorer[OuterT]

Adapts a scorer to operate on another type.

A wrapper that allows a generic scorer (e.g., one that refines a string) to be used with a complex candidate object (e.g., a Pydantic model containing that string).

Parameters:

  • func (Callable[[OuterT], T]) –A function to convert from some outer type to the scorer’s expected type.
  • name (str | None, default: None ) –An optional new name for the adapted scorer.

Returns:

  • Scorer[OuterT] –A new Scorer instance that operates on the OuterT.
assert_off() -> Scorer[T]

Mark this scorer as not an assertion.

assert_on() -> Scorer[T]

Mark this scorer as an assertion (must be truthy).

at_least(
threshold: float, *, name: str | None = None
) -> ScoringCondition[T]

Create a ScoringCondition that passes if score >= threshold.

The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.

Parameters:

  • threshold (float) –The minimum acceptable value.
  • name (str | None, default: None ) –Optional name for the condition.

Returns:

  • ScoringCondition[T] –A ScoringCondition that passes if score >= threshold.

Examples:

@hook(GenerationStep, when=[confidence.at_least(0.8)])
async def confident_only(event):
    ...
at_most(
threshold: float, *, name: str | None = None
) -> ScoringCondition[T]

Create a ScoringCondition that passes if score <= threshold.

The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.

Parameters:

  • threshold (float) –The maximum acceptable value.
  • name (str | None, default: None ) –Optional name for the condition.

Returns:

  • ScoringCondition[T] –A ScoringCondition that passes if score <= threshold.

Examples:

@hook(GenerationStep, when=[toxicity.at_most(0.1)])
async def non_toxic_only(event):
    ...
below(
threshold: float, *, name: str | None = None
) -> ScoringCondition[T]

Create a ScoringCondition that passes if score < threshold.

The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.

Parameters:

  • threshold (float) –The value the score must be below.
  • name (str | None, default: None ) –Optional name for the condition.

Returns:

  • ScoringCondition[T] –A ScoringCondition that passes if score < threshold.

Examples:

@hook(GenerationStep, when=[quality.below(0.5)])
async def retry_low_quality(event) -> Reaction:
    return RetryWithFeedback(f"Quality {event.metrics['quality'].value} too low")
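The above/at_least/at_most/below family all share one shape: run the scorer, record the score, and gate on a comparison against the threshold. A minimal sketch of that pattern (illustrative only; `make_condition` and `quality` are hypothetical, not SDK API):

```python
import operator

def make_condition(score_fn, op, threshold: float):
    # Build a predicate that scores an object and gates on a threshold.
    def condition(obj) -> tuple[bool, float]:
        score = score_fn(obj)
        return op(score, threshold), score  # (passes, recorded metric value)
    return condition

def quality(text: str) -> float:
    return min(len(text) / 10, 1.0)

above_half = make_condition(quality, operator.gt, 0.5)   # like quality.above(0.5)
at_most_low = make_condition(quality, operator.le, 0.1)  # like quality.at_most(0.1)

print(above_half("hello world"))  # (True, 1.0)
```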
bind(obj: Any) -> Scorer[t.Any]

Bind the scorer to a specific object. Any time the scorer is executed, the bound object will be passed instead of the caller-provided object.

This is useful for building scoring patterns that are not directly tied to the output of a task.

Examples:

@dn.task(scorers=[
    dn.scorers.image_distance(reference).bind(dn.TaskInput("image"))
])
async def classify(image: dn.Image) -> str:
    ...

Parameters:

  • obj (Any) –The object to bind the scorer to.

Returns:

  • Scorer[Any] –A new Scorer bound to the specified object.
clone() -> Scorer[T]

Clone the scorer.

evaluate(
obj: T,
scorers: ScorersLike[T],
*,
step: int | None = None,
assert_scores: Literal[True, False]
| list[str]
| None = None,
) -> dict[str, list[Metric]]

Run multiple scorers against an object and collect metrics.

Parameters:

  • obj (T) –The object to score.
  • scorers (ScorersLike[T]) –A list of scorers to use.
  • step (int | None, default: None ) –An optional step value to attach to all generated metrics.
  • assert_scores (Literal[True, False] | list[str] | None, default: None ) –Controls assertion behavior:
    • None (default): Use each scorer’s assert_ field
    • True: Assert ALL scorers must be truthy
    • False: Disable all assertions
    • list[str]: Assert only these scorer names (overrides scorer.assert_)

Returns:

  • dict[str, list[Metric]] –A dictionary mapping scorer names to their generated metrics.

Raises:

  • AssertionFailedError –If any asserted scores have falsy values.
fit(scorer: ScorerLike[T]) -> Scorer[T]

Fit a scorer to the given attributes.

Parameters:

  • scorer (ScorerLike[T]) –The scorer to fit.

Returns:

  • Scorer[T] –A Scorer instance.
fit_many(scorers: ScorersLike[T] | None) -> list[Scorer[T]]

Convert a collection of scorer-like objects into a list of Scorer instances.

This method provides a flexible way to handle different input formats for scorers, automatically converting callables to Scorer objects and applying consistent naming and attributes across all scorers.

Parameters:

  • scorers (ScorersLike[T] | None) –A collection of scorer-like objects. Can be:
    • A dictionary mapping names to scorer objects or callables
    • A sequence of scorer objects or callables
    • None (returns empty list)

Returns:

  • list[Scorer[T]] –A list of Scorer instances with consistent configuration.
normalize_and_score(
obj: T, *args: Any, **kwargs: Any
) -> list[Metric]

Executes the scorer and returns all generated metrics, including from nested compositions.

Parameters:

  • obj (T) –The object to score.

Returns:

  • list[Metric] –All metrics generated by the scorer.
on(
event_type: type[AgentEventT],
*,
adapter: Callable[[AgentEventT], Any] | None = None,
**kwargs: Any,
) -> ScorerHook[AgentEventT]

Create a ScorerHook that runs this scorer on agent events.

Deprecated: Use @hook(EventType, when=[scorer.above(threshold)]) instead, or use .above(), .below(), .as_condition() for scoring conditions.

This enables per-step scoring during agent execution, even outside of an Evaluation context.

Parameters:

  • event_type (type[AgentEventT]) –The event type to trigger on (e.g., GenerationStep, ToolStep).
  • adapter (Callable[[AgentEventT], Any] | None, default: None ) –Optional function to extract the object to score from the event.
  • **kwargs (Any, default: {} ) –Additional arguments passed to ScorerHook.

Returns:

  • ScorerHook[AgentEventT] –A ScorerHook configured to run this scorer on matching events.

Examples:

@dn.scorer
async def quality(text: str) -> float:
    return await check_quality(text)

# Score generation outputs
hook = quality.on(
    GenerationStep,
    adapter=lambda e: e.messages[0].content if e.messages else "",
)

# Use with threshold reactions
hook = quality.on(GenerationStep, adapter=...).retry_if_below(0.5)

# Add to agent
agent = Agent(
    ...,
    scorers=[hook],
)
rename(new_name: str) -> Scorer[T]

Rename the scorer.

Parameters:

  • new_name (str) –The new name for the scorer.

Returns:

  • Scorer[T] –A new Scorer with the updated name.
score(obj: T, *args: Any, **kwargs: Any) -> Metric

Execute the scorer and return the metric. If the scorer is a composition of other scorers, it will return the “highest-priority” metric, typically the first in the list.

Any output value will be converted to a Metric object if not already one.

Parameters:

  • obj (T) –The object to score.

Returns:

  • Metric –A Metric object.
score_composite(
obj: T, *args: Any, **kwargs: Any
) -> tuple[Metric, list[Metric]]

Executes the scorer and returns both the primary Metric and a list of any additional metrics from nested compositions.

Parameters:

  • obj (T) –The object to score.

Returns:

  • tuple[Metric, list[Metric]] –A tuple of the primary Metric and a list of all metrics generated.
with_(
*,
name: str | None = None,
assert_: bool | None = None,
attributes: JsonDict | None = None,
step: int | None = None,
auto_increment_step: bool | None = None,
catch: bool | None = None,
log_all: bool | None = None,
) -> Scorer[T]

Create a new Scorer with updated properties.

Parameters:

  • name (str | None, default: None ) –New name for the scorer.
  • attributes (JsonDict | None, default: None ) –New attributes for the scorer.
  • step (int | None, default: None ) –New step value for the scorer.
  • auto_increment_step (bool | None, default: None ) –Automatically increment the step for each time this scorer is called.
  • catch (bool | None, default: None ) –Catch exceptions in the scorer function.
  • log_all (bool | None, default: None ) –Log all sub-metrics from nested composition.

Returns:

  • Scorer[T] –A new Scorer with the updated properties
Span(
name: str,
tracer: Tracer,
*,
attributes: AnyDict | None = None,
label: str | None = None,
type: SpanType = "span",
tags: Sequence[str] | None = None,
)
active: bool

Check if the span is currently active (recording).

duration: float

Get the duration of the span in seconds.

exception: BaseException | None

Get the exception recorded in the span, if any.

failed: bool

Check if the span has failed.

is_recording: bool

Check if the span is currently recording.

label: str

Get the label of the span.

Table(
data: TableDataType,
caption: str | None = None,
format: str | None = None,
*,
index: bool = False,
)

Table data type for Dreadnode logging.

Supports:

  • Pandas DataFrames
  • CSV/Parquet/JSON files
  • Dict or list data structures
  • NumPy arrays

Initialize a Table object.

Parameters:

  • data (TableDataType) –The table data, which can be:
    • A pandas DataFrame
    • A path to a CSV/JSON/Parquet file
    • A dict or list of dicts
    • A NumPy array
  • caption (str | None, default: None ) –Optional caption for the table
  • format (str | None, default: None ) –Optional format to use when saving (csv, parquet, json)
  • index (bool, default: False ) –Include index in the output
to_serializable() -> tuple[bytes, dict[str, t.Any]]

Convert the table to bytes and return with metadata.

Returns:

  • tuple[bytes, dict[str, Any]] –A tuple of (table_bytes, metadata_dict)
Task(
func: Callable[P, R],
tracer: Tracer,
*,
name: str | None = None,
label: str | None = None,
scorers: ScorersLike[R] | None = None,
assert_scores: list[str] | Literal[True] | None = None,
log_inputs: Sequence[str]
| bool
| Inherited = INHERITED,
log_output: bool | Inherited = INHERITED,
log_execution_metrics: bool = False,
tags: Sequence[str] | None = None,
attributes: AnyDict | None = None,
entrypoint: bool = False,
config: dict[str, ConfigInfo] | None = None,
context: dict[str, Context] | None = None,
)

Structured task wrapper for a function that can be executed within a run.

Tasks allow you to associate metadata, inputs, outputs, and metrics for a unit of work.

Parameters:

  • func (Callable[P, R]) –The function to wrap as a task.
  • tracer (Tracer) –The tracer to use for tracing spans. If None, uses the default tracer.
  • name (str | None, default: None ) –The name of the task. This is used for logging and tracing.
  • label (str | None, default: None ) –The label of the task - used to group associated metrics and data together.
  • scorers (ScorersLike[R] | None, default: None ) –A list of scorers to evaluate the task’s output.
  • tags (Sequence[str] | None, default: None ) –A list of tags to attach to the task span.
  • attributes (AnyDict | None, default: None ) –A dictionary of attributes to attach to the task span.
  • log_inputs (Sequence[str] | bool | Inherited, default: INHERITED ) –Log all, or specific, incoming arguments to the function as inputs.
  • log_output (bool | Inherited, default: INHERITED ) –Log the result of the function as an output.
  • log_execution_metrics (bool, default: False ) –Track execution metrics such as success rate and run count.
  • entrypoint (bool, default: False ) –Indicate this task should be considered an entrypoint.
  • config (dict[str, ConfigInfo] | None, default: None ) –Configuration schema for the task parameters.
  • context (dict[str, Context] | None, default: None ) –Context schema for the task execution.
clone() -> Task[P, R]

Clone a task.

Returns:

  • Task[P, R] –A new Task instance with the same attributes as this one.
many(count: int, *args: args, **kwargs: kwargs) -> list[R]

Run the task multiple times and return a list of outputs.

Parameters:

  • count (int) –The number of times to run the task.
  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task.

Returns:

  • list[R] –A list of outputs from each task execution.
map(
args: list[Any] | dict[str, Any | list[Any]],
*,
concurrency: int | None = None,
) -> list[R]

Runs this task multiple times by mapping over iterable arguments.

Examples:

@dn.task
async def my_task(input: str, *, suffix: str = "") -> str:
    return f"Processed {input}{suffix}"

# Map over a list of basic inputs
await my_task.map(["1", "2", "3"])

# Map over a dict of parameters
await my_task.map({
    "input": ["1", "2", "3"],
    "suffix": ["_a", "_b", "_c"]
})

Parameters:

  • args (list[Any] | dict[str, Any | list[Any]]) –Either a flat list of the first positional argument, or a dict where each key is a parameter name and the value is either a single value or a list of values to map over.
  • concurrency (int | None, default: None ) –The maximum number of tasks to run in parallel. If None, runs with unlimited concurrency.

Returns:

  • list[R] –A list of outputs from each execution.
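The argument expansion described above (a flat list maps to the first positional argument; in a dict, any scalar value is broadcast across the mapped lists) can be sketched in plain Python. This is illustrative, not the SDK implementation; the "input" parameter name is a hypothetical stand-in for the task's first argument.

```python
from itertools import repeat

def expand_map_args(args) -> list[dict]:
    if isinstance(args, list):
        # Flat list -> first positional argument of each call.
        return [{"input": a} for a in args]
    # Dict: broadcast scalar values across the mapped lists
    # (assumes at least one value is a list).
    lists = {k: v for k, v in args.items() if isinstance(v, list)}
    n = len(next(iter(lists.values())))
    expanded = {k: (v if isinstance(v, list) else list(repeat(v, n)))
                for k, v in args.items()}
    return [{k: v[i] for k, v in expanded.items()} for i in range(n)]

print(expand_map_args({"input": ["1", "2"], "suffix": "_a"}))
# [{'input': '1', 'suffix': '_a'}, {'input': '2', 'suffix': '_a'}]
```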
retry(count: int, *args: args, **kwargs: kwargs) -> R

Run the task up to count times, returning the output of the first successful execution, otherwise raise the most recent exception.

This is a powerful pattern for non-deterministic tasks where multiple attempts may be needed to generate a valid output according to the task’s assert_scores. However, it can also be useful as a retry mechanism for transient errors.

Parameters:

  • count (int) –The maximum number of times to run the task.
  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task.

Returns:

  • R –The output of the first successful and valid task execution.
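The retry semantics above (return the first successful result, otherwise re-raise the most recent exception) can be sketched as a simple loop. This synchronous sketch only illustrates the control flow; it is not the SDK implementation and assumes count >= 1.

```python
def retry(count: int, fn, *args, **kwargs):
    last_exc: Exception | None = None
    for _ in range(count):
        try:
            return fn(*args, **kwargs)  # first success wins
        except Exception as exc:
            last_exc = exc              # remember the most recent failure
    raise last_exc

attempts = 0

def flaky() -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise ValueError("transient failure")
    return "ok"

print(retry(5, flaky))  # succeeds on the third attempt
```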
run(*args: args, **kwargs: kwargs) -> TaskSpan[R]

Execute the task and return the result as a TaskSpan. If the task fails, an exception is raised.

Parameters:

  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task
run_always(*args: args, **kwargs: kwargs) -> TaskSpan[R]

Execute the task and return the result as a TaskSpan.

Note, if the task fails, the span will still be returned with the exception set.

Parameters:

  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task.

Returns:

  • TaskSpan[R] –The span associated with task execution.
stream_many(
count: int, *args: args, **kwargs: kwargs
) -> t.AsyncContextManager[
t.AsyncGenerator[TaskSpan[R], None]
]

Run the task multiple times concurrently and yield each TaskSpan as it completes.

Parameters:

  • count (int) –The number of times to run the task.
  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task

Yields:

  • AsyncContextManager[AsyncGenerator[TaskSpan[R], None]] –TaskSpan for each task execution, or an Exception if the task fails.
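The completion-order streaming described above can be sketched with asyncio primitives: launch count tasks, then yield each result as it finishes rather than in launch order. This is an illustrative sketch of the shape, not the SDK implementation; the yielded values here are plain results rather than TaskSpans.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def stream_many(count: int, fn, *args):
    tasks = [asyncio.create_task(fn(*args)) for _ in range(count)]

    async def results():
        # Yield each result in completion order, not launch order.
        for done in asyncio.as_completed(tasks):
            yield await done

    try:
        yield results()
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished

async def main() -> list[int]:
    async def work(n: int = 1) -> int:
        await asyncio.sleep(0)
        return n

    collected = []
    async with stream_many(3, work) as spans:
        async for result in spans:
            collected.append(result)
    return collected

print(asyncio.run(main()))  # [1, 1, 1]
```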
stream_map(
args: list[Any] | dict[str, Any | list[Any]],
*,
concurrency: int | None = None,
) -> t.AsyncContextManager[
t.AsyncGenerator[TaskSpan[R], None]
]

Run this task multiple times by mapping over iterable arguments, yielding each TaskSpan as it completes.

Parameters:

  • args (list[Any] | dict[str, Any | list[Any]]) –Either a flat list of the first positional argument, or a dict where each key is a parameter name and the value is either a single value or a list of values to map over.
  • concurrency (int | None, default: None ) –The maximum number of tasks to run in parallel. If None, runs with unlimited concurrency.

Returns:

  • AsyncContextManager[AsyncGenerator[TaskSpan[R], None]] –An async context manager over a generator that yields a TaskSpan for each execution as it completes.
try_(*args: args, **kwargs: kwargs) -> R | None

Attempt to run the task and return the result. If the task fails, None is returned.

Parameters:

  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task.

Returns:

  • R | None –The output of the task, or None if the task failed.
try_many(
count: int, *args: args, **kwargs: kwargs
) -> list[R]

Attempt to run the task multiple times and return a list of outputs. If any task fails, its result is excluded from the output.

Parameters:

  • count (int) –The number of times to run the task.
  • args (args, default: () ) –The arguments to pass to the task.
  • kwargs (kwargs, default: {} ) –The keyword arguments to pass to the task.

Returns:

  • list[R] –A list of outputs from each task execution.
try_map(
args: list[Any] | dict[str, Any | list[Any]],
*,
concurrency: int | None = None,
) -> list[R]

Attempt to run this task multiple times by mapping over iterable arguments. If any task fails, its result is excluded from the output.

Parameters:

  • args (list[Any] | dict[str, Any | list[Any]]) –Either a flat list of the first positional argument, or a dict where each key is a parameter name and the value is either a single value or a list of values to map over.
  • concurrency (int | None, default: None ) –The maximum number of tasks to run in parallel. If None, runs with unlimited concurrency.

Returns:

  • list[R] –A list of outputs from each successful task execution.
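A sketch of the dict form, assuming a @dn.task-decorated function (classify and its parameters are illustrative). Per the parameter description above, list values are mapped per call while scalar values are reused for every call:

```python
import dreadnode as dn

@dn.task()
async def classify(text: str, model: str = "small") -> str:
    ...  # illustrative body

# Runs classify three times: once per text, each with model="large".
labels = await classify.try_map(
    {"text": ["alpha", "beta", "gamma"], "model": "large"},
    concurrency=2,
)
# Failed executions are excluded, so len(labels) may be less than 3.
```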
with_(
*,
scorers: ScorersLike[R] | None = None,
assert_scores: Sequence[str]
| Literal[True]
| None = None,
name: str | None = None,
tags: Sequence[str] | None = None,
label: str | None = None,
log_inputs: Sequence[str]
| bool
| Inherited
| None = None,
log_output: bool | Inherited | None = None,
log_execution_metrics: bool | None = None,
append: bool = False,
attributes: AnyDict | None = None,
entrypoint: bool = False,
) -> Task[P, R]

Clone a task and modify its attributes.

Parameters:

  • scorers (ScorersLike[R] | None, default: None ) –A list of new scorers to set or append to the task.
  • assert_scores (Sequence[str] | Literal[True] | None, default: None ) –A list of new assertion names to set or append to the task.
  • name (str | None, default: None ) –The new name for the task.
  • tags (Sequence[str] | None, default: None ) –A list of new tags to set or append to the task.
  • label (str | None, default: None ) –The new label for the task.
  • log_inputs (Sequence[str] | bool | Inherited | None, default: None ) –Log all, or specific, incoming arguments to the function as inputs.
  • log_output (bool | Inherited | None, default: None ) –Log the result of the function as an output.
  • log_execution_metrics (bool | None, default: None ) –Log execution metrics such as success rate and run count.
  • append (bool, default: False ) –If True, appends the new scorers and tags to the existing ones. If False, replaces them.
  • attributes (AnyDict | None, default: None ) –Additional attributes to set or update in the task.
  • entrypoint (bool, default: False ) –Indicate this task should be considered an entrypoint. All compatible arguments will be treated as configurable and a run will be created automatically when called if one is not already active.

Returns:

  • Task[P, R] –A new Task instance with the modified attributes.
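A sketch of cloning a task, assuming the @dn.task decorator; the summarize task and length_under_200 scorer are hypothetical:

```python
import dreadnode as dn

@dn.task(name="summarize")
async def summarize(text: str) -> str:
    ...

# Append a tag and a scorer to the clone instead of replacing them,
# and fail the task when an assertion score does not pass.
strict = summarize.with_(
    name="summarize-strict",
    tags=["strict"],
    scorers=[length_under_200],  # hypothetical scorer
    assert_scores=True,
    append=True,
)
```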
TaskSpan(
name: str,
tracer: Tracer,
*,
storage: Storage | None = None,
project: str = "default",
task_id: str | UUID | None = None,
type: SpanType = "task",
attributes: AnyDict | None = None,
label: str | None = None,
params: AnyDict | None = None,
metrics: MetricsDict | None = None,
tags: Sequence[str] | None = None,
arguments: Arguments | None = None,
)

Self-sufficient task span with object storage, metrics, params, and artifacts.

TaskSpan is the primary span type for all operations. It manages its own:

  • Object storage (inputs, outputs, arbitrary objects)
  • Metrics tracking
  • Parameters
  • Artifacts
  • Child tasks

TaskSpans can be nested - a TaskSpan can contain child TaskSpans.

agent_id: str | None

Get the ID of the nearest agent span in the parent chain.

all_tasks: list[TaskSpan[Any]]

Get all tasks, including nested subtasks.

arguments: Arguments | None

Get the arguments used for this task if created from a function.

eval_id: str | None

Get the ID of the nearest evaluation span in the parent chain.

inputs: AnyDict

Get all logged inputs.

metrics: MetricsDict

Get all metrics.

output: R

Get the output of this task if created from a function.

outputs: AnyDict

Get all logged outputs.

params: AnyDict

Get all parameters.

parent_task: TaskSpan[Any] | None

Get the parent task if it exists.

parent_task_id: str

Get the parent task ID if it exists.

root_id: str

Get the root task’s ID (for span grouping/routing).

run_id: str

Alias for root_id (backwards compatibility).

study_id: str | None

Get the ID of the nearest study span in the parent chain.

task_id: str

Get this task’s unique ID.

tasks: list[TaskSpan[Any]]

Get the list of child tasks.

from_context(
context: TaskContext,
tracer: Tracer,
storage: Storage | None = None,
) -> TaskSpan[t.Any]

Continue a task from captured context on a remote host.

get_average_metric_value(key: str) -> float

Get the mean of a metric series.

get_object(hash_: str) -> Object

Get an object by its hash.

link_objects(
object_hash: str,
link_hash: str,
attributes: AnyDict | None = None,
) -> None

Link two objects together.

log_artifact(
local_uri: str | Path, *, name: str | None = None
) -> dict[str, t.Any] | None

Log a file as an artifact.

log_input(
name: str,
value: Any,
*,
label: str | None = None,
attributes: AnyDict | None = None,
) -> str

Log an input value.

log_metric(
name: str,
value: float | bool,
*,
step: int = 0,
origin: Any | None = None,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
prefix: str | None = None,
attributes: JsonDict | None = None,
) -> Metric
log_metric(
name: str,
value: Metric,
*,
origin: Any | None = None,
aggregation: MetricAggMode | None = None,
prefix: str | None = None,
) -> Metric
log_metric(
name: str,
value: float | bool | Metric,
*,
step: int = 0,
origin: Any | None = None,
timestamp: datetime | None = None,
aggregation: MetricAggMode | None = None,
prefix: str | None = None,
attributes: JsonDict | None = None,
) -> Metric

Log a metric value.

log_object(
value: Any,
*,
label: str | None = None,
event_name: str = EVENT_NAME_OBJECT,
attributes: AnyDict | None = None,
) -> str

Store an object and return its hash. Objects are stored but not logged as span events.

log_output(
name: str,
value: Any,
*,
label: str | None = None,
attributes: AnyDict | None = None,
) -> str

Log an output value.

log_param(key: str, value: Any) -> None

Log a single parameter.

log_params(**params: Any) -> None

Log multiple parameters.
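A sketch tying the logging and accessor methods together, assuming the top-level dn.log_param / dn.log_metric helpers target the active task span (names are illustrative):

```python
import asyncio

import dreadnode as dn

@dn.task()
async def scrape(url: str) -> str:
    dn.log_param("timeout", 30)
    dn.log_metric("bytes_fetched", 2048)
    return "<html>...</html>"

async def main() -> None:
    span = await scrape.run("https://example.com")
    print(span.output)   # the task's return value
    print(span.params)   # logged parameters, e.g. {"timeout": 30}
    print(span.get_average_metric_value("bytes_fetched"))

asyncio.run(main())
```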

Text(text: str, format: str)

Text data type for Dreadnode logging.

Initialize a Text object.

Parameters:

  • text (str) –The text content to log
  • format (str) –The format hint of the text
Transform(
func: TransformCallable[In, Out],
*,
name: str | None = None,
catch: bool = False,
modality: Modality | None = None,
config: dict[str, ConfigInfo] | None = None,
context: dict[str, Context] | None = None,
compliance_tags: dict[str, Any] | None = None,
)

Represents a transformation operation that modifies the input data.

catch = catch

If True, catches exceptions during the transform and attempts to return the original, unmodified object from the input. If False, exceptions are raised.

compliance_tags = compliance_tags or {}

Compliance framework tags (OWASP, ATLAS, SAIF) for this transform.

modality = modality

The data modality this transform operates on (text, image, audio, video).

name = name

The name of the transform, used for reporting and logging.

as_transform(
*,
adapt_in: Callable[[OuterIn], In],
adapt_out: Callable[[Out], OuterOut],
name: str | None = None,
) -> Transform[OuterIn, OuterOut]

Adapt this transform to a different input/output shape.

clone() -> Transform[In, Out]

Clone the transform.

fit(
transform: TransformLike[In, Out],
) -> Transform[In, Out]

Ensure the provided transform is a Transform instance, wrapping a bare callable if necessary.

fit_many(
transforms: TransformsLike[In, Out] | None,
) -> list[Transform[In, Out]]

Convert a collection of transform-like objects into a list of Transform instances.

This method provides a flexible way to handle different input formats for transforms, automatically converting callables to Transform objects and applying consistent naming and attributes across all transforms.

Parameters:

  • transforms (TransformsLike[In, Out] | None) –A collection of transform-like objects. Can be:
    • A dictionary mapping names to transform objects or callables
    • A sequence of transform objects or callables
    • None (returns empty list)

Returns:

  • list[Transform[In, Out]] –A list of Transform instances with consistent configuration.
rename(new_name: str) -> Transform[In, Out]

Rename the transform.

Parameters:

  • new_name (str) –The new name for the transform.

Returns:

  • Transform[In, Out] –A new Transform with the updated name.
transform(object: In, *args: Any, **kwargs: Any) -> Out

Perform a transform from In to Out.

Parameters:

  • object (In) –The input object to transform.

Returns:

  • Out –The transformed output object.
with_(
*,
name: str | None = None,
catch: bool | None = None,
modality: Modality | None = None,
compliance_tags: dict[str, Any] | None = None,
) -> Transform[In, Out]

Create a new Transform with updated properties.
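A sketch of wrapping a plain callable and adapting its shape with as_transform (the import path, the string modality value, and the dict payload shape are all assumptions):

```python
import re

from dreadnode.transforms import Transform  # import path assumed

def redact_emails(text: str) -> str:
    return re.sub(r"\S+@\S+", "[email]", text)

t = Transform(redact_emails, name="redact_emails", modality="text", catch=True)

# Operate on a dict payload instead of a bare string.
payload_t = t.as_transform(
    adapt_in=lambda payload: payload["body"],
    adapt_out=lambda body: {"body": body},
    name="redact_emails_payload",
)
result = payload_t.transform({"body": "contact bob@example.com"})
```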

TrialCandidate(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the candidate of the current trial during an optimization study.

TrialOutput(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the evaluation result of the current trial during an optimization study.

TrialScore(
*, default: Any | Unset = UNSET, required: bool = True
)

Retrieve the score of the current trial during an optimization study.

Video(
data: VideoDataType,
fps: float | None = None,
caption: str | None = None,
format: str | None = None,
width: int | None = None,
height: int | None = None,
)

Video media type for Dreadnode logging.

Supports:

  • Local file paths (str or Path)
  • Numpy array sequences with frame rate
  • Raw bytes with metadata
  • MoviePy VideoClip objects (if installed)

Initialize a Video object.

Parameters:

  • data (VideoDataType) –The video data, which can be:
    • A path to a local video file (str or Path)
    • A numpy array of frames (requires fps)
    • A list of numpy arrays for individual frames (requires fps)
    • Raw bytes
    • A MoviePy VideoClip object (if MoviePy is installed)
  • fps (float | None, default: None ) –Frames per second, required for numpy array input (ignored if data is a file path or raw bytes)
  • caption (str | None, default: None ) –Optional caption for the video
  • format (str | None, default: None ) –Optional format override (mp4, avi, etc.)
  • width (int | None, default: None ) –Optional width in pixels
  • height (int | None, default: None ) –Optional height in pixels
to_serializable() -> tuple[bytes, dict[str, t.Any]]

Convert the video to bytes and return with metadata.

Returns:

  • tuple[bytes, dict[str, Any]] –A tuple of (video_bytes, metadata_dict)
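A sketch of logging a numpy frame sequence (requires numpy; the output name and caption are illustrative):

```python
import numpy as np

import dreadnode as dn

# 30 random RGB frames at 64x64; fps is required for array input.
frames = np.random.randint(0, 255, size=(30, 64, 64, 3), dtype=np.uint8)
dn.log_output("debug_clip", dn.Video(frames, fps=15, caption="Debug rollout"))
```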
AgentInput(
name: str | None = None,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an input from the nearest agent span.

Parameters:

  • name (str | None, default: None ) –The name of the input. If None, uses the first input logged.
  • default (Any | Unset, default: UNSET ) –A default value if the named input is not found.
  • required (bool, default: True ) –Whether the context is required.
AgentOutput(
name: str = "output",
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an output from the nearest agent span.

Parameters:

  • name (str, default: 'output' ) –The name of the output.
  • default (Any | Unset, default: UNSET ) –A default value if the named output is not found.
  • required (bool, default: True ) –Whether the context is required.
AgentParam(
name: str,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference a parameter from the nearest agent span.

Parameters:

  • name (str) –The name of the parameter.
  • default (Any | Unset, default: UNSET ) –A default value if the named parameter is not found.
  • required (bool, default: True ) –Whether the context is required.
Config(
default: EllipsisType,
*,
key: str | None = None,
help: str | None = None,
description: str | None = None,
expose_as: Any | None = None,
examples: list[Any] | None = None,
gt: float | None = None,
ge: float | None = None,
lt: float | None = None,
le: float | None = None,
min_length: int | None = None,
max_length: int | None = None,
pattern: str | None = None,
alias: str | None = None,
**kwargs: Any,
) -> t.Any
Config(
default: T,
*,
key: str | None = None,
help: str | None = None,
description: str | None = None,
expose_as: Any = None,
examples: list[Any] | None = None,
gt: float | None = None,
ge: float | None = None,
lt: float | None = None,
le: float | None = None,
min_length: int | None = None,
max_length: int | None = None,
pattern: str | None = None,
alias: str | None = None,
**kwargs: Any,
) -> T
Config(
*,
default_factory: Callable[[], T],
key: str | None = None,
help: str | None = None,
description: str | None = None,
expose_as: Any | None = None,
examples: list[Any] | None = None,
gt: float | None = None,
ge: float | None = None,
lt: float | None = None,
le: float | None = None,
min_length: int | None = None,
max_length: int | None = None,
pattern: str | None = None,
alias: str | None = None,
**kwargs: Any,
) -> T
Config(
*,
key: str | None = None,
help: str | None = None,
description: str | None = None,
expose_as: Any | None = None,
examples: list[Any] | None = None,
gt: float | None = None,
ge: float | None = None,
lt: float | None = None,
le: float | None = None,
min_length: int | None = None,
max_length: int | None = None,
pattern: str | None = None,
alias: str | None = None,
**kwargs: Any,
) -> t.Any
Config(
default: Any = ...,
*,
key: str | None = UNSET,
help: str | None = UNSET,
description: str | None = UNSET,
expose_as: Any | None = None,
examples: list[Any] | None = UNSET,
exclude: bool | None = UNSET,
repr: bool = UNSET,
init: bool | None = UNSET,
init_var: bool | None = UNSET,
kw_only: bool | None = UNSET,
gt: SupportsGt | None = UNSET,
ge: SupportsGt | None = UNSET,
lt: SupportsGt | None = UNSET,
le: SupportsGt | None = UNSET,
min_length: int | None = UNSET,
max_length: int | None = UNSET,
pattern: str | None = UNSET,
alias: str | None = UNSET,
**kwargs: Any,
) -> t.Any

Declares a static, configurable parameter.

Parameters:

  • default (Any, default: ... ) –Default value if the field is not set.
  • alias (str | None, default: UNSET ) –The name to use for the attribute when validating or serializing by alias. This is often used for things like converting between snake and camel case.
  • help (str | None, default: UNSET ) –Human-readable help text.
  • description (str | None, default: UNSET ) –Human-readable description (overridden by help)
  • expose_as (Any | None, default: None ) –Override the type that this config value should be annotated as in configuration models.
  • examples (list[Any] | None, default: UNSET ) –Example values for this field.
  • exclude (bool | None, default: UNSET ) –Exclude the field from the model serialization.
  • repr (bool, default: UNSET ) –A boolean indicating whether to include the field in the __repr__ output.
  • init (bool | None, default: UNSET ) –Whether the field should be included in the constructor of the dataclass. (Only applies to dataclasses.)
  • init_var (bool | None, default: UNSET ) –Whether the field should only be included in the constructor of the dataclass. (Only applies to dataclasses.)
  • kw_only (bool | None, default: UNSET ) –Whether the field should be a keyword-only argument in the constructor of the dataclass. (Only applies to dataclasses.)
  • gt (SupportsGt | None, default: UNSET ) –Greater than. If set, value must be greater than this. Only applicable to numbers.
  • ge (SupportsGt | None, default: UNSET ) –Greater than or equal. If set, value must be greater than or equal to this. Only applicable to numbers.
  • lt (SupportsGt | None, default: UNSET ) –Less than. If set, value must be less than this. Only applicable to numbers.
  • le (SupportsGt | None, default: UNSET ) –Less than or equal. If set, value must be less than or equal to this. Only applicable to numbers.
  • min_length (int | None, default: UNSET ) –Minimum length for iterables.
  • max_length (int | None, default: UNSET ) –Maximum length for iterables.
  • pattern (str | None, default: UNSET ) –Pattern for strings (a regular expression).
  • **kwargs (Any, default: {} ) –Additional keyword arguments forwarded to Pydantic’s Field, including default_factory, coerce_numbers_to_str, strict, multiple_of, allow_inf_nan, max_digits, decimal_places, union_mode, and fail_fast. See the Pydantic Field documentation for full semantics.
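A sketch of declaring configurable, constrained task parameters with Config (the task and parameter names are illustrative):

```python
import dreadnode as dn
from dreadnode import Config

@dn.task()
async def sample(
    prompt: str,
    temperature: float = Config(0.7, ge=0.0, le=2.0, help="Sampling temperature"),
    max_tokens: int = Config(256, gt=0),
) -> str:
    ...
```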
EvalInput(
name: str | None = None,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an input from the nearest evaluation span.

Parameters:

  • name (str | None, default: None ) –The name of the input. If None, uses the first input logged.
  • default (Any | Unset, default: UNSET ) –A default value if the named input is not found.
  • required (bool, default: True ) –Whether the context is required.
EvalOutput(
name: str = "output",
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an output from the nearest evaluation span.

Parameters:

  • name (str, default: 'output' ) –The name of the output.
  • default (Any | Unset, default: UNSET ) –A default value if the named output is not found.
  • required (bool, default: True ) –Whether the context is required.
EvalParam(
name: str,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference a parameter from the nearest evaluation span.

Parameters:

  • name (str) –The name of the parameter.
  • default (Any | Unset, default: UNSET ) –A default value if the named parameter is not found.
  • required (bool, default: True ) –Whether the context is required.
StudyInput(
name: str | None = None,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an input from the nearest study span.

Parameters:

  • name (str | None, default: None ) –The name of the input. If None, uses the first input logged.
  • default (Any | Unset, default: UNSET ) –A default value if the named input is not found.
  • required (bool, default: True ) –Whether the context is required.
StudyOutput(
name: str = "output",
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an output from the nearest study span.

Parameters:

  • name (str, default: 'output' ) –The name of the output.
  • default (Any | Unset, default: UNSET ) –A default value if the named output is not found.
  • required (bool, default: True ) –Whether the context is required.
StudyParam(
name: str,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference a parameter from the nearest study span.

Parameters:

  • name (str) –The name of the parameter.
  • default (Any | Unset, default: UNSET ) –A default value if the named parameter is not found.
  • required (bool, default: True ) –Whether the context is required.
TaskInput(
name: str | None = None,
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an input from the current task.

Parameters:

  • name (str | None, default: None ) –The name of the input. If None, uses the first input logged.
  • default (Any | Unset, default: UNSET ) –A default value if the named input is not found.
  • required (bool, default: True ) –Whether the context is required.
TaskOutput(
name: str = "output",
*,
default: Any | Unset = UNSET,
required: bool = True,
) -> TypedSpanContext

Reference an output from the current task.

Parameters:

  • name (str, default: 'output' ) –The name of the output.
  • default (Any | Unset, default: UNSET ) –A default value if the named output is not found.
  • required (bool, default: True ) –Whether the context is required.
configure_logging(
level: LogLevel | None = None,
log_file: Path | None = None,
log_file_level: LogLevel = "debug",
*,
verbose: bool = False,
) -> None

Configure loguru with Rich console output (library/interactive mode).

Parameters:

  • level (LogLevel | None, default: None ) –Console log level. If omitted, defaults to the DREADNODE_LOG_LEVEL env var or info.
  • log_file (Path | None, default: None ) –Optional file path for logging.
  • log_file_level (LogLevel, default: 'debug' ) –Log level for file output.
  • verbose (bool, default: False ) –Enable richer tracebacks and show source paths.
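A sketch of a typical call:

```python
from pathlib import Path

from dreadnode import configure_logging

configure_logging(
    level="info",              # falls back to DREADNODE_LOG_LEVEL if omitted
    log_file=Path("run.log"),  # file sink captures at "debug" by default
    verbose=True,              # richer tracebacks and source paths
)
```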
configure_server_logging(
level: LogLevel | None = None,
log_file: Path | str | None = None,
log_file_level: LogLevel = "debug",
) -> None

Configure loguru for server/serve mode (structured, timestamped, no Rich).

Intercepts uvicorn and fastapi stdlib loggers into loguru. Also checks the DREADNODE_LOG_FILE env var for a file sink path.

Parameters:

  • level (LogLevel | None, default: None ) –Console log level. If omitted, defaults to the DREADNODE_LOG_LEVEL env var or info.
  • log_file (Path | str | None, default: None ) –Optional file path for logging. Falls back to DREADNODE_LOG_FILE env var if not provided.
  • log_file_level (LogLevel, default: 'debug' ) –Log level for file output.
get_default_instance() -> Dreadnode

Get the default Dreadnode instance (lazy import to avoid circular dependency).

study_span(
name: str,
*,
label: str | None = None,
tags: list[str] | None = None,
airt_assessment_id: str | None = None,
airt_attack_name: str | None = None,
airt_goal: str | None = None,
airt_goal_category: str | None = None,
airt_category: str | None = None,
airt_sub_category: str | None = None,
airt_transforms: list[str] | None = None,
airt_target_model: str | None = None,
airt_attacker_model: str | None = None,
airt_evaluator_model: str | None = None,
) -> TaskSpan[t.Any]

Create a bare span for optimization study execution.

Events populate all attributes via emit().

Parameters:

  • name (str) –The study name.
  • label (str | None, default: None ) –Human-readable label.
  • tags (list[str] | None, default: None ) –Additional tags.
  • airt_assessment_id (str | None, default: None ) –AIRT assessment ID (for platform linking).
  • airt_attack_name (str | None, default: None ) –AIRT attack name.
  • airt_goal (str | None, default: None ) –AIRT attack goal.
  • airt_goal_category (str | None, default: None ) –AIRT goal category.
  • airt_category (str | None, default: None ) –AIRT category.
  • airt_sub_category (str | None, default: None ) –AIRT sub-category.
  • airt_transforms (list[str] | None, default: None ) –AIRT transforms applied.
  • airt_target_model (str | None, default: None ) –Target model identifier.
  • airt_attacker_model (str | None, default: None ) –Attacker model identifier.
  • airt_evaluator_model (str | None, default: None ) –Evaluator model identifier.

Returns:

  • TaskSpan[Any] –A bare TaskSpan for study execution.
trial_span(
trial_id: str,
*,
step: int,
task_name: str | None = None,
label: str | None = None,
tags: list[str] | None = None,
airt_assessment_id: str | None = None,
airt_trial_index: int | None = None,
airt_attack_name: str | None = None,
airt_goal: str | None = None,
airt_goal_category: str | None = None,
airt_category: str | None = None,
airt_sub_category: str | None = None,
airt_transforms: list[str] | None = None,
airt_target_model: str | None = None,
airt_attacker_model: str | None = None,
airt_evaluator_model: str | None = None,
) -> TaskSpan[t.Any]

Create a bare span for an optimization trial.

Events populate all attributes via emit().

Parameters:

  • trial_id (str) –Unique trial identifier.
  • step (int) –Trial number in the study.
  • task_name (str | None, default: None ) –Name of the task being evaluated (for label).
  • label (str | None, default: None ) –Human-readable label.
  • tags (list[str] | None, default: None ) –Additional tags.
  • airt_assessment_id (str | None, default: None ) –AIRT assessment ID (for linking trial to assessment).
  • airt_trial_index (int | None, default: None ) –AIRT trial index within the attack.
  • airt_attack_name (str | None, default: None ) –AIRT attack name.
  • airt_goal (str | None, default: None ) –AIRT attack goal.
  • airt_goal_category (str | None, default: None ) –AIRT goal category.
  • airt_category (str | None, default: None ) –AIRT category.
  • airt_sub_category (str | None, default: None ) –AIRT sub-category.
  • airt_transforms (list[str] | None, default: None ) –AIRT transforms applied.
  • airt_target_model (str | None, default: None ) –Target model identifier.
  • airt_attacker_model (str | None, default: None ) –Attacker model identifier.
  • airt_evaluator_model (str | None, default: None ) –Evaluator/judge model identifier.

Returns:

  • TaskSpan[Any] –A bare TaskSpan for trial execution.