dreadnode.capabilities

API reference for the dreadnode.capabilities module.

Dreadnode capabilities.

Load capability directories that extend agent functionality with agents, tools, skills, and MCP servers.

AgentDef(
name: str,
description: str,
model: str = "inherit",
system_prompt: str = "",
tools: dict[str, bool] = dict(),
skills: list[str] = list(),
metadata: dict[str, Any] | None = None,
capability: str | None = None,
)

Agent definition resolved from markdown frontmatter.

AgentLinkDef(
kind: Literal["delegate", "subagent", "handoff"],
source: str,
target: str,
)

Synthetic capability link between agents.

Capability(
capability: str | Path,
*,
cwd: Path | None = None,
storage: Storage | None = None,
capability_dirs: list[str | Path] | None = None,
bundled: bool = False,
)

Resolved capability ready for SDK and runtime use.

bundled: bool

Whether this capability is the SDK-internal bundled platform capability.

Set by the loader (via load_capability(path, bundled=True)) for exactly the capability shipped in dreadnode/builtin_capabilities. Not a manifest field; not author-settable (CAP-IDENT-004/005).

worker_defs: list[WorkerDef]

Parsed worker entries from capability.yaml (CAP-WRK-001).

Module import is deferred to the WorkerLifecycleManager, which evaluates the gate (CAP-WRK-007) before importing.

discover(
*,
cwd: Path | None = None,
storage: Storage | None = None,
capability_dirs: list[str | Path] | None = None,
workspace_dir: Path | None = None,
host: str = "local",
) -> DiscoverResult

Discover capabilities for a specific host type.

flag_env_vars() -> dict[str, str]

Build CAPABILITY_FLAG__* env vars from resolved flags (CAP-FLAG-020).

list(
*,
cwd: Path | None = None,
storage: Storage | None = None,
capability_dirs: list[str | Path] | None = None,
) -> builtins.list[str]

List capability names visible from the configured search paths.

resolve_flags(
persisted: dict[str, bool] | None = None,
env_overrides: dict[str, bool] | None = None,
cli_overrides: dict[str, bool] | None = None,
) -> None

Resolve effective flag state from the four-layer override stack.

Capability manifest stored in OCI config and on disk.

CapabilitySyncClient(
api: ApiClient,
org: str,
workspace: str,
cache_dir: Path,
runtime_id: str,
)

Downloads runtime capabilities from the platform into a local cache.

CAP-LOAD-010: sync before runtime starts. CAP-LOAD-012: cache is runtime-managed. CAP-LOAD-013: produces same directory layout the loader expects.

sync() -> SyncResult

Sync runtime capabilities from the platform.

Downloads enabled capabilities into the cache directory. Uses digest-based caching to skip unchanged capabilities.

DiscoverResult(
capabilities: dict[str, Capability] = dict(),
disabled: dict[str, Capability] = dict(),
failures: list[dict[str, Any]] = list(),
)

Result of capability discovery for a single host type.

LoadFailure(name: str, path: Path, error: str)

A capability that failed to load.

LoadOptions(base_dir: Path | None = None)

Options for loading a capability.

LoadResult(
capabilities: list[Capability] = list(),
failures: list[LoadFailure] = list(),
)

Result of loading capabilities from search paths.

MCPServerDef(
name: str,
transport: Literal["stdio", "streamable-http"],
command: str | None = None,
args: list[str] = list(),
env: dict[str, str] | None = None,
cwd: str | Path | None = None,
url: str | None = None,
headers: dict[str, str] | None = None,
timeout: float | None = None,
init_timeout: float | None = None,
when: list[str] | None = None,
source: Literal["inline", "file"] | None = None,
)

Parsed MCP server definition from a capability manifest.

CAP-MCP-002: transport is inferred from fields (command -> stdio, url -> streamable-http).
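The inference rule above can be sketched in a few lines. This is an illustrative stand-in, not the loader's code: `infer_transport` is a hypothetical name, and rejecting the "both set" and "neither set" cases is an assumption, since this page does not say how the loader treats them.

```python
from typing import Literal, Optional

Transport = Literal["stdio", "streamable-http"]


def infer_transport(command: Optional[str] = None, url: Optional[str] = None) -> Transport:
    """Pick a transport from whichever field is populated (illustrative only)."""
    # Assumption: exactly one of command/url must be set. How the real loader
    # handles "both" or "neither" is not documented on this page.
    if (command is None) == (url is None):
        raise ValueError("set exactly one of 'command' or 'url'")
    return "stdio" if command is not None else "streamable-http"
```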

to_server_config() -> t.Any

Convert to an MCPClient-compatible ServerConfig.

Resolves ${VAR} and ${VAR:-default} env placeholders at this point (connect time), not at capability load time, so that capabilities can be loaded/packaged without every secret being present.

SyncError(name: str, error: str)

A capability that failed to sync.

SyncResult(
synced: list[str] = list(),
cached: list[str] = list(),
removed: list[str] = list(),
errors: list[SyncError] = list(),
bindings: list[dict[str, Any]] = list(),
)

Result of runtime sync operation.

Worker(name: str | None = None)

Capability worker — long-running background component.

Constructed at module level in a capability’s workers/ directory. Handler decorators register callables that the runtime dispatches during the worker’s lifetime. Workers interact with the runtime exclusively through a :class:RuntimeClient instance passed to each handler.

Construct a Worker (CAP-WAPI-001).

When loaded via a capability manifest, the manifest key is authoritative. If name is omitted, the loader assigns the key; if provided, it must match the key (mismatch is a validation error). Standalone workers (CAP-WTOP-002) must provide name.

arun() -> None

Async peer of :meth:run: install signal handlers, then drive the worker.

Factored from :meth:_run_until so tests can drive the lifecycle without touching process-wide signal state.

every(
*,
seconds: float | None = None,
minutes: float | None = None,
cron: str | None = None,
) -> t.Callable[[ClientHandler], ClientHandler]

Register a recurring schedule handler (CAP-WAPI-006).

Exactly one of seconds, minutes, or cron must be provided. Handler signature: async def handler(client) -> None.

on_event(
kind: str,
) -> t.Callable[[EventHandler], EventHandler]

Register an event handler (CAP-WAPI-005).

Returns a decorator. The decorated function is invoked for each broker event whose kind field matches kind exactly. Handler signature: async def handler(event, client) -> None.

on_shutdown(fn: ClientHandler) -> ClientHandler

Register a shutdown handler (CAP-WAPI-004).

Called once during worker stop, before the client is closed. Receives the runtime client as its first argument.

on_startup(fn: ClientHandler) -> ClientHandler

Register a startup handler (CAP-WAPI-003).

Called once when the worker starts, before any other handlers are active. Receives the runtime client as its first argument.

run() -> None

Launch this worker as a standalone process (CAP-WTOP-002).

Reads DREADNODE_RUNTIME_* env vars (CAP-WENV-001..003) via :class:RuntimeClient, runs the worker until SIGTERM/SIGINT. Intended use::

    if __name__ == "__main__":
        worker.run()

Use :meth:arun if you already have a running event loop.

task(fn: ClientHandler) -> ClientHandler

Register a supervised long-running task (CAP-WAPI-007).

The decorated function runs for the worker’s lifetime. If it returns or raises (except CancelledError), it is restarted with exponential backoff. Handler signature: async def handler(client) -> None.

get_default_capabilities_dir() -> Path

Get the default user capabilities directory.

list_capabilities(
directory: str | Path | None = None,
) -> list[dict[str, t.Any]]

List available capabilities without fully loading them.

load_capabilities(
directory: str | Path | None = None,
options: LoadOptions | None = None,
source: Literal["runtime", "local"] = "local",
) -> LoadResult

Load all capabilities from a directory.

load_capabilities_from_search_paths(
search_paths: list[Path],
options: LoadOptions | None = None,
source: Literal["runtime", "local"] = "local",
) -> LoadResult

Load capabilities from search paths.

If the same capability name appears in multiple directories, the first one wins.

load_capability(
path: str | Path,
options: LoadOptions | None = None,
source: Literal["runtime", "local"] = "local",
*,
bundled: bool = False,
) -> t.Any

Load a capability from a directory.

bundled is a loader-gated flag the SDK sets only for the built-in platform capability shipped in dreadnode/builtin_capabilities. Authors cannot set it; the manifest contract has no corresponding field. Under CAP-IDENT-004/005, bundled capabilities are exempt from wire-name qualification and keep their bare tool names.

merge_capabilities(
capabilities: list[Any],
) -> MergedCapabilities

Merge multiple capabilities into one.

resolve_search_paths(
*,
capability_dirs: list[str | Path] | None = None,
cwd: Path | None = None,
user_dir: str | Path | None = None,
) -> list[Path]

Resolve capability discovery search paths (CAP-LOAD-001).

Precedence:

  1. Project-local .dreadnode/capabilities
  2. User-local ~/.dreadnode/capabilities
  3. Explicit dirs (CLI flags)
  4. DREADNODE_CAPABILITY_DIRS env list

High-level resolved capability object.

Capability(
capability: str | Path,
*,
cwd: Path | None = None,
storage: Storage | None = None,
capability_dirs: list[str | Path] | None = None,
bundled: bool = False,
)

Resolved capability ready for SDK and runtime use.

bundled: bool

Whether this capability is the SDK-internal bundled platform capability.

Set by the loader (via load_capability(path, bundled=True)) for exactly the capability shipped in dreadnode/builtin_capabilities. Not a manifest field; not author-settable (CAP-IDENT-004/005).

worker_defs: list[WorkerDef]

Parsed worker entries from capability.yaml (CAP-WRK-001).

Module import is deferred to the WorkerLifecycleManager, which evaluates the gate (CAP-WRK-007) before importing.

discover(
*,
cwd: Path | None = None,
storage: Storage | None = None,
capability_dirs: list[str | Path] | None = None,
workspace_dir: Path | None = None,
host: str = "local",
) -> DiscoverResult

Discover capabilities for a specific host type.

flag_env_vars() -> dict[str, str]

Build CAPABILITY_FLAG__* env vars from resolved flags (CAP-FLAG-020).

list(
*,
cwd: Path | None = None,
storage: Storage | None = None,
capability_dirs: list[str | Path] | None = None,
) -> builtins.list[str]

List capability names visible from the configured search paths.

resolve_flags(
persisted: dict[str, bool] | None = None,
env_overrides: dict[str, bool] | None = None,
cli_overrides: dict[str, bool] | None = None,
) -> None

Resolve effective flag state from the four-layer override stack.

DiscoverResult(
capabilities: dict[str, Capability] = dict(),
disabled: dict[str, Capability] = dict(),
failures: list[dict[str, Any]] = list(),
)

Result of capability discovery for a single host type.

read_local_capability_records(
path: Path,
) -> dict[str, dict[str, t.Any]]

Read persisted local capability records keyed by bare capability name.

read_local_capability_state(path: Path) -> dict[str, bool]

Read persisted local capability state keyed by bare capability name.

write_local_capability_records(
path: Path, state: dict[str, dict[str, Any]]
) -> None

Persist structured local capability records keyed by bare capability name.

write_local_capability_state(
path: Path, state: dict[str, bool]
) -> None

Persist local capability state keyed by bare capability name.

Capability loader: v1 spec.

Load capabilities from disk, validate against the v1 contract, and prepare for use.

See specs/capabilities/ for the canonical spec.

get_default_capabilities_dir() -> Path

Get the default user capabilities directory.

list_capabilities(
directory: str | Path | None = None,
) -> list[dict[str, t.Any]]

List available capabilities without fully loading them.

load_capabilities(
directory: str | Path | None = None,
options: LoadOptions | None = None,
source: Literal["runtime", "local"] = "local",
) -> LoadResult

Load all capabilities from a directory.

load_capabilities_from_search_paths(
search_paths: list[Path],
options: LoadOptions | None = None,
source: Literal["runtime", "local"] = "local",
) -> LoadResult

Load capabilities from search paths.

If the same capability name appears in multiple directories, the first one wins.
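The "first one wins" rule can be pictured with a small standalone sketch. It does not call the SDK: `first_wins` is a hypothetical helper, and it assumes a capability is simply a subdirectory whose name is the capability name, which is a simplification of real manifest-based loading.

```python
import tempfile
from pathlib import Path


def first_wins(search_paths: list[Path]) -> dict[str, Path]:
    """Map capability name -> directory, keeping the first match per name."""
    found: dict[str, Path] = {}
    for root in search_paths:
        if not root.is_dir():
            continue  # assumption: missing search paths are skipped silently
        for child in sorted(root.iterdir()):
            # Assumption: directory name == capability name.
            if child.is_dir() and child.name not in found:
                found[child.name] = child
    return found


with tempfile.TemporaryDirectory() as tmp:
    project, user = Path(tmp, "project"), Path(tmp, "user")
    (project / "github").mkdir(parents=True)
    (user / "github").mkdir(parents=True)
    (user / "jira").mkdir()
    resolved = first_wins([project, user])
    winner = resolved["github"].parent.name  # which search path supplied "github"
    names = sorted(resolved)
```

Here `github` exists in both directories, and the copy from the earlier search path is the one that is kept.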

load_capability(
path: str | Path,
options: LoadOptions | None = None,
source: Literal["runtime", "local"] = "local",
*,
bundled: bool = False,
) -> t.Any

Load a capability from a directory.

bundled is a loader-gated flag the SDK sets only for the built-in platform capability shipped in dreadnode/builtin_capabilities. Authors cannot set it; the manifest contract has no corresponding field. Under CAP-IDENT-004/005, bundled capabilities are exempt from wire-name qualification and keep their bare tool names.

load_worker_from_def(
worker_def: WorkerDef,
capability_path: Path,
capability_name: str,
) -> t.Any

Import a worker module on behalf of the lifecycle manager (CAP-WRK-002, CAP-WRK-007).

Only called when the worker’s gate is satisfied. Enforces exactly one Worker instance per file. Assigns the manifest key as the worker’s name when the constructor omitted it; validates equality when provided.

Raises ImportError on module import failure or ValueError when the file exposes zero or multiple Worker instances, or when the constructor name conflicts with the manifest key.

merge_capabilities(
capabilities: list[Any],
) -> MergedCapabilities

Merge multiple capabilities into one.

parse_mcp_servers(
mcp: dict[str, Any] | None,
capability_path: Path,
component_health: list[dict[str, Any]] | None = None,
*,
declared_flags: set[str] | None = None,
manifest_path: Path | None = None,
) -> list[MCPServerDef]

Parse MCP server definitions from a capability manifest.

CAP-MCP-001: files and inline servers are merged, inline wins on name conflict. Returns empty list for mcp={} (explicit disable). Auto-discovers .mcp.json and mcp.json when mcp is None.
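The merge rule (inline wins on a name conflict) amounts to a dictionary merge in which inline entries are applied last. The sketch below uses plain dicts keyed by server name; `merge_servers` and the sample server entries are hypothetical and only illustrate the precedence, not the real parsing.

```python
from typing import Any


def merge_servers(
    from_files: dict[str, dict[str, Any]],
    inline: dict[str, dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    # Later mappings override earlier ones, so inline entries win on conflict.
    return {**from_files, **inline}


merged = merge_servers(
    {"github": {"command": "gh-mcp"}, "docs": {"url": "https://example.com/mcp"}},
    {"github": {"command": "gh-mcp", "args": ["--verbose"]}},
)
```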

parse_workers(
workers: dict[str, Any] | None,
capability_path: Path,
component_health: list[dict[str, Any]] | None = None,
*,
declared_flags: set[str] | None = None,
manifest_path: Path | None = None,
) -> list[WorkerDef]

Parse worker entries from a capability manifest (CAP-WRK-001).

Returns an empty list when workers is None or {}. Validates each entry’s name, path, and optional when: predicate. Paths that fail validation produce a component_health error entry but don’t abort the rest of the capability load (mirrors CAP-MCP-007).

resolve_search_paths(
*,
capability_dirs: list[str | Path] | None = None,
cwd: Path | None = None,
user_dir: str | Path | None = None,
) -> list[Path]

Resolve capability discovery search paths (CAP-LOAD-001).

Precedence:

  1. Project-local .dreadnode/capabilities
  2. User-local ~/.dreadnode/capabilities
  3. Explicit dirs (CLI flags)
  4. DREADNODE_CAPABILITY_DIRS env list

Capability sync: downloads capabilities from the platform.

See specs/capabilities/runtime.md (CAP-LOAD-010..013).

CapabilitySyncClient(
api: ApiClient,
org: str,
workspace: str,
cache_dir: Path,
runtime_id: str,
)

Downloads runtime capabilities from the platform into a local cache.

CAP-LOAD-010: sync before runtime starts. CAP-LOAD-012: cache is runtime-managed. CAP-LOAD-013: produces same directory layout the loader expects.

sync() -> SyncResult

Sync runtime capabilities from the platform.

Downloads enabled capabilities into the cache directory. Uses digest-based caching to skip unchanged capabilities.
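One way to picture digest-based caching is as a comparison of two name-to-digest maps. The sketch below is a guess at the general shape, not the client's actual algorithm: `SyncPlan` and `plan_sync` are hypothetical, the list names are borrowed from SyncResult, and treating `removed` as "cached locally but no longer enabled remotely" is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class SyncPlan:
    synced: list[str] = field(default_factory=list)   # digest changed or new: download
    cached: list[str] = field(default_factory=list)   # digest unchanged: skip download
    removed: list[str] = field(default_factory=list)  # assumption: stale local entries


def plan_sync(remote: dict[str, str], local: dict[str, str]) -> SyncPlan:
    """Compare name -> digest maps and decide what needs downloading."""
    plan = SyncPlan()
    for name, digest in sorted(remote.items()):
        if local.get(name) == digest:
            plan.cached.append(name)
        else:
            plan.synced.append(name)
    plan.removed = sorted(set(local) - set(remote))
    return plan


plan = plan_sync(
    remote={"acme/github": "sha256:aaa", "acme/jira": "sha256:bbb"},
    local={"acme/github": "sha256:aaa", "acme/jira": "sha256:old", "acme/slack": "sha256:ccc"},
)
```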

LocalInstallClient(
api: ApiClient,
org: str,
local_dir: Path,
state_path: Path,
)

Install registry-backed capabilities into the local user store.

LocalInstallResult(
installed_name: str,
source: str,
overwritten: bool = False,
)

Result of a local registry-backed capability install.

SyncError(name: str, error: str)

A capability that failed to sync.

SyncResult(
synced: list[str] = list(),
cached: list[str] = list(),
removed: list[str] = list(),
errors: list[SyncError] = list(),
bindings: list[dict[str, Any]] = list(),
)

Result of runtime sync operation.

bare_capability_name(qualified_name: str) -> str

Extract the bare name from an org-qualified capability name.

For example, 'acme/github' -> 'github'. If no '/' is present, the name is returned as-is.

decode_capability_dirname(dirname: str) -> str

Decode a directory name back to a capability name.

Replaces the first '_' with '/' (canonical names have exactly one '/'), e.g. 'acme_github' -> 'acme/github'.

encode_capability_dirname(name: str) -> str

Encode a capability name for use as a directory name.

Replaces '/' with '_' to avoid nested directories. This is bijective because capability name parts follow [a-z0-9][a-z0-9-]* (no underscores), e.g. 'acme/github' -> 'acme_github'.
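The three name helpers described above are small enough to re-create as a standalone sketch, which also shows why the encoding round-trips. The function names here are shortened stand-ins, and the up-front validation in `encode_dirname` is an assumption added for illustration.

```python
import re

_PART = r"[a-z0-9][a-z0-9-]*"
_NAME = re.compile(rf"{_PART}(/{_PART})?")


def encode_dirname(name: str) -> str:
    # Assumption: reject names outside the documented character set up front.
    if not _NAME.fullmatch(name):
        raise ValueError(f"invalid capability name: {name!r}")
    return name.replace("/", "_")


def decode_dirname(dirname: str) -> str:
    # Name parts never contain '_', so the first '_' must be the separator.
    return dirname.replace("_", "/", 1)


def bare_name(qualified: str) -> str:
    # 'acme/github' -> 'github'; a name without '/' is returned unchanged.
    return qualified.split("/", 1)[-1]
```

Because hyphens are allowed in name parts but underscores are not, `decode_dirname(encode_dirname(name))` returns the original name for any valid input.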

install_local(
*,
source_path: Path,
local_dir: Path,
state_path: Path,
name: str,
version: str,
overwrite: bool,
copy: bool = False,
) -> LocalInstallResult

Install a capability from a local directory into the user store.

By default, creates a symlink so edits to the source are live. Pass copy=True to create a frozen snapshot instead.

The caller is responsible for validating the capability before calling this function (e.g. via Capability(source_path)).

Capability type definitions: v1 spec.

See specs/capabilities/contract.md for the canonical schema.

AgentDef(
name: str,
description: str,
model: str = "inherit",
system_prompt: str = "",
tools: dict[str, bool] = dict(),
skills: list[str] = list(),
metadata: dict[str, Any] | None = None,
capability: str | None = None,
)

Agent definition resolved from markdown frontmatter.

AgentLinkDef(
kind: Literal["delegate", "subagent", "handoff"],
source: str,
target: str,
)

Synthetic capability link between agents.

DependencySpec(
python: list[str] = list(),
packages: list[str] = list(),
scripts: list[str] = list(),
)

Declared runtime dependencies from capability.yaml.

These fields are sandbox-specific — they describe what a managed sandbox (E2B/Docker) needs. Ignored for local installs.

HealthCheck(name: str, command: str)

Pre-flight check definition from capability.yaml.

Runs on load for enabled capabilities. Exit code 0 = pass, non-zero = fail. Failed checks produce a component_health entry with kind="check".
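The pass/fail rule can be illustrated with `subprocess`. This is a rough sketch rather than the loader's code: `run_check` is hypothetical, the timeout is an assumption, and every key in the returned entry other than `kind` is invented for the example.

```python
import subprocess
import sys
from typing import Any, Optional


def run_check(name: str, command: str, timeout: float = 30.0) -> Optional[dict[str, Any]]:
    """Return None when the check passes, or a health entry describing the failure."""
    try:
        proc = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # Assumption: a check that hangs is treated as a failure.
        return {"kind": "check", "name": name, "error": f"timed out after {timeout}s"}
    if proc.returncode == 0:
        return None  # exit code 0 = pass
    # Keys other than kind="check" are hypothetical.
    return {"kind": "check", "name": name, "error": f"exit code {proc.returncode}"}


ok = run_check("python-ok", f'"{sys.executable}" -c "raise SystemExit(0)"')
bad = run_check("python-bad", f'"{sys.executable}" -c "raise SystemExit(3)"')
```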

LoadFailure(name: str, path: Path, error: str)

A capability that failed to load.

LoadOptions(base_dir: Path | None = None)

Options for loading a capability.

LoadResult(
capabilities: list[Capability] = list(),
failures: list[LoadFailure] = list(),
)

Result of loading capabilities from search paths.

MCPServerDef(
name: str,
transport: Literal["stdio", "streamable-http"],
command: str | None = None,
args: list[str] = list(),
env: dict[str, str] | None = None,
cwd: str | Path | None = None,
url: str | None = None,
headers: dict[str, str] | None = None,
timeout: float | None = None,
init_timeout: float | None = None,
when: list[str] | None = None,
source: Literal["inline", "file"] | None = None,
)

Parsed MCP server definition from a capability manifest.

CAP-MCP-002: transport is inferred from fields (command -> stdio, url -> streamable-http).

to_server_config() -> t.Any

Convert to an MCPClient-compatible ServerConfig.

Resolves ${VAR} and ${VAR:-default} env placeholders at this point (connect time), not at capability load time, so that capabilities can be loaded/packaged without every secret being present.
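To make the two placeholder forms concrete, here is a minimal resolver written from the description above. It is not the SDK's implementation: `expand` is a hypothetical name, the shell-like rule that an empty value falls back to the default is an assumption, and so is raising `KeyError` for an unset variable with no default.

```python
import re
from typing import Mapping

# Matches ${VAR} and ${VAR:-default}; group 2 is None when no default is given.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand(value: str, env: Mapping[str, str]) -> str:
    """Substitute env placeholders in a single string (illustrative only)."""

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in env and (env[name] or default is None):
            return env[name]
        if default is not None:
            return default  # unset or empty, and a default was supplied
        # Assumption: an unset variable without a default is an error.
        raise KeyError(name)

    return _PLACEHOLDER.sub(replace, value)


env = {"GITHUB_TOKEN": "abc123"}
header = expand("Bearer ${GITHUB_TOKEN}", env)
url = expand("${MCP_URL:-http://localhost:8080}/mcp", env)
```

Deferring this step to connect time means a manifest containing `${GITHUB_TOKEN}` can still be loaded and packaged on a machine where the token is not set.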

WorkerDef(
name: str,
path: Path | None = None,
command: str | None = None,
args: list[str] = list(),
env: dict[str, str] = dict(),
when: list[str] | None = None,
)

Parsed worker entry from a capability manifest.

Two kinds, mutually exclusive (CAP-WTOP-004):

  • In-process Python worker — populates :attr:path; the runtime imports the module and drives the :class:Worker instance via WorkerRunner.
  • Subprocess worker — populates :attr:command (with optional :attr:args / :attr:env); the runtime spawns and supervises it, injecting the DREADNODE_RUNTIME_* env contract (CAP-WENV-001..003).

See CAP-WRK-001/002/007 and CAP-WTOP-004..009 in specs/capabilities/workers.md.

is_subprocess: bool

True when this worker is a runtime-spawned subprocess (CAP-WTOP-004).

Capability flag definitions, validation, and resolution: v1 spec.

See specs/capabilities/flags.md for the canonical rules (CAP-FLAG-*).

FlagDef(name: str, description: str, default: bool = False)

Author-declared flag from capability.yaml.

ResolvedFlag(
name: str,
description: str,
default: bool,
effective: bool,
source: Literal["default", "binding", "env", "cli"],
)

Effective flag state after merging the four-layer override stack.

evaluate_when(
when: list[str] | None, resolved: list[ResolvedFlag]
) -> bool

Evaluate a when predicate against resolved flag state.

Returns True if the component should be loaded (CAP-FLAG-011).
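A possible reading of the predicate is sketched below. The exact semantics live in the flags spec, so treat this as an assumption-heavy illustration: `Flag` is a reduced stand-in for ResolvedFlag, `should_load` is a hypothetical name, and both the AND semantics and the "unknown flag counts as off" rule are guesses.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Flag:  # stand-in for ResolvedFlag, reduced to the fields used here
    name: str
    effective: bool


def should_load(when: Optional[list[str]], resolved: list[Flag]) -> bool:
    if when is None:
        return True  # no predicate: always load
    state = {flag.name: flag.effective for flag in resolved}
    # Assumption: every listed flag must be on; unknown flags count as off.
    return all(state.get(name, False) for name in when)
```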

flag_to_env_name(
capability_name: str, flag_name: str
) -> str

Convention env var injected into MCP subprocesses and tool imports (CAP-FLAG-021).

override_env_name(
capability_name: str, flag_name: str
) -> str

User-facing override env var (CAP-FLAG-032).

parse_cli_flags(
raw: list[str] | None,
) -> dict[str, dict[str, bool]]

Parse --capability-flag capability.flag=true|false values (CAP-FLAG-033).
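The value shape and return type suggest a parser along these lines. This is a sketch only: `parse_flags` is a hypothetical stand-in, splitting on the last `.` before `=` is an assumption, and the real function's error type and messages are not documented here.

```python
from typing import Optional


def parse_flags(raw: Optional[list[str]]) -> dict[str, dict[str, bool]]:
    """Turn ['cap.flag=true', ...] into {'cap': {'flag': True}, ...}."""
    parsed: dict[str, dict[str, bool]] = {}
    for item in raw or []:
        target, sep, value = item.partition("=")
        # Assumption: the flag name is whatever follows the last '.'.
        capability, dot, flag = target.rpartition(".")
        if not (sep and dot and capability and flag) or value not in ("true", "false"):
            raise ValueError(f"expected capability.flag=true|false, got {item!r}")
        parsed.setdefault(capability, {})[flag] = value == "true"
    return parsed
```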

read_env_overrides(
capability_name: str, flag_defs: list[FlagDef]
) -> dict[str, bool]

Read DREADNODE_CAPABILITY_FLAG__* overrides from os.environ (CAP-FLAG-032).

resolve_flags(
flag_defs: list[FlagDef],
persisted: dict[str, bool] | None = None,
env_overrides: dict[str, bool] | None = None,
cli_overrides: dict[str, bool] | None = None,
) -> list[ResolvedFlag]

Resolve effective state for each declared flag via the four-layer stack.
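The four-layer stack can be sketched as a loop over override layers. The ordering used here (default, then binding, then env, then cli, with later layers winning) is inferred from the parameter order and the `source` literal and should be treated as an assumption, as should mapping `persisted` to the "binding" source. `Resolved` and `resolve` are reduced stand-ins, not the SDK types.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Resolved:  # reduced stand-in for ResolvedFlag
    name: str
    effective: bool
    source: str  # "default" | "binding" | "env" | "cli"


def resolve(
    defaults: dict[str, bool],
    persisted: Optional[dict[str, bool]] = None,
    env_overrides: Optional[dict[str, bool]] = None,
    cli_overrides: Optional[dict[str, bool]] = None,
) -> list[Resolved]:
    # Layers listed from lowest to highest precedence (assumption).
    layers = [
        ("binding", persisted or {}),
        ("env", env_overrides or {}),
        ("cli", cli_overrides or {}),
    ]
    out: list[Resolved] = []
    for name, default in defaults.items():
        effective, source = default, "default"
        for label, layer in layers:
            if name in layer:
                effective, source = layer[name], label
        out.append(Resolved(name, effective, source))
    return out


result = resolve(
    {"write": False, "beta": False, "sync": True},
    persisted={"write": True},
    env_overrides={"write": False, "beta": True},
    cli_overrides={"beta": False},
)
```

Recording the winning layer in `source` makes it possible to explain to a user why a flag ended up on or off.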

validate_flags_block(
raw: dict[str, Any] | None, manifest_path: Path
) -> list[FlagDef]

Validate the top-level flags block and return parsed definitions.

Returns an empty list when raw is None or {}.

validate_when(
when: Any,
declared_flags: set[str],
component_name: str,
manifest_path: Path,
*,
source: str = "inline",
component_kind: str = "MCP server",
) -> list[str] | None

Validate a when predicate on a gate-eligible component (MCP server or worker).

Returns the validated flag-name list, or None for “always load”.

Capability worker: long-running background component.

A Worker is constructed at module level in a capability’s workers/*.py file. Decorator-based handlers register callables; the runtime dispatches them during the worker’s lifetime.

Example::

    from dreadnode.capabilities.worker import Worker, EventEnvelope, RuntimeClient

    worker = Worker(name="bridge")

    @worker.on_startup
    async def connect(client: RuntimeClient) -> None:
        worker.state["ws"] = await open_websocket()

    @worker.on_event("turn.completed")
    async def on_turn(event: EventEnvelope, client: RuntimeClient) -> None:
        await forward_result(worker.state["ws"], event.payload)

    @worker.every(seconds=30)
    async def heartbeat(client: RuntimeClient) -> None:
        await worker.state["ws"].ping()

ClientHandler = Callable[["RuntimeClient"], Awaitable[None]]

Signature for on_startup, on_shutdown, every, and task handlers.

EventHandler = Callable[
["RuntimeEventEnvelope", "RuntimeClient"],
Awaitable[None],
]

Signature for on_event handlers.

RuntimeClient(
server_url: str | None = None,
*,
auth_token: str | None = None,
transport: AsyncBaseTransport | None = None,
default_notify_source: str | None = None,
default_session_labels: dict[str, list[str]] | None = None,
default_session_origin: str | None = None,
)

Client for interacting with a running Dreadnode runtime server.

Provides session management, chat streaming, event subscription, and runtime discovery. Assumes the server is already running — use :class:~dreadnode.app.client.managed_client.ManagedRuntimeClient when you need to start or manage the server process.

is_started: bool

Whether the client has verified server connectivity.

browse_session_facets(
*,
archived: Literal[
"active", "archived", "any"
] = "active",
label: list[str] | None = None,
user_id: str | None = None,
project_id: list[str] | None = None,
origin: list[str] | None = None,
search: str | None = None,
include_workload_sessions: bool = False,
) -> models.SessionFacets

Per-key value counts for the sidebar facets on the table view.

Parallels :meth:browse_sessions — takes the same filter set (minus pagination / sort) and returns a typed :class:~dreadnode.app.client.models.SessionFacets envelope. Keys with zero matches are omitted by the platform, so the result only carries the keys the caller can act on. Honors the same SES-LST-009 workload default as the list endpoint.

browse_sessions(
*,
page: int = 1,
limit: int = 20,
sort_by: Literal[
"updated_at",
"last_message_at",
"created_at",
"message_count",
] = "updated_at",
sort_dir: Literal["asc", "desc"] = "desc",
archived: Literal[
"active", "archived", "any"
] = "active",
label: list[str] | None = None,
user_id: str | None = None,
project_id: list[str] | None = None,
origin: list[str] | None = None,
search: str | None = None,
include_workload_sessions: bool = False,
) -> models.SessionListResult

Paginated browse of platform-persisted sessions for this workspace.

Pass-through for the platform’s GET /sessions query shape — the runtime forwards every kwarg as a query param and returns the platform’s paginated envelope verbatim. In-process sessions are not merged on this path; the table view trusts that _register_session_with_platform syncs new sessions within a turn. Use :meth:list_sessions for live in-process state.

include_workload_sessions (SES-LST-009) defaults to False so the table view hides eval (and future optimization / training / world) runs. Callers that want them — the agents page, analytics — pass True.

cancel_session(session_id: str) -> None

Cancel the active turn for a session.

close() -> None

Close network resources (HTTP client and interactive transport).

compact_session(
session_id: str, *, guidance: str = ""
) -> dict[str, t.Any]

Request manual compaction of a session.

create_session(
*,
capability: str | None = None,
agent: str | None = None,
model: str | None = None,
session_id: str | None = None,
project: str | None = None,
generate_params_extra: dict[str, Any] | None = None,
policy: str | dict[str, Any] | None = None,
labels: dict[str, list[str]] | None = None,
origin: str | None = None,
) -> models.SessionInfo

Create or resolve a session on the server.

If session_id is provided and a session with that ID already exists, the call is idempotent and returns the existing session (CAP-WCLI-003).

delete_session(session_id: str) -> None

Delete a server session.

execute_shell(
command: str,
*,
cwd: str | None = None,
timeout: int = 30,
) -> dict[str, t.Any]

Execute a shell command on the server.

fetch_mcp_detail(
capability: str, server_name: str
) -> dict[str, t.Any]

Fetch full detail for an MCP server.

fetch_runtime_info() -> models.RuntimeInfo

Fetch runtime metadata from the connected server.

fetch_session_messages(
session_id: str,
) -> list[dict[str, t.Any]]

Fetch conversation messages for a session.

fetch_skill_content(name: str) -> str

Fetch rendered skill content by name.

fetch_skills() -> list[models.SkillInfo]

Fetch available skills from runtime.

fetch_tools() -> list[models.ToolInfo]

Fetch available tools from runtime.

fetch_worker_detail(
capability: str, worker_name: str
) -> dict[str, t.Any]

Fetch full detail for a capability worker.

list_files(
path: str | None = None, depth: int = 10
) -> list[dict[str, t.Any]]

List files in a directory on the server.

list_sessions(
*, include_platform: bool = False
) -> list[models.SessionInfo]

List in-process sessions from the connected server (the boot/swap fast path).

Returns only sessions the runtime knows about in memory. include_platform is preserved for callers that don’t yet differentiate the two paths — when true, the runtime falls back to delegating to browse_sessions(page=1, limit=100) and returns the flat sessions list. New code wanting paginated platform history should call :meth:browse_sessions directly so it gets the envelope (total, page, etc.).

notify(
title: str,
*,
body: str | None = None,
severity: Literal[
"info", "warning", "error", "success"
] = "info",
source: str | None = None,
session_id: str | None = None,
) -> dict[str, t.Any]

Publish a user-facing notification (CAP-WCLI-014, CAP-WEVT-004).

Notifications are runtime-scope unless session_id is provided. source defaults to the client’s configured default_notify_source — worker-hosted clients get capability.<name>; standalone clients leave it empty unless the caller supplies one.

publish(
kind: str,
payload: dict[str, Any] | None = None,
*,
session_id: str | None = None,
) -> dict[str, t.Any]

Publish an event onto the runtime event bus (CAP-WCLI-013).

When session_id is provided the event is session-scoped; otherwise it is runtime-scope. Subscribers matching the event’s kind receive it regardless of scope (CAP-WEVT-002). Reserved-prefix kinds (turn., prompt., session., transport., capabilities.) are rejected at the server per CAP-WEVT-003.
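Since reserved-prefix kinds are rejected by the server, a worker may want to check a kind locally before publishing. The helper below is a hypothetical client-side convenience built from the prefix list above; it is not part of the documented API, and the server remains the authority.

```python
# Prefixes copied from the description above.
RESERVED_PREFIXES = ("turn.", "prompt.", "session.", "transport.", "capabilities.")


def is_reserved_kind(kind: str) -> bool:
    # str.startswith accepts a tuple and returns True if any prefix matches.
    return kind.startswith(RESERVED_PREFIXES)
```

Note that the trailing dot matters: a kind such as `turnstile.opened` is not reserved under this check.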

read_file(path: str) -> str

Read a file’s content from the server.

reconnect_mcp_server(
capability: str, server_name: str
) -> dict[str, t.Any]

Reconnect an MCP server and return updated detail.

reload_capabilities() -> models.RuntimeInfo

Tell the server to re-discover capabilities and return updated runtime info.

reset_session(session_id: str) -> None

Reset a server session.

restart_worker(
capability: str, worker_name: str
) -> dict[str, t.Any]

Restart a capability worker and return updated detail.

run_turn(
*,
session_id: str,
message: str,
model: str | None = None,
agent: str | None = None,
reset: bool = False,
generate_params_extra: dict[str, Any] | None = None,
) -> dict[str, t.Any]

Run a turn to completion and return the terminal turn.completed payload (CAP-WEVT-007): response_text, tool_calls, usage, duration_ms, turn_id.

Use this when you want the final result without iterating individual agent events. For streaming UIs, use :meth:stream_chat instead.

Raises :class:TurnFailedError on turn.failed (carrying the error_type, partial_response, and any attempted tool calls) and :class:TurnCancelledError on turn.cancelled.

send_human_input_response(
session_id: str, response: HumanInputResponse
) -> None

Send a human input response back to the server via the interactive websocket.

send_permission_response(
session_id: str, request_id: str, decision: str
) -> None

Send a permission decision back to the server via the interactive websocket.

set_session_policy(
session_id: str, policy: str | dict[str, Any] | None
) -> dict[str, t.Any]

Swap a session’s active policy mid-run.

Returns the server response dict with policy_name, policy_is_autonomous, and policy_display_label populated from the resolved policy class.

set_session_title(session_id: str, title: str) -> None

Persist a session title on the server.

start() -> None

Verify the server is reachable.

Subclasses override this to add server lifecycle management (e.g., auto-starting an in-process or subprocess server).

stream_chat(
*,
session_id: str,
message: str,
model: str | None = None,
agent: str | None = None,
reset: bool = False,
generate_params_extra: dict[str, Any] | None = None,
) -> t.AsyncIterator[dict[str, t.Any]]

Stream websocket chat events for one session turn.

subscribe(
*kinds: str,
) -> t.AsyncIterator[RuntimeEventEnvelope]

Subscribe to runtime-bus events filtered by kinds (CAP-WCLI-018).

Returns an async iterator yielding :class:RuntimeEventEnvelope values. kinds is variadic; passing none subscribes to every event. Session- and runtime-scope envelopes both flow through — consumers inspect session_id to distinguish (CAP-WEVT-002).

The iterator yields events until the caller closes it (aclose() or breaking out of async for) or authentication is rejected. History is not replayed (CAP-WCLI-020).

On transient transport loss the client reconnects with exponential backoff, reinstates the original kinds filter, and yields a synthetic transport.reconnected envelope before resuming (CAP-WCLI-021). Events published while disconnected are not replayed; subscribers that need durability own their own resync.

Peer of :meth:subscribe_session (CAP-WCLI-011); independent from the interactive transport, so standalone worker processes can iterate the runtime bus without opening a session-control channel.

subscribe_session(session_id: str) -> None

Keep a session subscribed on the interactive websocket.

unsubscribe_session(session_id: str) -> None

Drop a session subscription from the interactive websocket.

ScheduleSpec(
interval_seconds: float | None = None,
cron_expr: str | None = None,
)

Parsed schedule for an @worker.every(...) handler.

TurnCancelledError(
reason: str,
*,
partial_response: str | None = None,
turn_id: str | None = None,
)

Raised by :meth:RuntimeClient.run_turn on a turn.cancelled terminal.

Carries the synthesized turn trajectory (CAP-WEVT-009) so callers can recover any partial_response the agent produced before cancellation.

TurnFailedError(
error_type: str,
message: str,
*,
partial_response: str | None = None,
tool_calls_attempted: list[dict[str, Any]] | None = None,
turn_id: str | None = None,
)

Raised by :meth:RuntimeClient.run_turn on a turn.failed terminal.

Carries the synthesized turn trajectory (CAP-WEVT-008) so callers can inspect error_type, partial_response, and any tool calls the model attempted before the failure.

Worker(name: str | None = None)

Capability worker — long-running background component.

Constructed at module level in a capability’s workers/ directory. Handler decorators register callables that the runtime dispatches during the worker’s lifetime. Workers interact with the runtime exclusively through a :class:RuntimeClient instance passed to each handler.

Construct a Worker (CAP-WAPI-001).

When loaded via a capability manifest, the manifest key is authoritative. If name is omitted, the loader assigns the key; if provided, it must match the key (mismatch is a validation error). Standalone workers (CAP-WTOP-002) must provide name.

arun() -> None

Async peer of :meth:run: install signal handlers, then drive the worker.

Factored from :meth:_run_until so tests can drive the lifecycle without touching process-wide signal state.

every(
*,
seconds: float | None = None,
minutes: float | None = None,
cron: str | None = None,
) -> t.Callable[[ClientHandler], ClientHandler]

Register a recurring schedule handler (CAP-WAPI-006).

Exactly one of seconds, minutes, or cron must be provided. Handler signature: async def handler(client) -> None.
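The "exactly one" rule can be sketched as a small validator that produces the two fields ScheduleSpec carries. `Schedule` and `build_schedule` are stand-ins written for this example; converting minutes to seconds and rejecting non-positive intervals are assumptions about behavior this page does not spell out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Schedule:  # mirrors the two ScheduleSpec fields
    interval_seconds: Optional[float] = None
    cron_expr: Optional[str] = None


def build_schedule(
    *,
    seconds: Optional[float] = None,
    minutes: Optional[float] = None,
    cron: Optional[str] = None,
) -> Schedule:
    provided = [value for value in (seconds, minutes, cron) if value is not None]
    if len(provided) != 1:
        raise ValueError("provide exactly one of seconds, minutes, or cron")
    if cron is not None:
        return Schedule(cron_expr=cron)
    interval = seconds if seconds is not None else minutes * 60
    if interval <= 0:  # assumption: intervals must be positive
        raise ValueError("interval must be positive")
    return Schedule(interval_seconds=float(interval))
```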

on_event(
kind: str,
) -> t.Callable[[EventHandler], EventHandler]

Register an event handler (CAP-WAPI-005).

Returns a decorator. The decorated function is invoked for each broker event whose kind field matches kind exactly. Handler signature: async def handler(event, client) -> None.
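The decorator-factory pattern and the exact-match rule can be shown with a toy registry. This is not the Worker class: `MiniRegistry` is hypothetical, events are plain dicts with a `kind` key instead of the runtime's envelope type, and the client is passed as `None`.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any], Any], Awaitable[None]]


class MiniRegistry:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on_event(self, kind: str) -> Callable[[Handler], Handler]:
        def decorator(fn: Handler) -> Handler:
            self._handlers[kind].append(fn)
            return fn  # returned unchanged, so the function stays directly callable

        return decorator

    async def dispatch(self, event: dict[str, Any], client: Any) -> int:
        # Exact match on the kind string: no prefix or wildcard matching.
        handlers = self._handlers.get(event["kind"], [])
        for handler in handlers:
            await handler(event, client)
        return len(handlers)


registry = MiniRegistry()
seen: list[str] = []


@registry.on_event("turn.completed")
async def on_turn(event: dict[str, Any], client: Any) -> None:
    seen.append(event["kind"])


async def _demo() -> list[int]:
    return [
        await registry.dispatch({"kind": "turn.completed"}, client=None),
        await registry.dispatch({"kind": "turn.completed.extra"}, client=None),
    ]


counts = asyncio.run(_demo())
```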

on_shutdown(fn: ClientHandler) -> ClientHandler

Register a shutdown handler (CAP-WAPI-004).

Called once during worker stop, before the client is closed. Receives the runtime client as its first argument.

on_startup(fn: ClientHandler) -> ClientHandler

Register a startup handler (CAP-WAPI-003).

Called once when the worker starts, before any other handlers are active. Receives the runtime client as its first argument.

run() -> None

Launch this worker as a standalone process (CAP-WTOP-002).

Reads DREADNODE_RUNTIME_* env vars (CAP-WENV-001..003) via :class:RuntimeClient, runs the worker until SIGTERM/SIGINT. Intended use::

    if __name__ == "__main__":
        worker.run()

Use :meth:arun if you already have a running event loop.

task(fn: ClientHandler) -> ClientHandler

Register a supervised long-running task (CAP-WAPI-007).

The decorated function runs for the worker’s lifetime. If it returns or raises (except CancelledError), it is restarted with exponential backoff. Handler signature: async def handler(client) -> None.
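Supervised restart with exponential backoff can be sketched with plain asyncio. This is an illustration of the pattern, not the runtime's supervisor: `supervise` is hypothetical, the delays and the doubling factor are assumptions, the supervised function takes no client argument, and `max_restarts` exists only so the demo terminates.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def supervise(
    fn: Callable[[], Awaitable[None]],
    *,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    max_restarts: Optional[int] = None,
) -> int:
    """Run fn repeatedly, restarting after a return or an error. Returns restart count."""
    restarts = 0
    delay = base_delay
    while True:
        try:
            await fn()
        except asyncio.CancelledError:
            raise  # cancellation ends supervision instead of triggering a restart
        except Exception:
            pass  # a real supervisor would log the failure here
        if max_restarts is not None and restarts >= max_restarts:
            return restarts
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential backoff with a cap
        restarts += 1


calls: list[int] = []


async def flaky() -> None:
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("boom")


restarts = asyncio.run(supervise(flaky, base_delay=0.001, max_restarts=2))
```

In the demo the function fails twice and then returns, so it is invoked three times with two restarts in between.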