dreadnode
Top-level Python API for the Dreadnode SDK.
TraceBackend
```python
TraceBackend = Literal['local', 'remote']
```

Controls remote OTLP streaming.

- "local" – local JSONL only; no OTLP streaming.
- "remote" – local JSONL and OTLP streaming.
- None (default) – auto-detect: stream if credentials exist.
Local JSONL is always populated regardless of this setting.
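The auto-detect rule above can be sketched as a small helper (hypothetical; not part of the SDK):

```python
from typing import Literal, Optional

TraceBackend = Literal["local", "remote"]


def resolve_trace_backend(
    backend: Optional[TraceBackend], has_credentials: bool
) -> TraceBackend:
    """Explicit settings win; otherwise stream only when credentials exist."""
    if backend is not None:
        return backend
    return "remote" if has_credentials else "local"
```

Either way, local JSONL output is unaffected by the resolved value.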
Audio

```python
Audio(
    data: AudioDataType,
    sample_rate: int | None = None,
    caption: str | None = None,
    format: str | None = None,
)
```

Audio media type for Dreadnode logging.
Supports:
- Local file paths (str or Path)
- Numpy arrays with sample rate
- Raw bytes
Initialize an Audio object.

Parameters:

- data (AudioDataType) – The audio data: a path to a local audio file (str or Path), a numpy array (requires sample_rate), or raw bytes.
- sample_rate (int | None, default: None) – Required when using numpy arrays.
- caption (str | None, default: None) – Optional caption for the audio.
- format (str | None, default: None) – Optional format to use (default is wav for numpy arrays).
to_serializable
```python
to_serializable() -> tuple[t.Any, dict[str, t.Any]]
```

Serialize the audio data to bytes and return with metadata.

Returns:

- A tuple of (audio_bytes, metadata_dict).
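The accepted input kinds listed above imply a simple dispatch, which can be sketched as follows (a hypothetical helper; the real Audio class performs its own validation):

```python
from pathlib import Path


def classify_audio_data(data, sample_rate=None) -> str:
    """Sketch of the input kinds Audio accepts, per the docs above."""
    if isinstance(data, (str, Path)):
        return "file"
    if isinstance(data, (bytes, bytearray)):
        return "bytes"
    # Anything else is treated as array-like, which needs a sample rate.
    if sample_rate is None:
        raise ValueError("sample_rate is required for array data")
    return "array"
```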
Code

```python
Code(text: str, language: str = '')
```

Hint type for code-formatted text.
This is a subclass of Text with format set to “code”.
Example
```python
log_output("code_snippet", Code("print('Hello, World!')", language="python"))
```

CurrentRun
```python
CurrentRun(*, default: Any | Unset = UNSET, required: bool = True)
```

Retrieve the current task span from the current context (backwards-compatibility alias).
CurrentTask
```python
CurrentTask(*, default: Any | Unset = UNSET, required: bool = True)
```

Retrieve the current task span from the current context.
CurrentTrial
```python
CurrentTrial(*, default: Any | Unset = UNSET, required: bool = True)
```

Retrieve the current trial during an optimization study.
Dataset
```python
Dataset(
    name: str,
    storage: Storage | None = None,
    version: str | None = None,
)
```

Published dataset loader backed by local storage manifests.
DatasetField
```python
DatasetField(
    name: str,
    *,
    default: Any | Unset = UNSET,
    required: bool = True,
)
```

A Context marker for a value from the full dataset sample row for the current evaluation task.
Dreadnode
```python
Dreadnode()
```

The core Dreadnode SDK class.
A default instance is created and can be used directly with dreadnode.*.
Otherwise, create your own instance with Dreadnode().configure().
can_sync
```python
can_sync: bool
```

Whether remote sync is possible (credentials are available).
session
```python
session: Profile
```

Deprecated alias for profile.
build_package
```python
build_package(path: str | Path) -> BuildResult
```

Build a local repository into an OCI image.

Parameters:

- path (str | Path) – Path to a dataset, model, or environment package project.

Returns:

- BuildResult – BuildResult with success status and OCI image.
change_workspace
```python
change_workspace(workspace: str | UUID) -> Workspace
```

Change the current workspace within the current organization.
This re-resolves the workspace and updates the storage paths accordingly. The organization remains unchanged.
Parameters:

- workspace (str | UUID) – The workspace name, key, or uuid.UUID to switch to.

Returns:

- Workspace – The resolved Workspace object.

Raises:

- RuntimeError – If not configured or the workspace is not found.
configure
```python
configure(
    *,
    server: str | None = None,
    api_key: str | None = None,
    organization: str | UUID | None = None,
    workspace: str | UUID | None = None,
    project: str | UUID | None = None,
    cache: Path | str | None = None,
    storage_provider: StorageProvider | None = None,
    trace_backend: TraceBackend | None = None,
    console: ConsoleOptions | bool | None = None,
    otel_scope: str = "dreadnode",
) -> Dreadnode
```

Configure the Dreadnode SDK.
Credential resolution follows profile precedence: explicit args > environment variables > saved profile defaults.
Parameters:

- server (str | None, default: None) – Platform API URL.
- api_key (str | None, default: None) – API key for authentication.
- organization (str | UUID | None, default: None) – Organization key/UUID override.
- workspace (str | UUID | None, default: None) – Workspace key/UUID override.
- project (str | UUID | None, default: None) – Project key/UUID override.
- cache (Path | str | None, default: None) – Local cache directory (default: ~/.dreadnode).
- storage_provider (StorageProvider | None, default: None) – Remote storage provider (s3, r2, minio). Auto-detected if not specified.
- trace_backend (TraceBackend | None, default: None) – Controls remote OTLP streaming.
- console (ConsoleOptions | bool | None, default: None) – Log span information to the console.
- otel_scope (str, default: 'dreadnode') – The OpenTelemetry scope name.

Returns:

- Dreadnode – The configured Dreadnode SDK instance.
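The precedence rule for credential resolution (explicit args > environment variables > saved profile defaults) can be sketched as a generic helper (hypothetical, for illustration only):

```python
def resolve_setting(explicit, env_value, profile_value):
    """Return the first non-None value in precedence order:
    explicit argument, then environment variable, then profile default."""
    for value in (explicit, env_value, profile_value):
        if value is not None:
            return value
    return None
```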
continue_task
```python
continue_task(task_context: TaskContext) -> TaskSpan[t.Any]
```

Continue a task from captured context on a remote host.

Parameters:

- task_context (TaskContext) – The TaskContext captured from get_task_context().

Returns:

- TaskSpan[Any] – A TaskSpan object that can be used as a context manager.
evaluation
```python
evaluation(
    func: Callable[..., Any] | None = None,
    /,
    *,
    dataset: Any | None = None,
    dataset_file: str | None = None,
    name: str | None = None,
    description: str = "",
    tags: list[str] | None = None,
    concurrency: int = 1,
    iterations: int = 1,
    max_errors: int | None = None,
    max_consecutive_errors: int = 10,
    dataset_input_mapping: list[str] | dict[str, str] | None = None,
    parameters: dict[str, list[Any]] | None = None,
    scorers: ScorersLike[Any] | None = None,
    assert_scores: list[str] | Literal[True] | None = None,
) -> t.Any
```

Decorator to create an Evaluation from a function. See evaluation() for details.
get_current_run
```python
get_current_run() -> TaskSpan[t.Any] | None
```

Get the current task span (backwards-compatibility alias).
get_current_task
```python
get_current_task() -> TaskSpan[t.Any] | None
```

Get the current task span.
get_task_context
```python
get_task_context() -> TaskContext
```

Capture the current task context for transfer to another host, thread, or process.
Use continue_task() to continue the task anywhere else.
Returns:

- TaskContext – TaskContext containing task state and trace propagation headers.

Raises:

- RuntimeError – If called outside of an active task.
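The hand-off pattern behind get_task_context()/continue_task() can be sketched with a stand-in payload (hypothetical; the real TaskContext also carries task state alongside trace propagation headers):

```python
import json


def capture_context(task_id: str, headers: dict) -> str:
    """Serialize enough state to resume the task on another host."""
    return json.dumps({"task_id": task_id, "headers": headers})


def continue_from(payload: str) -> dict:
    """On the remote host: rebuild the context before re-entering the task."""
    return json.loads(payload)
```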
get_tracer
```python
get_tracer(*, is_span_tracer: bool = True) -> Tracer
```

Get an OpenTelemetry Tracer instance.

Parameters:

- is_span_tracer (bool, default: True) – Whether the tracer is for creating spans.

Returns:

- Tracer – An OpenTelemetry Tracer.
link_objects
```python
link_objects(
    origin: Any,
    link: Any,
    attributes: AnyDict | None = None,
) -> None
```

Associate two runtime objects with each other.
This is useful for linking any two objects which are related to each other, such as a model and its training data, or an input prompt and the resulting output.
Example
```python
with dreadnode.run("my_run"):
    model = SomeModel()
    data = SomeData()

    dreadnode.link_objects(model, data)
```

Parameters:

- origin (Any) – The origin object to link from.
- link (Any) – The linked object to link to.
- attributes (AnyDict | None, default: None) – Additional attributes to attach to the link.
list_agents
```python
list_agents(org: str | None = None) -> list[PackageInfo]
```

List agents in a workspace.

Parameters:

- org (str | None, default: None) – Organization key. Uses the configured org if not provided.

Returns:

- list[PackageInfo] – List of agent PackageInfo objects.
list_projects
```python
list_projects(
    org: str | None = None, workspace: str | None = None
) -> list[Project]
```

List projects in a workspace.

Parameters:

- org (str | None, default: None) – Organization key. Uses the configured org if not provided.
- workspace (str | None, default: None) – Workspace key. Uses the configured workspace if not provided.

Returns:

- list[Project] – List of projects.
list_registry
```python
list_registry(
    project_type: PackageType, *, org: str | None = None
) -> list[PackageInfo]
```

List packages available in the registry.
Currently lists packages from local storage. Remote registry support will be added when the API endpoint is available.
Parameters:

- project_type (PackageType) – Type of package to list (datasets, models, tools, agents, environments).
- org (str | None, default: None) – Organization to filter by.

Returns:

- list[PackageInfo] – List of PackageInfo objects.
list_workspaces
```python
list_workspaces(org: str | None = None) -> list[Workspace]
```

List workspaces the user has access to.

Parameters:

- org (str | None, default: None) – Organization key. Uses the configured org if not provided.

Returns:

- list[Workspace] – List of workspaces.
load_capability
```python
load_capability(capability: str | Path) -> Capability
```

Load a capability from an explicit path or from the configured capability search paths.
Returns a high-level Capability object that exposes the serialized capability
manifest plus resolved agents, tools, skills, and MCP server definitions.
Parameters:

- capability (str | Path) – Capability directory path or capability name.

Returns:

- Capability – Capability ready to attach to an agent or server runtime.

Raises:

- FileNotFoundError – If no capability with the requested name can be found.
load_dataset
```python
load_dataset(
    path: str | Path,
    config: str | None = None,
    *,
    dataset_name: str | None = None,
    split: str | None = None,
    format: Literal["parquet", "arrow", "feather"] = "parquet",
    version: str | None = None,
    **kwargs: Any,
) -> t.Any
```

Load a dataset from HuggingFace Hub or a local dataset source directory.
Parameters:

- path (str | Path) – HuggingFace dataset path (e.g., "squad", "imdb", "glue") or a local directory containing dataset.yaml.
- config (str | None, default: None) – Dataset configuration name (e.g., "cola" for the glue dataset).
- dataset_name (str | None, default: None) – Name to store the dataset as locally. Defaults to the path.
- split (str | None, default: None) – Dataset split to load (e.g., "train", "test", "train[:100]").
- format (Literal['parquet', 'arrow', 'feather'], default: 'parquet') – Storage format (parquet, arrow, feather).
- version (str | None, default: None) – Version string for the stored dataset.
- **kwargs (Any) – Additional arguments passed to HuggingFace's load_dataset.

Returns:

- Any – LocalDataset instance with the loaded data.
Example
```python
import dreadnode as dn

dn.configure(...)
ds = dn.load_dataset("glue", "cola", split="train[:100]")
```
load_model
```python
load_model(
    path: str | Path,
    *,
    model_name: str | None = None,
    task: str | None = None,
    format: Literal["safetensors", "pytorch"] = "safetensors",
    version: str | None = None,
    **kwargs: Any,
) -> t.Any
```

Load a model from HuggingFace Hub or a local model source directory.
Parameters:

- path (str | Path) – HuggingFace model path (e.g., "bert-base-uncased", "gpt2") or a local directory containing model.yaml.
- model_name (str | None, default: None) – Name to store the model as locally. Defaults to the path.
- task (str | None, default: None) – Task type for the model (e.g., "classification", "generation").
- format (Literal['safetensors', 'pytorch'], default: 'safetensors') – Storage format (safetensors or pytorch).
- version (str | None, default: None) – Version string for the stored model.
- **kwargs (Any) – Additional arguments passed to from_pretrained.

Returns:

- Any – LocalModel instance with the loaded model.
Example
```python
import dreadnode as dn

dn.configure(...)
model = dn.load_model("bert-base-uncased", task="classification")
```
load_package
```python
load_package(
    uri: str | Path | None = None,
    type: PackageType | None = None,
) -> t.Any
```

Load a package (dataset, model, or agent) from the server.
Downloads and installs the package if not already installed, then loads it via entry points. Artifacts are fetched from CAS on demand.
Parameters:

- uri (str | Path | None, default: None) – Package URI (e.g., "dataset://org/name", "model://org/name").
- type (PackageType | None, default: None) – Package type hint if not specified in the URI.

Returns:

- Any – The loaded package object (Dataset, Model, or Agent).
log_artifact
```python
log_artifact(local_uri: str | Path, *, name: str | None = None) -> None
```

Log a file or directory artifact to the current run.
This stores the artifact in the workspace CAS and uploads it to remote storage. Artifact metadata is recorded in artifacts.jsonl for tracking.
Examples:

Log a single file:

```python
with dreadnode.run("my_run"):
    # Save a file
    with open("results.json", "w") as f:
        json.dump(results, f)

    # Log it as an artifact
    dreadnode.log_artifact("results.json")
```

Log a directory:

```python
with dreadnode.run("my_run"):
    # Create a directory with model files
    os.makedirs("model_output", exist_ok=True)
    save_model("model_output/model.pkl")
    save_config("model_output/config.yaml")

    # Log the entire directory as an artifact
    dreadnode.log_artifact("model_output")
```

Parameters:

- local_uri (str | Path) – The local path to the file or directory to upload.
- name (str | None, default: None) – Optional name for the artifact (defaults to the filename).
log_input
```python
log_input(
    name: str,
    value: Any,
    *,
    label: str | None = None,
    attributes: AnyDict | None = None,
) -> None
```

Log a single input to the current span.

Inputs can be any runtime object; they are serialized, stored, and tracked in the Dreadnode UI.

Parameters:

- name (str) – The name of the input.
- value (Any) – The input value to log.
- label (str | None, default: None) – Optional display label.
- attributes (AnyDict | None, default: None) – Optional additional attributes.
Example
```python
@dreadnode.task
async def my_task(x: int) -> int:
    dreadnode.log_input("input_name", x)
    return x * 2
```

log_inputs
```python
log_inputs(**inputs: Any) -> None
```

Log multiple inputs to the current span.
See log_input() for more details.
log_metric
```python
log_metric(
    name: str,
    value: float | bool,
    *,
    step: int = 0,
    origin: Any | None = None,
    timestamp: datetime | None = None,
    aggregation: MetricAggMode | None = None,
    attributes: AnyDict | None = None,
) -> Metric

log_metric(
    name: str,
    value: Metric,
    *,
    origin: Any | None = None,
    aggregation: MetricAggMode | None = None,
) -> Metric

log_metric(
    name: str,
    value: float | bool | Metric,
    *,
    step: int = 0,
    origin: Any | None = None,
    timestamp: datetime | None = None,
    aggregation: MetricAggMode | None = None,
    attributes: AnyDict | None = None,
) -> Metric
```

Log a single metric to the current task or run.
Metrics are measurements or recorded values related to a task or run. They can be used to track performance, resource usage, or other quantitative data.
Examples:
With a raw value:

```python
with dreadnode.run("my_run"):
    dreadnode.log_metric("accuracy", 0.95, step=10)
    dreadnode.log_metric("loss", 0.05, step=10, aggregation="min")
```

With a Metric object:

```python
with dreadnode.run("my_run"):
    metric = Metric(0.95, step=10, timestamp=datetime.now(timezone.utc))
    dreadnode.log_metric("accuracy", metric)
```

Parameters:
- name (str) – The name of the metric.
- value (float | bool | Metric) – The value of the metric, either as a raw float/bool or a Metric object.
- step (int, default: 0) – The step of the metric.
- origin (Any | None, default: None) – The origin of the metric; can be any object that was logged as an input or output anywhere in the run.
- timestamp (datetime | None, default: None) – The timestamp of the metric; defaults to the current time.
- aggregation (MetricAggMode | None, default: None) – The aggregation to use for the metric. Helpful when you want the library to translate your raw values into better representations:
  - direct: do not modify the value at all (default)
  - min: the lowest observed value reported for this metric
  - max: the highest observed value reported for this metric
  - avg: the average of all reported values for this metric
  - sum: the cumulative sum of all reported values for this metric
  - count: increment every time this metric is logged, disregarding the value
- attributes (AnyDict | None, default: None) – A dictionary of additional attributes to attach to the metric.

Returns:

- Metric – The logged metric object.
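The aggregation modes listed above can be sketched over a series of reported values (a hypothetical helper, not the SDK's implementation):

```python
def aggregate(mode: str, values: list) -> float:
    """Reduce a series of reported metric values per the documented modes."""
    if mode == "direct":
        return values[-1]          # keep the latest raw value as-is
    if mode == "min":
        return min(values)
    if mode == "max":
        return max(values)
    if mode == "avg":
        return sum(values) / len(values)
    if mode == "sum":
        return sum(values)
    if mode == "count":
        return float(len(values))  # disregard the values themselves
    raise ValueError(f"unknown aggregation mode: {mode}")
```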
log_metrics
```python
log_metrics(
    metrics: dict[str, float | bool],
    *,
    step: int = 0,
    timestamp: datetime | None = None,
    aggregation: MetricAggMode | None = None,
    attributes: AnyDict | None = None,
    origin: Any | None = None,
) -> list[Metric]

log_metrics(
    metrics: list[MetricDict],
    *,
    step: int = 0,
    timestamp: datetime | None = None,
    aggregation: MetricAggMode | None = None,
    attributes: AnyDict | None = None,
    origin: Any | None = None,
) -> list[Metric]

log_metrics(
    metrics: MetricsLike,
    *,
    step: int = 0,
    timestamp: datetime | None = None,
    aggregation: MetricAggMode | None = None,
    attributes: AnyDict | None = None,
    origin: Any | None = None,
) -> list[Metric]
```

Log multiple metrics to the current task or run.
Examples:
Log metrics from a dictionary:

```python
dreadnode.log_metrics(
    {
        "accuracy": 0.95,
        "loss": 0.05,
        "f1_score": 0.92,
    },
    step=10,
)
```

Log metrics from a list of MetricDicts:

```python
dreadnode.log_metrics(
    [
        {"name": "accuracy", "value": 0.95},
        {"name": "loss", "value": 0.05, "aggregation": "min"},
    ],
    step=10,
)
```

Parameters:

- metrics (MetricsLike) – Either a dictionary of name/value pairs or a list of MetricDicts to log.
- step (int, default: 0) – Default step value for metrics if not supplied.
- timestamp (datetime | None, default: None) – Default timestamp for metrics if not supplied.
- aggregation (MetricAggMode | None, default: None) – Default aggregation for metrics if not supplied.
- attributes (AnyDict | None, default: None) – Default attributes for metrics if not supplied.
- origin (Any | None, default: None) – The origin of the metrics; can be any object that was logged as an input or output anywhere in the run.

Returns:

- list[Metric] – List of logged Metric objects.
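Both accepted shapes reduce to the same list-of-dicts form, which can be sketched as (hypothetical; the SDK's internal normalization may differ):

```python
def normalize_metrics(metrics) -> list:
    """Convert either a name/value dict or a list of MetricDict-style
    entries into a uniform list of dicts with "name" and "value" keys."""
    if isinstance(metrics, dict):
        return [{"name": k, "value": v} for k, v in metrics.items()]
    return [dict(m) for m in metrics]
```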
log_output
```python
log_output(
    name: str,
    value: Any,
    *,
    label: str | None = None,
    attributes: AnyDict | None = None,
) -> None
```

Log a single output to the current span.

Outputs can be any runtime object; they are serialized, stored, and tracked in the Dreadnode UI.

Parameters:

- name (str) – The name of the output.
- value (Any) – The value of the output.
- label (str | None, default: None) – An optional label for the output, useful for filtering in the UI.
- attributes (AnyDict | None, default: None) – Additional attributes to attach to the output.
Example
```python
@dreadnode.task
async def my_task(x: int) -> int:
    result = x * 2
    dreadnode.log_output("result", result)
    return result
```

log_outputs
```python
log_outputs(**outputs: Any) -> None
```

Log multiple outputs to the current span.
See log_output() for more details.
log_param
```python
log_param(key: str, value: JsonValue) -> None
```

Log a single parameter to the current run.
Parameters are key-value pairs that are associated with the run and can be used to track configuration values, hyperparameters, or other metadata.
Example
```python
with dreadnode.run("my_run"):
    dreadnode.log_param("param_name", "param_value")
```

Parameters:

- key (str) – The name of the parameter.
- value (JsonValue) – The value of the parameter.
log_params
```python
log_params(**params: JsonValue) -> None
```

Log multiple parameters to the current run.
Parameters are key-value pairs that are associated with the run and can be used to track configuration values, hyperparameters, or other metadata.
Example
```python
with dreadnode.run("my_run"):
    dreadnode.log_params(
        param1="value1",
        param2="value2",
    )
```

Parameters:

- **params (JsonValue) – The parameters to log. Each parameter is a key-value pair.
log_sample
```python
log_sample(
    label: str,
    input: Any,
    output: Any,
    metrics: MetricsLike | None = None,
    *,
    step: int = 0,
) -> None
```

Convenience method to log an input/output pair with metrics as an ephemeral task.
This is useful for logging a single sample of input and output data along with any metrics that were computed during the process.
log_samples
```python
log_samples(
    name: str,
    samples: list[tuple[Any, Any] | tuple[Any, Any, MetricsLike]],
) -> None
```

Log multiple input/output samples as ephemeral tasks.
This is useful for logging a batch of input/output pairs with metrics in a single run.
Example
```python
dreadnode.log_samples(
    "my_samples",
    [
        (input1, output1, {"accuracy": 0.95}),
        (input2, output2, {"accuracy": 0.90}),
    ],
)
```

Parameters:

- name (str) – The name of the task to create for each sample.
- samples (list[tuple[Any, Any] | tuple[Any, Any, MetricsLike]]) – A list of tuples containing (input, output, metrics [optional]).
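Since the metrics element is optional, each sample tuple can be normalized to a fixed shape, roughly like this (a hypothetical sketch, not the SDK's code):

```python
def normalize_samples(samples) -> list:
    """Pad two-element tuples with empty metrics so every sample
    becomes a uniform (input, output, metrics) triple."""
    normalized = []
    for sample in samples:
        if len(sample) == 2:
            inp, out = sample
            metrics = {}
        else:
            inp, out, metrics = sample
        normalized.append((inp, out, metrics))
    return normalized
```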
login

```python
login(
    server: str,
    api_key: str,
    organization: str | UUID,
    *,
    workspace: str | UUID | None = None,
    project: str | UUID | None = None,
    cache: Path | str | None = None,
    set_default_workspace: bool = True,
    set_default_project: bool = True,
) -> Organization
```

Login to a Dreadnode server and save credentials to the profile.
Authenticates with the server, resolves the organization, and saves the profile to ~/.dreadnode/config.yaml for future use.
Parameters:

- server (str) – The Dreadnode server URL.
- api_key (str) – The Dreadnode API key.
- organization (str | UUID) – Organization key or ID to log in to.
- workspace (str | UUID | None, default: None) – Default workspace to use.
- project (str | UUID | None, default: None) – Default project to use.
- cache (Path | str | None, default: None) – Local cache directory (default: ~/.dreadnode).
- set_default_workspace (bool, default: True) – Save the workspace as the default in the profile.
- set_default_project (bool, default: True) – Save the project as the default in the profile.

Returns:

- Organization – The resolved Organization.

Raises:

- RuntimeError – If authentication fails or the organization is not found.
optimize_anything
```python
optimize_anything(
    *,
    evaluator: Callable[..., Any] | None = None,
    seed_candidate: str | dict[str, str] | None = None,
    dataset: list[Any] | None = None,
    trainset: list[Any] | None = None,
    valset: list[Any] | None = None,
    objective: str | None = None,
    background: str | None = None,
    name: str | None = None,
    description: str = "",
    tags: list[str] | None = None,
    config: OptimizationConfig | None = None,
    backend: str | OptimizationBackend[Any] = "gepa",
    adapter: OptimizationAdapter[Any] | None = None,
) -> t.Any
```

Create an optimize_anything executor. See optimize_anything() for details.
pull_package
```python
pull_package(packages: list[str], *, upgrade: bool = False) -> PullResult
```

Download packages from the registry.

Parameters:

- packages (list[str]) – Package names to install.
- upgrade (bool, default: False) – Upgrade if already installed.

Returns:

- PullResult – PullResult with status.
push_capability
```python
push_capability(
    capability: str | Path,
    *,
    name: str | None = None,
    skip_upload: bool = False,
    force: bool = False,
    publish: bool = False,
) -> CapabilityPushResult
```

Build and push a capability directory to the OCI registry.
Before pushing, compares the local build SHA-256 against the remote.
If the version already exists with the same content, the push is skipped.
If the version exists with different content, an error is raised unless
force=True.
Parameters:

- capability (str | Path) – Capability directory path or resolvable local capability name.
- name (str | None, default: None) – Optional OCI repository name override. Bare names are prefixed with the active organization when available.
- skip_upload (bool, default: False) – Skip uploading to remote and only validate/build locally.
- force (bool, default: False) – Push even if the version already exists with different content.
- publish (bool, default: False) – Ensure the capability is public after upload or skip.

Returns:

- CapabilityPushResult – Push result with status and details.
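The SHA-comparison rules described above reduce to a small decision function, sketched here as a hypothetical helper:

```python
def push_decision(local_sha: str, remote_sha, force: bool) -> str:
    """Decide the push outcome: no remote version -> push;
    same content -> skip; different content -> error unless force."""
    if remote_sha is None:
        return "push"
    if remote_sha == local_sha:
        return "skip"
    return "push" if force else "error"
```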
push_dataset
```python
push_dataset(
    dataset: str | Path,
    *,
    name: str | None = None,
    skip_upload: bool = False,
    publish: bool = False,
) -> PushResult
```

Build and push a dataset source directory to the OCI registry.
push_environment
```python
push_environment(
    environment: str | Path,
    *,
    name: str | None = None,
    skip_upload: bool = False,
    force: bool = False,
    publish: bool = False,
) -> PushResult
```

Build and push an environment directory with task.yaml to the OCI registry.
Before pushing, compares the local build SHA-256 against the remote.
If the task already exists with the same content, the push is skipped
unless force=True.
Parameters:

- environment (str | Path) – Task directory path containing task.yaml.
- name (str | None, default: None) – Optional OCI repository name override. Bare names are prefixed with the active organization when available.
- skip_upload (bool, default: False) – Skip uploading to remote and only build locally.
- force (bool, default: False) – Push even if the remote SHA matches.
- publish (bool, default: False) – Ensure the task is public after upload or skip.

Returns:

- PushResult – Push result with success status and details.
push_model
```python
push_model(
    model: str | Path,
    *,
    name: str | None = None,
    skip_upload: bool = False,
    publish: bool = False,
) -> PushResult
```

Build and push a model source directory to the OCI registry.
push_package
```python
push_package(path: str | Path, *, skip_upload: bool = False) -> PushResult
```

Build and push a local package to the Dreadnode OCI Registry.
Handles artifact upload to CAS (for datasets/models) and OCI image push automatically.
Parameters:

- path (str | Path) – Path to a dataset, model, or environment package project.
- skip_upload (bool, default: False) – Skip uploading to remote (local only).

Returns:

- PushResult – PushResult with status and details.
push_update
```python
push_update() -> None
```

Push any pending run data to the server before run completion.
This is useful for ensuring that the UI is up to date with the latest data. Data is automatically pushed periodically, but you can call this method to force a push.
Example
```python
with dreadnode.run("my_run"):
    dreadnode.log_params(...)
    dreadnode.log_metric(...)
    dreadnode.push_update()
    # do more work
```

run

```python
run(
    name: str | None = None,
    *,
    tags: Sequence[str] | None = None,
    params: AnyDict | None = None,
    project: str | None = None,
    name_prefix: str | None = None,
    attributes: AnyDict | None = None,
    _tracer: Tracer | None = None,
) -> TaskSpan[t.Any]
```

Create a new top-level task span.
This sets up trace infrastructure and creates a task span that can contain agents, evaluations, studies, or other work.
Example
```python
with dreadnode.run("my_experiment"):
    # Run an agent, evaluation, or other work
    await agent.run("do something")
```

Parameters:

- name (str | None, default: None) – The name of the task. If not provided, a random name will be generated.
- tags (Sequence[str] | None, default: None) – A list of tags to attach to the task.
- params (AnyDict | None, default: None) – A dictionary of parameters to attach to the task.
- project (str | None, default: None) – The project name to associate with. If not provided, the project passed to configure() will be used, or a default project.
- attributes (AnyDict | None, default: None) – Additional attributes to attach to the span.

Returns:

- TaskSpan[Any] – A TaskSpan object that can be used as a context manager.
scorer
```python
scorer(
    func: Callable[..., Any] | None = None,
    *,
    name: str | None = None,
    assert_: bool = False,
    attributes: AnyDict | None = None,
) -> t.Any
```

Create a scorer decorator. See scorer() for details.
serve

```python
serve(host: str | None = None, port: int | None = None) -> None
```

Start the agent server.
This starts a FastAPI server that provides REST + WebSocket endpoints for agent communication.
Parameters:

- host (str | None, default: None) – Host to bind to. Defaults to DREADNODE_RUNTIME_HOST (legacy: DREADNODE_SERVER_HOST) or 127.0.0.1.
- port (int | None, default: None) – Port to bind to. Defaults to DREADNODE_RUNTIME_PORT (legacy: DREADNODE_SERVER_PORT) or 8787.
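The documented fallback order for host and port can be sketched as follows (a hypothetical helper using the env variable names above):

```python
import os


def resolve_bind(host=None, port=None, env=None):
    """Resolve host/port: explicit arg, then DREADNODE_RUNTIME_*,
    then legacy DREADNODE_SERVER_*, then the built-in defaults."""
    env = env if env is not None else os.environ
    host = (
        host
        or env.get("DREADNODE_RUNTIME_HOST")
        or env.get("DREADNODE_SERVER_HOST")  # legacy
        or "127.0.0.1"
    )
    port = int(
        port
        or env.get("DREADNODE_RUNTIME_PORT")
        or env.get("DREADNODE_SERVER_PORT")  # legacy
        or 8787
    )
    return host, port
```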
Example
```python
import dreadnode as dn

dn.configure()
dn.serve(port=8787)
```

set_capability_visibility
```python
set_capability_visibility(org: str, name: str, *, is_public: bool) -> None
```

Update capability visibility for all versions of a capability name.
set_dataset_visibility
```python
set_dataset_visibility(org: str, name: str, *, is_public: bool) -> None
```

Update dataset visibility for all versions of a dataset name.
set_model_visibility
```python
set_model_visibility(org: str, name: str, *, is_public: bool) -> None
```

Update model visibility for all versions of a model name.
set_task_visibility
```python
set_task_visibility(org: str, name: str, *, is_public: bool) -> None
```

Update task visibility for all versions of a task name.
shutdown
```python
shutdown() -> None
```

Shutdown any associated OpenTelemetry components and flush any pending spans.
It is not required to call this method, as the SDK will automatically flush and shutdown when the process exits.
However, if you want to ensure that all spans are flushed before exiting, you can call this method manually.
span

```python
span(
    name: str,
    *,
    tags: Sequence[str] | None = None,
    attributes: AnyDict | None = None,
) -> Span
```

Create a new OpenTelemetry span.
Spans are more lightweight than tasks, but still let you track work being performed and view it in the UI. You cannot log parameters, inputs, or outputs to spans.
Example
```python
with dreadnode.span("my_span") as span:
    # do some work here
    pass
```

Parameters:

- name (str) – The name of the span.
- tags (Sequence[str] | None, default: None) – A list of tags to attach to the span.
- attributes (AnyDict | None, default: None) – A dictionary of attributes to attach to the span.

Returns:

- Span – A Span object.
study

```python
study(
    func: Callable[..., Any] | None = None,
    /,
    *,
    name: str | None = None,
    search_strategy: Any | None = None,
    dataset: Any | None = None,
    dataset_file: str | None = None,
    objectives: ScorersLike[Any] | None = None,
    directions: list[Direction] | None = None,
    constraints: ScorersLike[Any] | None = None,
    max_trials: int = 100,
    concurrency: int = 1,
    stop_conditions: list[Any] | None = None,
) -> t.Any
```

Decorator to create a Study from a task factory. See study() for details.
sync_capabilities
```python
sync_capabilities(
    directory: str | Path,
    *,
    force: bool = False,
    publish: bool = False,
    on_progress: Callable[[str, str, str | None], None] | None = None,
) -> CapabilitySyncResult
```

Sync capabilities from a directory to the platform.
Discovers all capabilities (directories containing capability.yaml),
compares each against the latest remote version by SHA-256, and pushes
only those that have changed. Optionally publishes them to the public
catalog.
To push a single capability, use push_capability() instead.
Parameters:

- directory (str | Path) – Root directory containing capability subdirectories.
- force (bool, default: False) – Upload even when the remote SHA matches.
- publish (bool, default: False) – Ensure is_public=True after upload or skip.

Returns:

- CapabilitySyncResult – CapabilitySyncResult with uploaded/skipped/failed details.
sync_environments
```python
sync_environments(
    directory: str | Path,
    *,
    force: bool = False,
    publish: bool = False,
    max_workers: int = 8,
    on_progress: Callable[[str, str, str | None], None] | None = None,
    on_status: Callable[[str], None] | None = None,
) -> EnvironmentSyncResult
```

Sync task environments from a directory to the platform.
Discovers all subdirectories containing task.yaml, compares each
against the exact remote version by OCI layer SHA-256, and pushes
only those that have changed.
Parameters:
directory(str | Path) –Root directory containing task subdirectories.force(bool, default:False) –Upload even when the remote SHA matches.publish(bool, default:False) –Ensureis_public=Trueafter upload or skip.max_workers(int, default:8) –Maximum parallel build/upload threads.on_progress(Callable[[str, str, str | None], None] | None, default:None) –Optional callback(name, status, error)for each task.
Returns:
EnvironmentSyncResult–class:EnvironmentSyncResultwith uploaded/skipped/failed details.
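The sync flow described above — hash each local directory, compare against the remote digest, and upload only on change — can be sketched in plain Python. This is an illustrative model of the behavior, not the SDK's implementation; `plan_sync` and the `remote` digest map stand in for the platform round-trip.

```python
import hashlib
from pathlib import Path


def directory_sha256(directory: Path) -> str:
    """Hash all file contents under a directory in a stable order."""
    digest = hashlib.sha256()
    for path in sorted(directory.rglob("*")):
        if path.is_file():
            digest.update(path.relative_to(directory).as_posix().encode())
            digest.update(path.read_bytes())
    return digest.hexdigest()


def plan_sync(
    local: dict[str, str], remote: dict[str, str], *, force: bool = False
) -> dict[str, list[str]]:
    """Decide which items to upload vs. skip by comparing SHA-256 digests."""
    result: dict[str, list[str]] = {"uploaded": [], "skipped": []}
    for name, sha in local.items():
        if force or remote.get(name) != sha:
            result["uploaded"].append(name)  # new or changed content
        else:
            result["skipped"].append(name)  # remote SHA matches
    return result
```

With `force=True` every item is re-uploaded regardless of digest equality, matching the `force` parameter above.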
tag(*tag: str) -> None

Add one or many tags to the current span.

Example

```python
with dreadnode.run("my_run"):
    dreadnode.tag("my_tag")
```

Parameters:

- tag (str, default: ()) – The tag(s) to attach.
task( func: Callable[P, Awaitable[R]] | Callable[P, R] | None = None, /, *, scorers: ScorersLike[Any] | None = None, name: str | None = None, label: str | None = None, log_inputs: Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, log_execution_metrics: bool = False, tags: Sequence[str] | None = None, attributes: AnyDict | None = None, entrypoint: bool = False,) -> TaskDecorator | ScoredTaskDecorator[R] | Task[P, R]

Create a new task from a function. See task() for details.
task_and_run

task_and_run( name: str, *, task_name: str | None = None, task_type: SpanType = "task", project: str | None = None, tags: Sequence[str] | None = None, params: AnyDict | None = None, inputs: AnyDict | None = None, label: str | None = None, _tracer: Tracer | None = None,) -> t.Iterator[TaskSpan[t.Any]]

Create a task span, setting up trace infrastructure if needed.

If no trace context exists, this sets up exporters and creates the span as a top-level span. The span type (evaluation, study, agent, etc.) becomes the root of the trace.

Parameters:

- name (str) – Name for the task span.
- task_name (str | None, default: None) – Optional separate name for the task span. If not provided, uses name.
- task_type (SpanType, default: 'task') – The type of span to create (task, evaluation, study, agent, etc.).
- project (str | None, default: None) – Project for trace storage.
- tags (Sequence[str] | None, default: None) – Tags to attach to the span.
- params (AnyDict | None, default: None) – Parameters to log.
- inputs (AnyDict | None, default: None) – Inputs to log.
- label (str | None, default: None) – Display label for the span.
task_env

task_env( task_ref: str, *, inputs: dict[str, Any] | None = None, secret_ids: list[str] | None = None, project_id: str | None = None, timeout_sec: int | None = None,) -> TaskEnvironment

Construct a TaskEnvironment bound to this profile's org/workspace.

The environment is not provisioned until setup() (or async with) is called. Pulls api_client/organization/workspace from the active profile.

Example:

```python
import dreadnode as dn

async with dn.task_env("my-task") as env:  # task_ref shown here is illustrative
    await env.execute("curl -sS $web_url/login")
```

task_span
task_span( name: str, *, type: SpanType = "task", label: str | None = None, tags: Sequence[str] | None = None, attributes: AnyDict | None = None, _tracer: Tracer | None = None,) -> TaskSpan[t.Any]

Create a task span without an explicit associated function.

This is useful for creating tasks on the fly without having to define a function.

Example

```python
async with dreadnode.task_span("my_task") as task:
    # do some work here
    pass
```

Parameters:

- name (str) – The name of the task.
- type (SpanType, default: 'task') – The type of span (task, evaluation, etc.).
- label (str | None, default: None) – The label of the task - useful for filtering in the UI.
- tags (Sequence[str] | None, default: None) – A list of tags to attach to the task span.
- attributes (AnyDict | None, default: None) – A dictionary of attributes to attach to the task span.

Returns:

- TaskSpan[Any] – A TaskSpan object.
train( config: str | Path | dict[str, Any], *, prompts: list[str] | None = None, reward_fn: Callable[[list[str], list[str]], list[float]] | None = None, scorers: ScorersLike[Any] | None = None,) -> t.Any

Train a model using a YAML configuration file.

This is the main entry point for training LLMs with GRPO, SFT, DPO, PPO, or other training methods supported by the Ray training framework.

Example YAML config (grpo.yaml):

```yaml
trainer: grpo
model_name: Qwen/Qwen2.5-1.5B-Instruct
max_steps: 100
num_prompts_per_step: 4
num_generations_per_prompt: 4
learning_rate: 1e-6
temperature: 0.7

# Dataset - supports dreadnode datasets, huggingface, jsonl, or inline
dataset:
  type: dreadnode       # or huggingface, jsonl, list
  name: my-dataset      # dreadnode dataset name
  prompt_field: question

# Reward - supports dreadnode scorers or built-in types
reward:
  type: scorer          # Use dreadnode scorer
  # or type: correctness, length, contains
```

Usage

```python
import dreadnode as dn

# Train from YAML config
result = dn.train("config/grpo.yaml")

# Train with dreadnode dataset and scorers
@dn.scorer
def correctness(completion: str) -> float:
    return 1.0 if "answer" in completion else 0.0

result = dn.train(
    {"trainer": "grpo", "model_name": "..."},
    prompts=dn.load("my-dataset").to_prompts("question"),
    scorers=[correctness],
)

# Train with custom prompts and reward function
result = dn.train(
    "config/grpo.yaml",
    prompts=["What is 2+2?", "What is 3*4?"],
    reward_fn=my_reward_fn,
)
```

Parameters:

- config (str | Path | dict[str, Any]) – Path to YAML config file, or dict with config values.
- prompts (list[str] | None, default: None) – Optional list of prompts (overrides dataset in config).
- reward_fn (Callable[[list[str], list[str]], list[float]] | None, default: None) – Optional reward function (overrides reward/scorers).
- scorers (ScorersLike[Any] | None, default: None) – Optional dreadnode Scorers to use as reward (converted to reward_fn).

Returns:

- Any – Training result (trainer-specific).
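The scorers parameter is described as being converted into a reward_fn. One plausible shape for that adapter — purely illustrative, not the SDK's actual conversion — averages each per-completion scorer value into the batch reward signature documented above:

```python
from typing import Callable


def scorers_to_reward_fn(
    scorers: list[Callable[[str], float]],
) -> Callable[[list[str], list[str]], list[float]]:
    """Adapt per-completion scorers into a (prompts, completions) -> rewards fn."""

    def reward_fn(prompts: list[str], completions: list[str]) -> list[float]:
        # Mean of all scorer values for each completion.
        return [
            sum(scorer(completion) for scorer in scorers) / len(scorers)
            for completion in completions
        ]

    return reward_fn
```

Averaging is an assumption here; the SDK may weight or combine scorers differently.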
DreadnodeAgentAdapter
Section titled “DreadnodeAgentAdapter”Adapter that evaluates agent instruction candidates with Evaluation.
apply_candidate
Section titled “apply_candidate”apply_candidate(candidate: dict[str, str]) -> AgentClone the agent and apply an instruction-only candidate.
evaluate
Section titled “evaluate”evaluate( batch: list[dict[str, Any]], candidate: dict[str, str], *, capture_traces: bool = False,) -> OptimizationEvaluationBatchEvaluate one batch of examples and return per-example scores.
evaluate_candidate
Section titled “evaluate_candidate”evaluate_candidate( candidate: dict[str, str], example: dict[str, Any] | None = None,) -> OptimizationEvaluationEvaluate one candidate in a GEPA-compatible (score, side_info) shape.
make_reflective_dataset
Section titled “make_reflective_dataset”make_reflective_dataset( candidate: dict[str, str], eval_batch: OptimizationEvaluationBatch, components_to_update: list[str],) -> dict[str, list[dict[str, t.Any]]]Build component-scoped reflective data for GEPA.
seed_candidate
Section titled “seed_candidate”seed_candidate() -> dict[str, str]Return the current instruction candidate for this agent.
EnvVar
Section titled “EnvVar”EnvVar( name: str, *, default: Any | Unset = UNSET, required: bool = True,)A Context marker for an environment variable.
Evaluation
Section titled “Evaluation”Evaluation of a task against a dataset.
Attributes:
- task (Task[..., Out] | str) – The task to evaluate.
- dataset (Any | None) – The dataset to use for the evaluation.
- dataset_file (FilePath | str | None) – File path of a JSONL, CSV, JSON, or YAML dataset.
- name (str) – The name of the evaluation.
- dataset_input_mapping (list[str] | dict[str, str] | None) – Mapping from dataset keys to task parameter names.
- preprocessor (InputDatasetProcessor | None) – Optional preprocessor for the dataset.
- scorers (ScorersLike[Out]) – Scorers to evaluate task output.
- assert_scores (list[str] | Literal[True]) – Scores to assert are truthy.
- trace (bool) – Whether to produce trace contexts.
max_consecutive_errors
Section titled “max_consecutive_errors”max_consecutive_errors: int | None = Config(default=10)Maximum consecutive errors before stopping the evaluation.
max_errors
Section titled “max_errors”max_errors: int | None = Config(default=None)Maximum total errors before stopping the evaluation.
console
Section titled “console”console() -> EvalResult[In, Out]Run the evaluation with a live display in the console.
with_( *, name: str | None = None, description: str | None = None, tags: list[str] | None = None, label: str | None = None, task: Task[..., Out] | str | None = None, dataset: Any | None = None, concurrency: int | None = None, iterations: int | None = None, max_errors: int | None = None, max_consecutive_errors: int | None = None, parameters: dict[str, list[Any]] | None = None, scorers: ScorersLike[Out] | None = None, assert_scores: list[str] | Literal[True] | None = None, append: bool = False,) -> te.SelfCreate a modified clone of the evaluation.
Image( data: ImageDataOrPathType, mode: str | None = None, caption: str | None = None, format: str | None = None,)Image media type for Dreadnode logging.
This class maintains a high-fidelity float32 numpy array as the canonical representation, ensuring no precision loss during use in transforms, scorers, and optimization routines.
Initialize an Image object.
Parameters:
- data (ImageDataOrPathType) – The image data, which can be:
  - A file path (str or Path)
  - A base64-encoded string (starting with "data:image/")
  - Raw bytes of an image file
  - A numpy array (HWC or HW format)
  - A Pillow Image object
- mode (str | None, default: None) – Optional mode for the image (RGB, L, etc.)
- caption (str | None, default: None) – Optional caption for the image
- format (str | None, default: None) – Optional format to use when saving (png, jpg, etc.)
canonical_array
Section titled “canonical_array”canonical_array: ndarray[Any, dtype[float32]]Get the canonical high-fidelity representation.
Returns:
ndarray[Any, dtype[float32]]–float32 numpy array in [0,1] range, HWC format
mode: strGet the image mode (L, RGB, RGBA, etc.).
shape: tuple[int, ...]Get the shape of the canonical array.
resize
Section titled “resize”resize( height: int, width: int, *, resample: int | None = None) -> ImageResize the image to the specified size.
Parameters:
height(int) –The desired height of the image.width(int) –The desired width of the image.resample(int | None, default:None) –Resampling filter to use (see PIL.Image for options).
Returns:
Image–New Image object with resized image
show() -> NoneDisplays the image using the default image viewer.
to_base64
Section titled “to_base64”to_base64() -> strReturns the image as a base64 encoded string.
to_numpy
Section titled “to_numpy”to_numpy( dtype: Any = np.float32,) -> np.ndarray[t.Any, t.Any]Returns the image as a NumPy array with specified dtype.
Parameters:
- dtype (Any, default: float32) – Target dtype. Common options:
  - np.float32/np.float64: Values in [0.0, 1.0] (recommended)
  - np.uint8: Values in [0, 255]
Returns:
ndarray[Any, Any]–NumPy array in HWC format (or HW for grayscale)
to_pil
Section titled “to_pil”to_pil() -> PILImageReturns the image as a Pillow Image object.
to_serializable
Section titled “to_serializable”to_serializable() -> tuple[bytes, dict[str, t.Any]]Convert the image to bytes and return with metadata.
Returns:
tuple[bytes, dict[str, Any]]–Tuple of (image_bytes, metadata_dict)
Markdown
Section titled “Markdown”Markdown(text: str)Hint type for markdown-formatted text.
This is a subclass of Text with format set to “markdown”.
Example
```python
log_output("report", Markdown("..."))
```

Metric

Any reported value regarding the state of a run, task, and optionally object (input/output).

Attributes:

- value (float) – The value of the metric, e.g. 0.5, 1.0, 2.0, etc.
- step (int) – A step value to indicate when this metric was reported.
- timestamp (datetime) – The timestamp when the metric was reported.
- attributes (JsonDict) – A dictionary of attributes to attach to the metric.
apply_aggregation
Section titled “apply_aggregation”apply_aggregation( agg: MetricAggMode, others: list[Metric]) -> MetricApply an aggregation mode to the metric. This will modify the metric in place.
Parameters:
- agg (MetricAggMode) – The aggregation to apply. One of "sum", "min", "max", or "count".
- others (list[Metric]) – A list of other metrics to apply the aggregation to.
Returns:
Metric–self
from_many
Section titled “from_many”from_many( values: Sequence[tuple[str, float, float]], step: int = 0, **attributes: JsonValue,) -> MetricCreate a composite metric from individual values and weights.
This is useful for creating a metric that is the weighted average of multiple values. The values should be a sequence of tuples, where each tuple contains the name of the metric, the value of the metric, and the weight of the metric.
The individual values will be reported in the attributes of the metric.
Parameters:
- values (Sequence[tuple[str, float, float]]) – A sequence of tuples containing the name, value, and weight of each metric.
- step (int, default: 0) – The step value to attach to the metric.
- **attributes (JsonValue, default: {}) – Additional attributes to attach to the metric.
Returns:
Metric–A composite Metric
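The weighted averaging behind from_many can be sketched as follows. This is a hypothetical re-implementation of just the arithmetic; the real Metric also carries step, timestamp, and attributes:

```python
def composite_value(
    values: list[tuple[str, float, float]],
) -> tuple[float, dict[str, float]]:
    """Weighted average of (name, value, weight) triples, plus the parts.

    The per-name values mirror what from_many records in the metric attributes.
    """
    total_weight = sum(weight for _, _, weight in values)
    combined = sum(value * weight for _, value, weight in values) / total_weight
    parts = {name: value for name, value, _ in values}
    return combined, parts
```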
MetricSeries
Section titled “MetricSeries”A series of metric values with aggregation computed on read.
This replaces dict[str, list[Metric]] for metric storage. Raw values are always preserved, and any aggregation can be computed at query time.
Attributes:
- values (list[float]) – The raw metric values in order of logging.
- steps (list[int | None]) – Optional step indices for each value.
- timestamps (list[datetime]) – Timestamps for each value.

value: float | None

Convenience property for single-value series (same as last).
append
Section titled “append”append( value: float, step: int | None = None, timestamp: datetime | None = None,) -> NoneAppend a value to the series.
at_step
Section titled “at_step”at_step(step: int) -> float | NoneGet the value at a specific step.
count() -> intGet the number of values.
first() -> float | NoneGet the first value in the series.
last() -> float | NoneGet the last value in the series.
max() -> float | NoneGet the maximum value.
mean() -> float | NoneCompute the mean of all values.
min() -> float | NoneGet the minimum value.
sum() -> floatGet the sum of all values.
to_metric
Section titled “to_metric”to_metric(aggregation: MetricAggMode = 'avg') -> MetricConvert to a single Metric using the specified aggregation.
values_at_steps
Section titled “values_at_steps”values_at_steps(steps: Sequence[int]) -> list[float | None]Get values at multiple steps.
Object3D
Section titled “Object3D”Object3D( data: Object3DDataType, caption: str | None = None, format: str | None = None,)3D object media type for Dreadnode logging.
Supports:
- Local file paths to 3D models (.obj, .glb, .gltf, etc.)
- Raw bytes with metadata
Initialize a 3D Object.
Parameters:
- data (Object3DDataType) – The 3D object data, which can be:
  - A path to a local 3D model file (str or Path)
  - Raw bytes of a 3D model file
- caption (str | None, default: None) – Optional caption for the 3D object
- format (str | None, default: None) – Optional format override (obj, glb, etc.)
to_serializable
Section titled “to_serializable”to_serializable() -> tuple[bytes, dict[str, t.Any]]Convert the 3D object to bytes and return with metadata.
Returns:
tuple[bytes, dict[str, Any]]–A tuple of (object_bytes, metadata_dict)
Optimization
Section titled “Optimization”Dreadnode-native optimize_anything executor.
effective_dataset
Section titled “effective_dataset”effective_dataset: list[Any] | NoneReturn the trainset if provided, otherwise dataset.
optimization_id
Section titled “optimization_id”optimization_id: UUIDStable identifier for this optimization run.
console
Section titled “console”console() -> OptimizationResult[CandidateT]Run the optimization with a live console adapter.
OptimizationConfig
Section titled “OptimizationConfig”Top-level configuration for Dreadnode optimize_anything runs.
OptimizationResult
Section titled “OptimizationResult”OptimizationResult( backend: str, seed_candidate: CandidateT | None = None, best_candidate: CandidateT | None = None, best_score: float | None = None, best_scores: dict[str, float] = dict(), objective: str | None = None, train_size: int = 0, val_size: int = 0, pareto_frontier: list[CandidateT] = list(), history: list[Any] = list(), metadata: dict[str, Any] = dict(), raw_result: Any = None,)Result of a Dreadnode optimize_anything run.
frontier_size
Section titled “frontier_size”frontier_size: intReturn the number of candidates currently on the Pareto frontier.
to_dict
Section titled “to_dict”to_dict() -> dict[str, t.Any]Return a JSON-serializable result dictionary.
ParentTask
Section titled “ParentTask”ParentTask( *, default: Any | Unset = UNSET, required: bool = True)Retrieve the parent of the current task span from the current context.
Scorer
Section titled “Scorer”Scorer( func: ScorerCallable[T], *, name: str | None = None, assert_: bool = False, attributes: JsonDict | None = None, catch: bool = False, step: int = 0, auto_increment_step: bool = False, log_all: bool = True, bound_obj: Any | Unset = UNSET, config: dict[str, ConfigInfo] | None = None, context: dict[str, Context] | None = None, wraps: Callable[..., Any] | None = None,)A stateful, configurable, and composable wrapper for a scoring function.
A Scorer is a specialized Component that evaluates an object and produces a Metric.
It inherits the configuration and context-awareness of a Component, allowing
scorers to be defined with dn.Config and dn.Context parameters.
Attributes:
- name – The name of the scorer.
- attributes – A dictionary of attributes to attach to each generated metric.
- catch – Whether to catch exceptions during scoring and log a warning instead.
- step – An optional step value to attach to generated metrics.
- auto_increment_step – Whether to automatically increment the step after each scoring.
- log_all – Whether to log all sub-metrics from nested compositions.
- bound_obj – An optional object to bind the scorer to, overriding the caller-provided object.
Examples:
```python
@dn.scorer(name="length_scorer", catch=True)
async def length_scorer(text: str) -> float:
    return len(text) / 100.0  # Normalize length to [0.0, 1.0]
```
above( threshold: float, *, name: str | None = None) -> ScoringCondition[T]Create a ScoringCondition that passes if score > threshold.
The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.
Parameters:
threshold(float) –The value the score must exceed.name(str | None, default:None) –Optional name for the condition.
Returns:
ScoringCondition[T]–A ScoringCondition that passes if score > threshold.
Examples:
```python
@hook(GenerationStep, when=[quality.above(0.5)])
async def high_quality_only(event):
    # event.metrics["quality"] is available
    ...
```

as_condition

as_condition( *, name: str | None = None) -> ScoringCondition[T]

Create a ScoringCondition that always passes but attaches the metric.
Use this when you want to record the score without gating. The metric will be attached to the event for logging/telemetry.
Parameters:
name(str | None, default:None) –Optional name for the condition.
Returns:
ScoringCondition[T]–A ScoringCondition that always passes.
Examples:
```python
@hook(GenerationStep, when=[
    quality.above(0.5),      # Gates on quality
    safety.as_condition(),   # Just records safety metric
])
async def observe(event):
    # Both metrics available: event.metrics["quality"], event.metrics["safety"]
    ...
```

as_scorer

as_scorer( func: Callable[[OuterT], T], *, name: str | None = None) -> Scorer[OuterT]

Adapts a scorer to operate on some other type.
A wrapper that allows a generic scorer (e.g., one that refines a string) to be used with a complex candidate object (e.g., a Pydantic model containing that string).
Parameters:
func(Callable[[OuterT], T]) –A function to convert from some outer type to the scorer’s expected type.name(str | None, default:None) –An optional new name for the adapted scorer.
Returns:
Scorer[OuterT]–A new Scorer instance that operates on theOuterT.
assert_off
Section titled “assert_off”assert_off() -> Scorer[T]Mark this scorer as not an assertion.
assert_on
Section titled “assert_on”assert_on() -> Scorer[T]Mark this scorer as an assertion (must be truthy).
at_least
Section titled “at_least”at_least( threshold: float, *, name: str | None = None) -> ScoringCondition[T]Create a ScoringCondition that passes if score >= threshold.
The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.
Parameters:
threshold(float) –The minimum acceptable value.name(str | None, default:None) –Optional name for the condition.
Returns:
ScoringCondition[T]–A ScoringCondition that passes if score >= threshold.
Examples:
```python
@hook(GenerationStep, when=[confidence.at_least(0.8)])
async def confident_only(event):
    ...
```

at_most

at_most( threshold: float, *, name: str | None = None) -> ScoringCondition[T]

Create a ScoringCondition that passes if score <= threshold.
The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.
Parameters:
threshold(float) –The maximum acceptable value.name(str | None, default:None) –Optional name for the condition.
Returns:
ScoringCondition[T]–A ScoringCondition that passes if score <= threshold.
Examples:
```python
@hook(GenerationStep, when=[toxicity.at_most(0.1)])
async def non_toxic_only(event):
    ...
```

below( threshold: float, *, name: str | None = None) -> ScoringCondition[T]

Create a ScoringCondition that passes if score < threshold.
The condition runs this scorer, attaches the metric to the event, and gates based on the threshold.
Parameters:
threshold(float) –The value the score must be below.name(str | None, default:None) –Optional name for the condition.
Returns:
ScoringCondition[T]–A ScoringCondition that passes if score < threshold.
Examples:
```python
@hook(GenerationStep, when=[quality.below(0.5)])
async def retry_low_quality(event) -> Reaction:
    return RetryWithFeedback(f"Quality {event.metrics['quality'].value} too low")
```

bind

bind(obj: Any) -> Scorer[t.Any]

Bind the scorer to a specific object. Any time the scorer is executed, the bound object will be passed instead of the caller-provided object.

This is useful for building scoring patterns that are not directly tied to the output of a task.

Examples:

```python
@dn.task(scorers=[
    dn.scorers.image_distance(reference).bind(dn.TaskInput("image"))
])
async def classify(image: dn.Image) -> str:
    ...
```

Parameters:
obj(Any) –The object to bind the scorer to.
Returns:
Scorer[Any]–A new Scorer bound to the specified object.
clone() -> Scorer[T]Clone the scorer.
evaluate
Section titled “evaluate”evaluate( obj: T, scorers: ScorersLike[T], *, step: int | None = None, assert_scores: Literal[True, False] | list[str] | None = None,) -> dict[str, list[Metric]]Run multiple scorers against an object and collect metrics.
Parameters:
- obj (T) – The object to score.
- scorers (ScorersLike[T]) – A list of scorers to use.
- step (int | None, default: None) – An optional step value to attach to all generated metrics.
- assert_scores (Literal[True, False] | list[str] | None, default: None) – Controls assertion behavior:
  - None (default): Use each scorer's assert_ field
  - True: Assert ALL scorers must be truthy
  - False: Disable all assertions
  - list[str]: Assert only these scorer names (overrides scorer.assert_)
Returns:
dict[str, list[Metric]]–A dictionary mapping scorer names to their generated metrics.
Raises:
AssertionFailedError–If any asserted scores have falsy values.
fit(scorer: ScorerLike[T]) -> Scorer[T]Fit a scorer to the given attributes.
Parameters:
scorer(ScorerLike[T]) –The scorer to fit.
Returns:
Scorer[T]–A Scorer instance.
fit_many
Section titled “fit_many”fit_many(scorers: ScorersLike[T] | None) -> list[Scorer[T]]Convert a collection of scorer-like objects into a list of Scorer instances.
This method provides a flexible way to handle different input formats for scorers, automatically converting callables to Scorer objects and applying consistent naming and attributes across all scorers.
Parameters:
- scorers (ScorersLike[T] | None) – A collection of scorer-like objects. Can be:
  - A dictionary mapping names to scorer objects or callables
  - A sequence of scorer objects or callables
  - None (returns empty list)
Returns:
list[Scorer[T]]–A list of Scorer instances with consistent configuration.
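The normalization fit_many performs — dict, sequence, or None in; a uniform named list out — can be sketched with plain callables. This is a simplified stand-in for the real conversion to Scorer instances:

```python
from typing import Callable


def normalize_scorers(scorers) -> list[tuple[str, Callable]]:
    """Turn a dict, a sequence, or None into a uniform (name, fn) list."""
    if scorers is None:
        return []
    if isinstance(scorers, dict):
        # Dict keys win as the scorer names.
        return list(scorers.items())
    # Sequences fall back to the callable's own name.
    return [(getattr(fn, "__name__", "scorer"), fn) for fn in scorers]
```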
normalize_and_score
Section titled “normalize_and_score”normalize_and_score( obj: T, *args: Any, **kwargs: Any) -> list[Metric]Executes the scorer and returns all generated metrics, including from nested compositions.
Parameters:
obj(T) –The object to score.
Returns:
list[Metric]–All metrics generated by the scorer.
on( event_type: type[AgentEventT], *, adapter: Callable[[AgentEventT], Any] | None = None, **kwargs: Any,) -> ScorerHook[AgentEventT]Create a ScorerHook that runs this scorer on agent events.
Deprecated: Use @hook(EventType, when=[scorer.above(threshold)]) instead, or use .above(), .below(), .as_condition() for scoring conditions.
This enables per-step scoring during agent execution, even outside of an Evaluation context.
Parameters:
event_type(type[AgentEventT]) –The event type to trigger on (e.g., GenerationStep, ToolStep).adapter(Callable[[AgentEventT], Any] | None, default:None) –Optional function to extract the object to score from the event.**kwargs(Any, default:{}) –Additional arguments passed to ScorerHook.
Returns:
ScorerHook[AgentEventT]–A ScorerHook configured to run this scorer on matching events.
Examples:
```python
@dn.scorer
async def quality(text: str) -> float:
    return await check_quality(text)

# Score generation outputs
hook = quality.on(
    GenerationStep,
    adapter=lambda e: e.messages[0].content if e.messages else "",
)

# Use with threshold reactions
hook = quality.on(GenerationStep, adapter=...).retry_if_below(0.5)

# Add to agent
agent = Agent(
    ...,
    scorers=[hook],
)
```

rename
Section titled “rename”rename(new_name: str) -> Scorer[T]Rename the scorer.
Parameters:
new_name(str) –The new name for the scorer.
Returns:
Scorer[T]–A new Scorer with the updated name.
score(obj: T, *args: Any, **kwargs: Any) -> MetricExecute the scorer and return the metric. If the scorer is a composition of other scorers, it will return the “highest-priority” metric, typically the first in the list.
Any output value will be converted to a Metric object if not already one.
Parameters:
obj(T) –The object to score.
Returns:
Metric–A Metric object.
score_composite
Section titled “score_composite”score_composite( obj: T, *args: Any, **kwargs: Any) -> tuple[Metric, list[Metric]]Executes the scorer and returns both the primary Metric and a list of any additional metrics from nested compositions.
Parameters:
obj(T) –The object to score.
Returns:
tuple[Metric, list[Metric]]–A tuple of the primary Metric and a list of all metrics generated.
with_( *, name: str | None = None, assert_: bool | None = None, attributes: JsonDict | None = None, step: int | None = None, auto_increment_step: bool | None = None, catch: bool | None = None, log_all: bool | None = None,) -> Scorer[T]Create a new Scorer with updated properties.
Parameters:
- name (str | None, default: None) – New name for the scorer.
- attributes (JsonDict | None, default: None) – New attributes for the scorer.
- step (int | None, default: None) – New step value for the scorer.
- auto_increment_step (bool | None, default: None) – Automatically increment the step each time this scorer is called.
- catch (bool | None, default: None) – Catch exceptions in the scorer function.
- log_all (bool | None, default: None) – Log all sub-metrics from nested compositions.
Returns:
Scorer[T]–A new Scorer with the updated properties
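with_ follows a copy-with-overrides pattern: the original scorer is left untouched and only explicitly supplied fields change. With stdlib dataclasses the pattern looks like this (a sketch of the pattern only — ScorerConfig is a hypothetical stand-in for the Scorer class):

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ScorerConfig:
    name: str
    step: int = 0
    catch: bool = False


def with_(config: ScorerConfig, **overrides) -> ScorerConfig:
    """Return a new config; None/omitted fields keep their current value."""
    changes = {key: value for key, value in overrides.items() if value is not None}
    return replace(config, **changes)
```

Treating None as "keep the current value" matches the parameter defaults documented above.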
Span( name: str, tracer: Tracer, *, attributes: AnyDict | None = None, label: str | None = None, type: SpanType = "span", tags: Sequence[str] | None = None,)

active
Section titled “active”active: boolCheck if the span is currently active (recording).
duration
Section titled “duration”duration: floatGet the duration of the span in seconds.
exception
Section titled “exception”exception: BaseException | NoneGet the exception recorded in the span, if any.
failed
Section titled “failed”failed: boolCheck if the span has failed.
is_recording
Section titled “is_recording”is_recording: boolCheck if the span is currently recording.
label: strGet the label of the span.
Table( data: TableDataType, caption: str | None = None, format: str | None = None, *, index: bool = False,)Table data type for Dreadnode logging.
Supports:
- Pandas DataFrames
- CSV/Parquet/JSON files
- Dict or list data structures
- NumPy arrays
Initialize a Table object.
Parameters:
- data (TableDataType) – The table data, which can be:
  - A pandas DataFrame
  - A path to a CSV/JSON/Parquet file
  - A dict or list of dicts
  - A NumPy array
- caption (str | None, default: None) – Optional caption for the table
- format (str | None, default: None) – Optional format to use when saving (csv, parquet, json)
- index (bool, default: False) – Include index in the output
to_serializable
Section titled “to_serializable”to_serializable() -> tuple[bytes, dict[str, t.Any]]Convert the table to bytes and return with metadata.
Returns:
tuple[bytes, dict[str, Any]]–A tuple of (table_bytes, metadata_dict)
Task( func: Callable[P, R], tracer: Tracer, *, name: str | None = None, label: str | None = None, scorers: ScorersLike[R] | None = None, assert_scores: list[str] | Literal[True] | None = None, log_inputs: Sequence[str] | bool | Inherited = INHERITED, log_output: bool | Inherited = INHERITED, log_execution_metrics: bool = False, tags: Sequence[str] | None = None, attributes: AnyDict | None = None, entrypoint: bool = False, config: dict[str, ConfigInfo] | None = None, context: dict[str, Context] | None = None,)Structured task wrapper for a function that can be executed within a run.
Tasks allow you to associate metadata, inputs, outputs, and metrics for a unit of work.
Parameters:
- func (Callable[P, R]) – The function to wrap as a task.
- tracer (Tracer) – The tracer to use for tracing spans. If None, uses the default tracer.
- name (str | None, default: None) – The name of the task. This is used for logging and tracing.
- label (str | None, default: None) – The label of the task - used to group associated metrics and data together.
- scorers (ScorersLike[R] | None, default: None) – A list of scorers to evaluate the task's output.
- tags (Sequence[str] | None, default: None) – A list of tags to attach to the task span.
- attributes (AnyDict | None, default: None) – A dictionary of attributes to attach to the task span.
- log_inputs (Sequence[str] | bool | Inherited, default: INHERITED) – Log all, or specific, incoming arguments to the function as inputs.
- log_output (bool | Inherited, default: INHERITED) – Log the result of the function as an output.
- log_execution_metrics (bool, default: False) – Track execution metrics such as success rate and run count.
- entrypoint (bool, default: False) – Indicate this task should be considered an entrypoint.
- config (dict[str, ConfigInfo] | None, default: None) – Configuration schema for the task parameters.
- context (dict[str, Context] | None, default: None) – Context schema for the task execution.
clone() -> Task[P, R]Clone a task.
Returns:
Task[P, R]–A new Task instance with the same attributes as this one.
many(count: int, *args: args, **kwargs: kwargs) -> list[R]Run the task multiple times and return a list of outputs.
Parameters:
count(int) –The number of times to run the task.args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Returns:
list[R]–A list of outputs from each task execution.
map( args: list[Any] | dict[str, Any | list[Any]], *, concurrency: int | None = None,) -> list[R]Runs this task multiple times by mapping over iterable arguments.
Examples:
```python
@dn.task
async def my_task(input: str, *, suffix: str = "") -> str:
    return f"Processed {input}{suffix}"

# Map over a list of basic inputs
await my_task.map(["1", "2", "3"])

# Map over a dict of parameters
await my_task.map({
    "input": ["1", "2", "3"],
    "suffix": ["_a", "_b", "_c"],
})
```

Parameters:
args(list[Any] | dict[str, Any | list[Any]]) –Either a flat list of the first positional argument, or a dict where each key is a parameter name and the value is either a single value or a list of values to map over.concurrency(int | None, default:None) –The maximum number of tasks to run in parallel. If None, runs with unlimited concurrency.
Returns:
list[R] – A list containing the output of each execution.
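The two accepted shapes for args — a flat list for the first positional argument, or a dict of parameter lists — can be modeled by an expansion helper. This is illustrative only; the "input"/"suffix" names come from the example above, and the real map also handles broadcasting rules the SDK defines:

```python
def expand_map_args(args) -> list[dict]:
    """Expand map()-style args into one kwargs dict per task call."""
    if isinstance(args, list):
        # Flat list: each element becomes the first positional argument
        # ("input" here is the hypothetical parameter name).
        return [{"input": value} for value in args]
    # Dict: zip list values element-wise, broadcast scalar values.
    lengths = [len(v) for v in args.values() if isinstance(v, list)]
    count = lengths[0] if lengths else 1
    return [
        {key: (value[i] if isinstance(value, list) else value)
         for key, value in args.items()}
        for i in range(count)
    ]
```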
retry(count: int, *args: args, **kwargs: kwargs) -> RRun the task up to count times, returning the output of the first successful execution; otherwise, the most recent exception is raised.
This is a powerful pattern for non-deterministic tasks where multiple attempts may be needed to generate a valid output according to the task’s assert_scores. It can also be useful as a retry mechanism for transient errors.
Parameters:
count(int) –The maximum number of times to run the task.args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Returns:
R–The output of the first successful and valid task execution.
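The retry contract described above can be sketched as a standalone function. This is a hypothetical reimplementation for illustration, not the SDK’s code; retry_call is an invented name.

```python
from typing import Callable, TypeVar

R = TypeVar("R")


def retry_call(count: int, func: "Callable[..., R]", *args, **kwargs) -> R:
    """Return the output of the first successful call; if every attempt
    fails, re-raise the most recent exception."""
    last_exc: "Exception | None" = None
    for _ in range(count):
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # broad catch is intentional here
            last_exc = exc
    assert last_exc is not None
    raise last_exc
```

A task whose scorer assertions fail raises on that attempt, so the next attempt runs; only a clean execution short-circuits the loop.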
run(*args: args, **kwargs: kwargs) -> TaskSpan[R]Execute the task and return the result as a TaskSpan. If the task fails, an exception is raised.
Parameters:
args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Returns:
TaskSpan[R]–The span associated with task execution.
run_always
Section titled “run_always”run_always(*args: args, **kwargs: kwargs) -> TaskSpan[R]Execute the task and return the result as a TaskSpan.
Note: if the task fails, the span is still returned, with the exception set.
Parameters:
args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Returns:
TaskSpan[R]–The span associated with task execution.
stream_many
Section titled “stream_many”stream_many( count: int, *args: args, **kwargs: kwargs) -> t.AsyncContextManager[ t.AsyncGenerator[TaskSpan[R], None]]Run the task multiple times concurrently and yield each TaskSpan as it completes.
Parameters:
count(int) –The number of times to run the task.args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Yields:
AsyncContextManager[AsyncGenerator[TaskSpan[R], None]]–TaskSpan for each task execution, or an Exception if the task fails.
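The yield-as-completed pattern above can be sketched with asyncio. This is a hypothetical standalone analogue, not the SDK’s implementation; stream_results and work are invented names.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Awaitable, Callable


@asynccontextmanager
async def stream_results(
    count: int, coro_func: "Callable[[], Awaitable[str]]"
) -> "AsyncGenerator[AsyncGenerator[str, None], None]":
    # Start every execution up front, then yield results as they finish.
    tasks = [asyncio.create_task(coro_func()) for _ in range(count)]

    async def results() -> "AsyncGenerator[str, None]":
        for finished in asyncio.as_completed(tasks):
            yield await finished

    try:
        yield results()
    finally:
        # Leaving the context cancels anything still in flight.
        for task in tasks:
            task.cancel()


async def main() -> "list[str]":
    async def work() -> str:
        await asyncio.sleep(0)
        return "done"

    collected = []
    async with stream_results(3, work) as stream:
        async for result in stream:
            collected.append(result)
    return collected
```

The context-manager wrapper is what lets early exits (a break inside the async for, or an exception) clean up the remaining executions.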
stream_map
Section titled “stream_map”stream_map( args: list[Any] | dict[str, Any | list[Any]], *, concurrency: int | None = None,) -> t.AsyncContextManager[ t.AsyncGenerator[TaskSpan[R], None]]Runs this task multiple times by mapping over iterable arguments.
Parameters:
args(list[Any] | dict[str, Any | list[Any]]) –Either a flat list of the first positional argument, or a dict where each key is a parameter name and the value is either a single value or a list of values to map over.concurrency(int | None, default:None) –The maximum number of tasks to run in parallel. If None, runs with unlimited concurrency.
Returns:
AsyncContextManager[AsyncGenerator[TaskSpan[R], None]]–An async generator that yields a TaskSpan for each execution as it completes.
try_(*args: args, **kwargs: kwargs) -> R | NoneAttempt to run the task and return the result. If the task fails, None is returned.
Parameters:
args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Returns:
R | None–The output of the task, or None if the task failed.
try_many
Section titled “try_many”try_many( count: int, *args: args, **kwargs: kwargs) -> list[R]Attempt to run the task multiple times and return a list of outputs. If any task fails, its result is excluded from the output.
Parameters:
count(int) –The number of times to run the task.args(args, default:()) –The arguments to pass to the task.kwargs(kwargs, default:{}) –The keyword arguments to pass to the task.
Returns:
list[R]–A list of outputs from each task execution.
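The failure-excluding semantics of the try_ family can be sketched directly. This is a hypothetical illustration, not the SDK’s code; try_many_call is an invented name.

```python
from typing import Callable, TypeVar

R = TypeVar("R")


def try_many_call(count: int, func: "Callable[..., R]", *args, **kwargs) -> "list[R]":
    """Run func up to count times; failed runs are skipped rather than
    raised, so the result list may be shorter than count."""
    outputs: "list[R]" = []
    for _ in range(count):
        try:
            outputs.append(func(*args, **kwargs))
        except Exception:  # failed executions are simply excluded
            continue
    return outputs
```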
try_map
Section titled “try_map”try_map( args: list[Any] | dict[str, Any | list[Any]], *, concurrency: int | None = None,) -> list[R]Attempt to run this task multiple times by mapping over iterable arguments. If any task fails, its result is excluded from the output.
Parameters:
args(list[Any] | dict[str, Any | list[Any]]) –Either a flat list of the first positional argument, or a dict where each key is a parameter name and the value is either a single value or a list of values to map over.concurrency(int | None, default:None) –The maximum number of tasks to run in parallel. If None, runs with unlimited concurrency.
Returns:
list[R]–A list of outputs from each successful execution.
with_( *, scorers: ScorersLike[R] | None = None, assert_scores: Sequence[str] | Literal[True] | None = None, name: str | None = None, tags: Sequence[str] | None = None, label: str | None = None, log_inputs: Sequence[str] | bool | Inherited | None = None, log_output: bool | Inherited | None = None, log_execution_metrics: bool | None = None, append: bool = False, attributes: AnyDict | None = None, entrypoint: bool = False,) -> Task[P, R]Clone a task and modify its attributes.
Parameters:
scorers(ScorersLike[R] | None, default:None) –A list of new scorers to set or append to the task.assert_scores(Sequence[str] | Literal[True] | None, default:None) –A list of new assertion names to set or append to the task.name(str | None, default:None) –The new name for the task.tags(Sequence[str] | None, default:None) –A list of new tags to set or append to the task.label(str | None, default:None) –The new label for the task.log_inputs(Sequence[str] | bool | Inherited | None, default:None) –Log all, or specific, incoming arguments to the function as inputs.log_output(bool | Inherited | None, default:None) –Log the result of the function as an output.log_execution_metrics(bool | None, default:None) –Log execution metrics such as success rate and run count.append(bool, default:False) –If True, appends the new scorers and tags to the existing ones. If False, replaces them.attributes(AnyDict | None, default:None) –Additional attributes to set or update in the task.entrypoint(bool, default:False) –Indicate this task should be considered an entrypoint. All compatible arguments will be treated as configurable and a run will be created automatically when called if one is not already active.
Returns:
Task[P, R]–A new Task instance with the modified attributes.
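The clone-and-modify pattern of with_(), including the append flag, can be sketched with a dataclass. TaskSketch is a hypothetical stand-in, not the SDK’s Task class.

```python
from dataclasses import dataclass, field, replace
from typing import Sequence


@dataclass
class TaskSketch:
    name: str
    tags: "list[str]" = field(default_factory=list)

    def with_(
        self,
        *,
        name: "str | None" = None,
        tags: "Sequence[str] | None" = None,
        append: bool = False,
    ) -> "TaskSketch":
        # Always clone; the original task is never mutated.
        new = replace(self, tags=list(self.tags))
        if name is not None:
            new.name = name
        if tags is not None:
            # append=True extends the existing tags; False replaces them.
            new.tags = [*self.tags, *tags] if append else list(tags)
        return new
```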
TaskSpan
Section titled “TaskSpan”TaskSpan( name: str, tracer: Tracer, *, storage: Storage | None = None, project: str = "default", task_id: str | UUID | None = None, type: SpanType = "task", attributes: AnyDict | None = None, label: str | None = None, params: AnyDict | None = None, metrics: MetricsDict | None = None, tags: Sequence[str] | None = None, arguments: Arguments | None = None,)Self-sufficient task span with object storage, metrics, params, and artifacts.
TaskSpan is the primary span type for all operations. It manages its own:
- Object storage (inputs, outputs, arbitrary objects)
- Metrics tracking
- Parameters
- Artifacts
- Child tasks
TaskSpans can be nested - a TaskSpan can contain child TaskSpans.
agent_id
Section titled “agent_id”agent_id: str | NoneGet the ID of the nearest agent span in the parent chain.
all_tasks
Section titled “all_tasks”all_tasks: list[TaskSpan[Any]]Get all tasks, including nested subtasks.
arguments
Section titled “arguments”arguments: Arguments | NoneGet the arguments used for this task if created from a function.
eval_id
Section titled “eval_id”eval_id: str | NoneGet the ID of the nearest evaluation span in the parent chain.
inputs
Section titled “inputs”inputs: AnyDictGet all logged inputs.
metrics
Section titled “metrics”metrics: MetricsDictGet all metrics.
output
Section titled “output”output: RGet the output of this task if created from a function.
outputs
Section titled “outputs”outputs: AnyDictGet all logged outputs.
params
Section titled “params”params: AnyDictGet all parameters.
parent_task
Section titled “parent_task”parent_task: TaskSpan[Any] | NoneGet the parent task if it exists.
parent_task_id
Section titled “parent_task_id”parent_task_id: strGet the parent task ID if it exists.
root_id
Section titled “root_id”root_id: strGet the root task’s ID (for span grouping/routing).
run_id
Section titled “run_id”run_id: strAlias for root_id (backwards compatibility).
study_id
Section titled “study_id”study_id: str | NoneGet the ID of the nearest study span in the parent chain.
task_id
Section titled “task_id”task_id: strGet this task’s unique ID.
tasks: list[TaskSpan[Any]]Get the list of child tasks.
from_context
Section titled “from_context”from_context( context: TaskContext, tracer: Tracer, storage: Storage | None = None,) -> TaskSpan[t.Any]Continue a task from captured context on a remote host.
get_average_metric_value
Section titled “get_average_metric_value”get_average_metric_value(key: str) -> floatGet the mean of a metric series.
get_object
Section titled “get_object”get_object(hash_: str) -> ObjectGet an object by its hash.
link_objects
Section titled “link_objects”link_objects( object_hash: str, link_hash: str, attributes: AnyDict | None = None,) -> NoneLink two objects together.
log_artifact
Section titled “log_artifact”log_artifact( local_uri: str | Path, *, name: str | None = None) -> dict[str, t.Any] | NoneLog a file as an artifact.
log_input
Section titled “log_input”log_input( name: str, value: Any, *, label: str | None = None, attributes: AnyDict | None = None,) -> strLog an input value.
log_metric
Section titled “log_metric”log_metric( name: str, value: float | bool, *, step: int = 0, origin: Any | None = None, timestamp: datetime | None = None, aggregation: MetricAggMode | None = None, prefix: str | None = None, attributes: JsonDict | None = None,) -> Metriclog_metric( name: str, value: Metric, *, origin: Any | None = None, aggregation: MetricAggMode | None = None, prefix: str | None = None,) -> Metriclog_metric( name: str, value: float | bool | Metric, *, step: int = 0, origin: Any | None = None, timestamp: datetime | None = None, aggregation: MetricAggMode | None = None, prefix: str | None = None, attributes: JsonDict | None = None,) -> MetricLog a metric value.
log_object
Section titled “log_object”log_object( value: Any, *, label: str | None = None, event_name: str = EVENT_NAME_OBJECT, attributes: AnyDict | None = None,) -> strStore an object and return its hash. Objects are stored but not logged as span events.
log_output
Section titled “log_output”log_output( name: str, value: Any, *, label: str | None = None, attributes: AnyDict | None = None,) -> strLog an output value.
log_param
Section titled “log_param”log_param(key: str, value: Any) -> NoneLog a single parameter.
log_params
Section titled “log_params”log_params(**params: Any) -> NoneLog multiple parameters.
Text(text: str, format: str)Text data type for Dreadnode logging.
Initialize a Text object.
Parameters:
text(str) –The text content to logformat(str) –The format hint of the text
Transform
Section titled “Transform”Transform( func: TransformCallable[In, Out], *, name: str | None = None, catch: bool = False, modality: Modality | None = None, config: dict[str, ConfigInfo] | None = None, context: dict[str, Context] | None = None, compliance_tags: dict[str, Any] | None = None,)Represents a transformation operation that modifies the input data.
catch = catchIf True, catches exceptions during the transform and attempts to return the original, unmodified object from the input. If False, exceptions are raised.
compliance_tags
Section titled “compliance_tags”compliance_tags = compliance_tags or {}Compliance framework tags (OWASP, ATLAS, SAIF) for this transform.
modality
Section titled “modality”modality = modalityThe data modality this transform operates on (text, image, audio, video).
name = nameThe name of the transform, used for reporting and logging.
as_transform
Section titled “as_transform”as_transform( *, adapt_in: Callable[[OuterIn], In], adapt_out: Callable[[Out], OuterOut], name: str | None = None,) -> Transform[OuterIn, OuterOut]Adapt this transform to a different input/output shape.
clone() -> Transform[In, Out]Clone the transform.
fit( transform: TransformLike[In, Out],) -> Transform[In, Out]Ensures that the provided transform is a Transform instance.
fit_many
Section titled “fit_many”fit_many( transforms: TransformsLike[In, Out] | None,) -> list[Transform[In, Out]]Convert a collection of transform-like objects into a list of Transform instances.
This method provides a flexible way to handle different input formats for transforms, automatically converting callables to Transform objects and applying consistent naming and attributes across all transforms.
Parameters:
transforms(TransformsLike[In, Out] | None) –A collection of transform-like objects. Can be:- A dictionary mapping names to transform objects or callables
- A sequence of transform objects or callables
- None (returns empty list)
Returns:
list[Transform[In, Out]]–A list of Transform instances with consistent configuration.
rename
Section titled “rename”rename(new_name: str) -> Transform[In, Out]Rename the transform.
Parameters:
new_name(str) –The new name for the transform.
Returns:
Transform[In, Out]–A new Transform with the updated name.
transform
Section titled “transform”transform(object: In, *args: Any, **kwargs: Any) -> OutPerform a transform from In to Out.
Parameters:
object(In) –The input object to transform.
Returns:
Out–The transformed output object.
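The catch semantics described for Transform (return the unmodified input on failure) can be sketched as follows. TransformSketch is a hypothetical stand-in, not the SDK’s Transform class.

```python
from typing import Callable, Generic, TypeVar, Union

In = TypeVar("In")
Out = TypeVar("Out")


class TransformSketch(Generic[In, Out]):
    def __init__(
        self,
        func: "Callable[[In], Out]",
        *,
        name: "str | None" = None,
        catch: bool = False,
    ) -> None:
        self.func = func
        self.name = name or getattr(func, "__name__", "transform")
        self.catch = catch

    def transform(self, obj: In) -> "Union[Out, In]":
        try:
            return self.func(obj)
        except Exception:
            if self.catch:
                # Fall back to the original, unmodified input.
                return obj
            raise
```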
with_( *, name: str | None = None, catch: bool | None = None, modality: Modality | None = None, compliance_tags: dict[str, Any] | None = None,) -> Transform[In, Out]Create a new Transform with updated properties.
TrialCandidate
Section titled “TrialCandidate”TrialCandidate( *, default: Any | Unset = UNSET, required: bool = True)Retrieve the candidate of the current trial during an optimization study.
TrialOutput
Section titled “TrialOutput”TrialOutput( *, default: Any | Unset = UNSET, required: bool = True)Retrieve the evaluation result of the current trial during an optimization study.
TrialScore
Section titled “TrialScore”TrialScore( *, default: Any | Unset = UNSET, required: bool = True)Retrieve the score of the current trial during an optimization study.
Video( data: VideoDataType, fps: float | None = None, caption: str | None = None, format: str | None = None, width: int | None = None, height: int | None = None,)Video media type for Dreadnode logging.
Supports:
- Local file paths (str or Path)
- Numpy array sequences with frame rate
- Raw bytes with metadata
- MoviePy VideoClip objects (if installed)
Initialize a Video object.
Parameters:
data(VideoDataType) –The video data, which can be:- A path to a local video file (str or Path)
- A numpy array of frames (requires fps)
- A list of numpy arrays for individual frames (requires fps)
- Raw bytes
- A MoviePy VideoClip object (if MoviePy is installed)
fps(float | None, default:None) –Frames per second, required for numpy array input (ignored if data is a file path or raw bytes)caption(str | None, default:None) –Optional caption for the videoformat(str | None, default:None) –Optional format override (mp4, avi, etc.)width(int | None, default:None) –Optional width in pixelsheight(int | None, default:None) –Optional height in pixels
to_serializable
Section titled “to_serializable”to_serializable() -> tuple[bytes, dict[str, t.Any]]Convert the video to bytes and return with metadata.
Returns:
tuple[bytes, dict[str, Any]]–A tuple of (video_bytes, metadata_dict)
AgentInput
Section titled “AgentInput”AgentInput( name: str | None = None, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an input from the nearest agent span.
Parameters:
name(str | None, default:None) –The name of the input. If None, uses the first input logged.default(Any | Unset, default:UNSET) –A default value if the named input is not found.required(bool, default:True) –Whether the context is required.
AgentOutput
Section titled “AgentOutput”AgentOutput( name: str = "output", *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an output from the nearest agent span.
Parameters:
name(str, default:'output') –The name of the output.default(Any | Unset, default:UNSET) –A default value if the named output is not found.required(bool, default:True) –Whether the context is required.
AgentParam
Section titled “AgentParam”AgentParam( name: str, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference a parameter from the nearest agent span.
Parameters:
name(str) –The name of the parameter.default(Any | Unset, default:UNSET) –A default value if the named parameter is not found.required(bool, default:True) –Whether the context is required.
Config
Section titled “Config”Config( default: EllipsisType, *, key: str | None = None, help: str | None = None, description: str | None = None, expose_as: Any | None = None, examples: list[Any] | None = None, gt: float | None = None, ge: float | None = None, lt: float | None = None, le: float | None = None, min_length: int | None = None, max_length: int | None = None, pattern: str | None = None, alias: str | None = None, **kwargs: Any,) -> t.AnyConfig( default: T, *, key: str | None = None, help: str | None = None, description: str | None = None, expose_as: Any = None, examples: list[Any] | None = None, gt: float | None = None, ge: float | None = None, lt: float | None = None, le: float | None = None, min_length: int | None = None, max_length: int | None = None, pattern: str | None = None, alias: str | None = None, **kwargs: Any,) -> TConfig( *, default_factory: Callable[[], T], key: str | None = None, help: str | None = None, description: str | None = None, expose_as: Any | None = None, examples: list[Any] | None = None, gt: float | None = None, ge: float | None = None, lt: float | None = None, le: float | None = None, min_length: int | None = None, max_length: int | None = None, pattern: str | None = None, alias: str | None = None, **kwargs: Any,) -> TConfig( *, key: str | None = None, help: str | None = None, description: str | None = None, expose_as: Any | None = None, examples: list[Any] | None = None, gt: float | None = None, ge: float | None = None, lt: float | None = None, le: float | None = None, min_length: int | None = None, max_length: int | None = None, pattern: str | None = None, alias: str | None = None, **kwargs: Any,) -> t.AnyConfig( default: Any = ..., *, key: str | None = UNSET, help: str | None = UNSET, description: str | None = UNSET, expose_as: Any | None = None, examples: list[Any] | None = UNSET, exclude: bool | None = UNSET, repr: bool = UNSET, init: bool | None = UNSET, init_var: bool | None = UNSET, kw_only: bool | None = UNSET, gt: SupportsGt 
| None = UNSET, ge: SupportsGt | None = UNSET, lt: SupportsGt | None = UNSET, le: SupportsGt | None = UNSET, min_length: int | None = UNSET, max_length: int | None = UNSET, pattern: str | None = UNSET, alias: str | None = UNSET, **kwargs: Any,) -> t.AnyDeclares a static, configurable parameter.
Parameters:
default(Any, default:...) –Default value if the field is not set.alias(str | None, default:UNSET) –The name to use for the attribute when validating or serializing by alias. This is often used for things like converting between snake and camel case.help(str | None, default:UNSET) –Human-readable help text.description(str | None, default:UNSET) –Human-readable description (overridden byhelp)expose_as(Any | None, default:None) –Override the type that this config value should be annotated as in configuration models.examples(list[Any] | None, default:UNSET) –Example values for this field.exclude(bool | None, default:UNSET) –Exclude the field from the model serialization.repr(bool, default:UNSET) –A boolean indicating whether to include the field in the__repr__output.init(bool | None, default:UNSET) –Whether the field should be included in the constructor of the dataclass. (Only applies to dataclasses.)init_var(bool | None, default:UNSET) –Whether the field should only be included in the constructor of the dataclass. (Only applies to dataclasses.)kw_only(bool | None, default:UNSET) –Whether the field should be a keyword-only argument in the constructor of the dataclass. (Only applies to dataclasses.)gt(SupportsGt | None, default:UNSET) –Greater than. If set, value must be greater than this. Only applicable to numbers.ge(SupportsGt | None, default:UNSET) –Greater than or equal. If set, value must be greater than or equal to this. Only applicable to numbers.lt(SupportsGt | None, default:UNSET) –Less than. If set, value must be less than this. Only applicable to numbers.le(SupportsGt | None, default:UNSET) –Less than or equal. If set, value must be less than or equal to this. Only applicable to numbers.min_length(int | None, default:UNSET) –Minimum length for iterables.max_length(int | None, default:UNSET) –Maximum length for iterables.pattern(str | None, default:UNSET) –Pattern for strings (a regular expression).**kwargs(Any, default:{}) –Additional keyword arguments forwarded to Pydantic’sField, includingdefault_factory,coerce_numbers_to_str,strict,multiple_of,allow_inf_nan,max_digits,decimal_places,union_mode, andfail_fast. See the Pydantic Field documentation for full semantics.
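The split between default and default_factory in the overloads above mirrors the standard dataclasses pattern: mutable defaults should come from a factory so each instance gets a fresh object. Settings is an invented example class illustrating that general Python behavior, not part of the SDK.

```python
from dataclasses import dataclass, field


@dataclass
class Settings:
    retries: int = 3                                 # static default
    tags: "list[str]" = field(default_factory=list)  # fresh list per instance


a, b = Settings(), Settings()
a.tags.append("x")
# b.tags stays empty: the factory ran separately for each instance.
```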
EvalInput
Section titled “EvalInput”EvalInput( name: str | None = None, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an input from the nearest evaluation span.
Parameters:
name(str | None, default:None) –The name of the input. If None, uses the first input logged.default(Any | Unset, default:UNSET) –A default value if the named input is not found.required(bool, default:True) –Whether the context is required.
EvalOutput
Section titled “EvalOutput”EvalOutput( name: str = "output", *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an output from the nearest evaluation span.
Parameters:
name(str, default:'output') –The name of the output.default(Any | Unset, default:UNSET) –A default value if the named output is not found.required(bool, default:True) –Whether the context is required.
EvalParam
Section titled “EvalParam”EvalParam( name: str, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference a parameter from the nearest evaluation span.
Parameters:
name(str) –The name of the parameter.default(Any | Unset, default:UNSET) –A default value if the named parameter is not found.required(bool, default:True) –Whether the context is required.
StudyInput
Section titled “StudyInput”StudyInput( name: str | None = None, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an input from the nearest study span.
Parameters:
name(str | None, default:None) –The name of the input. If None, uses the first input logged.default(Any | Unset, default:UNSET) –A default value if the named input is not found.required(bool, default:True) –Whether the context is required.
StudyOutput
Section titled “StudyOutput”StudyOutput( name: str = "output", *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an output from the nearest study span.
Parameters:
name(str, default:'output') –The name of the output.default(Any | Unset, default:UNSET) –A default value if the named output is not found.required(bool, default:True) –Whether the context is required.
StudyParam
Section titled “StudyParam”StudyParam( name: str, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference a parameter from the nearest study span.
Parameters:
name(str) –The name of the parameter.default(Any | Unset, default:UNSET) –A default value if the named parameter is not found.required(bool, default:True) –Whether the context is required.
TaskInput
Section titled “TaskInput”TaskInput( name: str | None = None, *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an input from the current task.
Parameters:
name(str | None, default:None) –The name of the input. If None, uses the first input logged.default(Any | Unset, default:UNSET) –A default value if the named input is not found.required(bool, default:True) –Whether the context is required.
TaskOutput
Section titled “TaskOutput”TaskOutput( name: str = "output", *, default: Any | Unset = UNSET, required: bool = True,) -> TypedSpanContextReference an output from the current task.
Parameters:
name(str, default:'output') –The name of the output.default(Any | Unset, default:UNSET) –A default value if the named output is not found.required(bool, default:True) –Whether the context is required.
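The default/required semantics shared by the context helpers above can be sketched as a small resolver. This is a hypothetical illustration of the documented behavior, not the SDK’s resolver; resolve_context is an invented name.

```python
from typing import Any

_UNSET = object()  # sentinel standing in for the SDK's UNSET


def resolve_context(
    values: "dict[str, Any]",
    name: str,
    *,
    default: Any = _UNSET,
    required: bool = True,
) -> Any:
    """Return the named value, fall back to the default if given,
    and only raise when the value is required and no default exists."""
    if name in values:
        return values[name]
    if default is not _UNSET:
        return default
    if required:
        raise LookupError(f"context value {name!r} not found")
    return None
```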
configure_logging
Section titled “configure_logging”configure_logging( level: LogLevel | None = None, log_file: Path | None = None, log_file_level: LogLevel = "debug", *, verbose: bool = False,) -> NoneConfigure loguru with Rich console output (library/interactive mode).
Parameters:
level(LogLevel | None, default:None) –Console log level. If omitted, defaults to theDREADNODE_LOG_LEVELenv var orinfo.log_file(Path | None, default:None) –Optional file path for logging.log_file_level(LogLevel, default:'debug') –Log level for file output.verbose(bool, default:False) –Enable richer tracebacks and show source paths.
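The console-plus-file sink layout described above can be sketched with the standard library. The SDK uses loguru; this stdlib analogue is a hypothetical illustration of the layout only, and the logger name "sketch" is invented.

```python
import logging
import sys
from pathlib import Path


def configure_logging_sketch(
    level: str = "INFO",
    log_file: "Path | None" = None,
    log_file_level: str = "DEBUG",
) -> logging.Logger:
    logger = logging.getLogger("sketch")
    # The logger itself passes everything; each sink filters on its own level.
    logger.setLevel(logging.DEBUG)

    console = logging.StreamHandler(sys.stderr)
    console.setLevel(getattr(logging, level.upper()))
    logger.addHandler(console)

    if log_file is not None:
        file_sink = logging.FileHandler(log_file)
        file_sink.setLevel(getattr(logging, log_file_level.upper()))
        logger.addHandler(file_sink)

    return logger
```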
configure_server_logging
Section titled “configure_server_logging”configure_server_logging( level: LogLevel | None = None, log_file: Path | str | None = None, log_file_level: LogLevel = "debug",) -> NoneConfigure loguru for server/serve mode (structured, timestamped, no Rich).
Intercepts uvicorn and fastapi stdlib loggers into loguru.
Also checks the DREADNODE_LOG_FILE env var for a file sink path.
Parameters:
level(LogLevel | None, default:None) –Console log level. If omitted, defaults to theDREADNODE_LOG_LEVELenv var orinfo.log_file(Path | str | None, default:None) –Optional file path for logging. Falls back toDREADNODE_LOG_FILEenv var if not provided.log_file_level(LogLevel, default:'debug') –Log level for file output.
get_default_instance
Section titled “get_default_instance”get_default_instance() -> DreadnodeGet the default Dreadnode instance (lazy import to avoid circular dependency).
study_span
Section titled “study_span”study_span( name: str, *, label: str | None = None, tags: list[str] | None = None, airt_assessment_id: str | None = None, airt_attack_name: str | None = None, airt_goal: str | None = None, airt_goal_category: str | None = None, airt_category: str | None = None, airt_sub_category: str | None = None, airt_transforms: list[str] | None = None, airt_target_model: str | None = None, airt_attacker_model: str | None = None, airt_evaluator_model: str | None = None,) -> TaskSpan[t.Any]Create a bare span for optimization study execution.
Events populate all attributes via emit().
Parameters:
name(str) –The study name.label(str | None, default:None) –Human-readable label.tags(list[str] | None, default:None) –Additional tags.airt_assessment_id(str | None, default:None) –AIRT assessment ID (for platform linking).airt_attack_name(str | None, default:None) –AIRT attack name.airt_goal(str | None, default:None) –AIRT attack goal.airt_goal_category(str | None, default:None) –AIRT goal category.airt_transforms(list[str] | None, default:None) –AIRT transforms applied.airt_target_model(str | None, default:None) –Target model identifier.airt_attacker_model(str | None, default:None) –Attacker model identifier.airt_evaluator_model(str | None, default:None) –Evaluator model identifier.
Returns:
TaskSpan[Any]–A bare TaskSpan for study execution.
trial_span
Section titled “trial_span”trial_span( trial_id: str, *, step: int, task_name: str | None = None, label: str | None = None, tags: list[str] | None = None, airt_assessment_id: str | None = None, airt_trial_index: int | None = None, airt_attack_name: str | None = None, airt_goal: str | None = None, airt_goal_category: str | None = None, airt_category: str | None = None, airt_sub_category: str | None = None, airt_transforms: list[str] | None = None, airt_target_model: str | None = None, airt_attacker_model: str | None = None, airt_evaluator_model: str | None = None,) -> TaskSpan[t.Any]Create a bare span for optimization trial.
Events populate all attributes via emit().
Parameters:
trial_id(str) –Unique trial identifier.step(int) –Trial number in the study.task_name(str | None, default:None) –Name of the task being evaluated (for label).label(str | None, default:None) –Human-readable label.tags(list[str] | None, default:None) –Additional tags.airt_assessment_id(str | None, default:None) –AIRT assessment ID (for linking trial to assessment).airt_trial_index(int | None, default:None) –AIRT trial index within the attack.airt_attack_name(str | None, default:None) –AIRT attack name.airt_goal(str | None, default:None) –AIRT attack goal.airt_goal_category(str | None, default:None) –AIRT goal category.airt_transforms(list[str] | None, default:None) –AIRT transforms applied.airt_target_model(str | None, default:None) –Target model identifier.airt_attacker_model(str | None, default:None) –Attacker model identifier.airt_evaluator_model(str | None, default:None) –Evaluator/judge model identifier.
Returns:
TaskSpan[Any]–A bare TaskSpan for trial execution.