Workers

Long-running background components bundled with a capability — in-process or subprocess, with decorator-based handlers and a supervised lifecycle.

A worker is a long-running background component shipped with a capability. It subscribes to runtime events, runs on a schedule, and maintains state across turns — the kind of work an agent can’t do because agents are request-response.

Here’s the smallest useful worker:

workers/notifier.py

```python
from dreadnode.capabilities.worker import Worker, EventEnvelope, RuntimeClient

worker = Worker(name="notifier")

@worker.on_event("session.created")
async def announce(event: EventEnvelope, client: RuntimeClient) -> None:
    await client.notify(title=f"Session started: {event.session_id[:8]}")

if __name__ == "__main__":
    worker.run()
```

The runtime imports this module when the capability loads, delivers every session.created event to announce, and closes the worker when the capability reloads.

The if __name__ == "__main__" guard is the recommended scaffold for every worker file. It’s a no-op when the runtime imports the module in-process, and it’s the bootstrap when the same file runs as a subprocess — so switching topologies is a one-line manifest change with no edits to the worker code.

Workers run in one of three topologies. Every worker is declared in the manifest with either path: or command:; the topology follows from what you point at.

```yaml
workers:
  notifier:                 # 1. in-process Python — same event loop as the runtime
    path: workers/notifier.py
  bridge:                   # 2. Python subprocess — same decorators, separate process
    command: python
    args: ['${CAPABILITY_ROOT}/workers/bridge.py']
    when: [bridge-enabled]
  relay:                    # 3. non-Python subprocess — any executable
    command: ${CAPABILITY_ROOT}/bin/relay
    args: ['--addr=0.0.0.0:9090']
    env:
      LOG_LEVEL: info
```

In-process Python (path:) — the runtime imports your module during capability load and dispatches decorator-based handlers on its own event loop. Fastest; no process boundary; a crash in your handler surfaces through the worker state machine. Use for anything pure-Python that doesn’t need isolation.

Python subprocess (command: python, args: [<your worker.py>]) — same decorator-based handlers, but the runtime spawns a new process and your worker file bootstraps the framework itself with worker.run() (see below). Best when you want crash isolation, a heavy workload, or a blocking library that can’t co-exist on the runtime’s event loop.

Non-Python subprocess (command:) — any executable. The runtime spawns it, supervises the process, and gives it the connection credentials in environment variables. Your executable speaks HTTP + WebSocket back to the runtime in whatever language you like. Use for Go/Node/Rust daemons, pre-built binaries, or services you don’t want to rewrite.

Workers are never auto-discovered — every worker must have an explicit manifest entry.

In-process and Python-subprocess workers share the same Worker class. A Worker instance exposes five decorators; every handler must be async def.

@worker.on_startup runs once when the worker starts, before any events or schedules fire. Use it to open connections and seed state.

```python
@worker.on_startup
async def connect(client: RuntimeClient) -> None:
    worker.state["ws"] = await open_websocket("wss://events.example.com")
```

@worker.on_shutdown runs once during worker stop, in reverse registration order, before the runtime client closes. Use it to flush queues and release resources. An exception here is logged and attached to the worker’s health entry, but the worker still transitions to stopped — it is not coming back.

```python
@worker.on_shutdown
async def close(client: RuntimeClient) -> None:
    ws = worker.state.get("ws")
    if ws is not None:
        await ws.close()
```

@worker.on_event(kind) fires for every runtime event whose kind matches exactly. Multiple handlers can subscribe to the same kind; they all fire.

```python
@worker.on_event("turn.completed")
async def on_turn(event: EventEnvelope, client: RuntimeClient) -> None:
    await forward_result(worker.state["ws"], event.payload)
```

See the event kinds reference for the full list and payload shapes. Handlers for the same kind can be invoked concurrently if events arrive faster than the handler completes — guard shared state with an asyncio.Lock yourself.

@worker.every schedules a handler on an interval. Exactly one of seconds, minutes, or cron must be provided.

```python
@worker.every(seconds=30)
async def heartbeat(client: RuntimeClient) -> None:
    await worker.state["ws"].ping()

@worker.every(minutes=5)
async def sweep(client: RuntimeClient) -> None:
    await reconcile_state(client)

@worker.every(cron="0 * * * *")
async def hourly_sync(client: RuntimeClient) -> None:
    await reconcile_state(client)
```

Cron expressions use the standard 5-field format (minute, hour, day-of-month, month, day-of-week).

@worker.task registers a supervised long-running task. The runtime keeps the coroutine running for the worker’s lifetime; if it returns or raises (anything other than CancelledError), it is restarted with exponential backoff — starting at 1 s and capping at 5 minutes, with the counter resetting after 60 seconds of stable running.

```python
@worker.task
async def reader(client: RuntimeClient) -> None:
    async for message in worker.state["ws"]:
        await process(message)
```

Use @worker.task for anything that runs its own continuous loop — a socket reader, a queue consumer, a watcher. If every registered task has crashed and is sitting in restart backoff, the worker transitions to error.
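The restart cadence can be sketched numerically. This is a hedged illustration: the doubling factor below is an assumption, since the text pins only the 1 s start, the 5-minute cap, and the 60-second counter reset.

```python
def backoff_delays(n: int, base: float = 1.0, cap: float = 300.0) -> list[float]:
    """First n restart delays under an assumed doubling schedule.

    Only the 1 s start and 300 s cap come from the docs; the factor of
    two is illustrative, not the runtime's documented multiplier.
    """
    return [min(base * (2 ** i), cap) for i in range(n)]

# Under this assumption: 1, 2, 4, 8, 16, 32, 64, 128, 256, then 300 thereafter.
```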

Any worker file with the worker.run() guard can run as a subprocess — flip the manifest entry from path: to command: python + args::

```yaml
workers:
  notifier:
    command: python
    args: ['${CAPABILITY_ROOT}/workers/notifier.py']
```

worker.run() reads the injected DREADNODE_RUNTIME_* variables (below), opens a RuntimeClient against the local runtime, installs SIGTERM/SIGINT handlers, and drives the same decorator dispatch loop the in-process runner uses. The subprocess parent treats exit code 0 as a clean stop and any non-zero exit as an error state.

For anything beyond the Python standard library and dreadnode itself, ship the worker as a self-contained PEP 723 script and let uv resolve dependencies at spawn. This is the recommended pattern for Python subprocess workers — no shared venv to manage, dependencies live next to the code, and the same script runs identically in local dev and a sandbox.

```yaml
workers:
  notifier:
    command: uv
    args: ['run', '${CAPABILITY_ROOT}/workers/notifier.py']
```

workers/notifier.py

```python
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "dreadnode>=2.0,<3.0",
#     "httpx>=0.27",
# ]
# ///
from dreadnode.capabilities.worker import Worker, EventEnvelope, RuntimeClient

worker = Worker(name="notifier")

# ... handlers ...

if __name__ == "__main__":
    worker.run()
```

uv run reads the /// script block, provisions an isolated environment on first spawn (cached across restarts), and execs the script. On subsequent spawns the environment is reused unless the dependency list changes.

Prefer this over declaring dependencies.python in the manifest for anything a subprocess owns — dependencies.python is sandbox-only (see Dependencies), but a PEP 723 script works the same locally and in a sandbox.

Point command: at any executable. The runtime spawns it with the capability’s flag variables, your declared env:, and the runtime-connection variables (below). Your executable talks to the runtime over HTTP + WebSocket in whatever language you like.

The minimum contract:

  • Read DREADNODE_RUNTIME_URL and DREADNODE_RUNTIME_TOKEN from the environment on startup.
  • Send Authorization: Bearer <token> on every HTTP request and on the WebSocket handshake.
  • Handle SIGTERM; the runtime waits 5 seconds before escalating to SIGKILL.
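A sketch of that contract, in Python purely for illustration (any language works; build_event_request and on_sigterm are hypothetical names, not SDK API):

```python
import json
import os
import signal
import urllib.request

def build_event_request(kind: str, payload: dict) -> urllib.request.Request:
    """Build an authenticated POST /api/events request.

    Reads the injected connection variables, falling back to placeholders
    so the sketch can be exercised outside the runtime.
    """
    url = os.environ.get("DREADNODE_RUNTIME_URL", "http://127.0.0.1:8787")
    token = os.environ.get("DREADNODE_RUNTIME_TOKEN", "dev-token")
    return urllib.request.Request(
        f"{url}/api/events",
        data=json.dumps({"kind": kind, "payload": payload}).encode(),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

def on_sigterm(signum, frame):
    # Exit 0 so the supervising runtime records a clean stop; it escalates
    # to SIGKILL if the process is still alive after 5 seconds.
    raise SystemExit(0)

signal.signal(signal.SIGTERM, on_sigterm)
```

Sending the request (urllib.request.urlopen, or any HTTP client) and holding the WebSocket open are left out; the point is the env-var handshake and the signal discipline.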

The endpoints that cover most worker use cases:

| Endpoint | Purpose |
| --- | --- |
| POST /api/events | Publish a runtime-scope event. Body: `{"kind": str, "payload": {}}`. |
| POST /api/sessions/{session_id}/events | Publish a session-scoped event. |
| POST /api/events with kind: "notify" | Push a TUI notification. Payload: `{source, title, body, severity}`. |
| GET /api/runtime | Read runtime health — capabilities, MCP, workers, with their states. |
| GET /api/sessions | List active sessions. |

Reserved kind prefixes (turn., prompt., session., transport., capabilities., component.) are rejected at ingress — use your own prefix (for example capability.<name>.<event>) for events you emit.
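The ingress rule is easy to mirror client-side before publishing. A small hypothetical guard (RESERVED_PREFIXES and assert_publishable are illustrative names, not SDK API):

```python
# Prefixes the runtime rejects at ingress, per the list above.
RESERVED_PREFIXES = (
    "turn.", "prompt.", "session.", "transport.", "capabilities.", "component.",
)

def assert_publishable(kind: str) -> str:
    """Fail fast before the runtime does."""
    if kind.startswith(RESERVED_PREFIXES):
        raise ValueError(
            f"'{kind}' uses a reserved prefix; use capability.<name>.<event> instead"
        )
    return kind
```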

See the Worker API reference for the full client surface. If the same code later wants to run in-process, write it in Python and use worker.run() instead — you get handler decorators for free.

Workers move through a small state machine. The TUI capability manager exposes the current state — a crashed subprocess surfaces inline next to the worker name:

[Image: Capability detail showing a worker in the error state]

| State | When |
| --- | --- |
| loading | Runtime is importing the module or preparing the subprocess |
| starting | on_startup handlers are running, or the subprocess is spawning |
| running | Handlers are dispatched normally; the subprocess is alive |
| stopping | on_shutdown handlers are running, or the subprocess received SIGTERM |
| stopped | Clean exit (including on_shutdown exceptions — error is attached to health) |
| error | Startup failed, all @worker.task handlers crashed, or subprocess exited non-zero |
| gated_off | when: predicate evaluated false — the worker was never started |

When a capability reloads (operator toggles a flag in the TUI, the CLI pushes a new version, the runtime re-discovers on-disk changes), every worker it owns is stopped through the full stopping sequence — on_shutdown handlers run, subprocesses receive SIGTERM then SIGKILL after 5 seconds. The worker is then re-loaded against the updated manifest with gates re-evaluated. worker.state does not survive a reload.

The runtime does not auto-restart a subprocess worker that exits with a non-zero code. It transitions to error and stays there until an operator restarts it from the TUI capability manager or a peer worker calls client.restart_worker(capability, worker_name). In-process @worker.task handlers do auto-restart with backoff — only the worker-as-a-whole stays down. A gated_off worker cannot be restarted until you flip the controlling flag.

Subprocess workers receive environment variables from four layers, composed in this order (later wins):

  1. The inherited os.environ of the runtime process — PATH, HOME, SSL_CERT_FILE, plus anything the operator exported.
  2. The capability’s flag variables — one CAPABILITY_FLAG__<CAP>__<FLAG> per declared flag, value 1 or 0.
  3. Your manifest env: entries.
  4. The runtime-connection variables — DREADNODE_RUNTIME_URL, DREADNODE_RUNTIME_TOKEN, DREADNODE_RUNTIME_ID. Authoritative: setting these in manifest env: is a parse-time error.
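The layering composes like successive dict updates, later layers winning. A hypothetical sketch (not runtime source), including the parse-time guard on reserved keys:

```python
def compose_env(inherited: dict, flags: dict, manifest_env: dict, runtime_vars: dict) -> dict:
    """Compose the four environment layers in the documented order.

    Mirrors the rule that DREADNODE_RUNTIME_* keys in manifest env:
    are rejected at parse time.
    """
    for key in manifest_env:
        if key.startswith("DREADNODE_RUNTIME_"):
            raise ValueError(f"{key} is runtime-owned; setting it in manifest env: is an error")
    env: dict = {}
    for layer in (inherited, flags, manifest_env, runtime_vars):
        env.update(layer)  # later layers win
    return env
```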

In practice, printenv inside a subprocess worker looks like:

```text
PATH=/usr/local/bin:/usr/bin:...                 # inherited
HOME=/Users/operator                             # inherited
CAPABILITY_ROOT=/Users/operator/.dreadnode/capabilities/bridge
CAPABILITY_FLAG__BRIDGE__RELAY_ENABLED=1
LOG_LEVEL=info                                   # from manifest env:
DREADNODE_RUNTIME_URL=http://127.0.0.1:8787      # runtime
DREADNODE_RUNTIME_TOKEN=...                      # runtime
DREADNODE_RUNTIME_ID=...                         # runtime
```

CAPABILITY_ROOT is set to the absolute path of the capability directory and is also the working directory for the subprocess. Use ${CAPABILITY_ROOT} in command, args, or env: values to reference files inside the capability. See environment variables for the full catalog.

Subprocess worker stdout and stderr are merged and written to ~/.dreadnode/logs/worker-{capability}-{worker_name}.log. On every start the previous file is rotated to .log.prev — one level of history, no unbounded archive. The TUI capability detail panel shows the last 200 lines with the tail visible while the worker is alive, and the last 20 lines are attached to the error message when the subprocess exits non-zero. GET /api/workers/{cap}/{worker} returns the absolute path so you can open it by hand.
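The naming convention is simple enough to compute when tailing a log from a script. A hypothetical helper (not SDK API):

```python
from pathlib import Path

def worker_log_path(capability: str, worker_name: str) -> Path:
    """Path of the merged stdout/stderr log, per the convention above.

    The .prev sibling holds the one rotated level of history.
    """
    return Path.home() / ".dreadnode" / "logs" / f"worker-{capability}-{worker_name}.log"
```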

worker.state is a plain dict shared across every handler in the worker. Multiple on_event handlers for the same kind, @every schedules, and @task loops all run on the same event loop and will interleave across await points. Guard any non-trivial shared mutation with an asyncio.Lock:

```python
import asyncio

@worker.on_startup
async def init(client: RuntimeClient) -> None:
    worker.state["lock"] = asyncio.Lock()
    worker.state["seen"] = set()

@worker.on_event("turn.completed")
async def dedupe(event: EventEnvelope, client: RuntimeClient) -> None:
    async with worker.state["lock"]:
        if event.payload["turn_id"] in worker.state["seen"]:
            return
        worker.state["seen"].add(event.payload["turn_id"])
        await forward(event)
```

Workers have the full runtime client, so an event handler can open a session and run a turn. This is the pattern for acting on external signals: a webhook arrives, a worker picks it up, and a fresh agent session handles the decision.

```python
@worker.on_event("capability.bridge.callback_received")
async def triage(event: EventEnvelope, client: RuntimeClient) -> None:
    session = await client.create_session(
        capability="bridge",
        agent="triage",
        session_id=f"callback-{event.payload['callback_id']}",  # idempotent
    )
    async for _ in client.stream_chat(
        session_id=session.session_id,
        message=f"Investigate callback: {event.payload}",
    ):
        pass  # discard stream — the turn runs to completion regardless
```

create_session is idempotent on session_id, which makes “one session per external entity” trivial. stream_chat returns an async iterator of events; the turn runs to completion whether or not the iterator is drained. See the Worker API reference for the full session and turn surface.

A Worker can be driven without the runtime — useful for unit-testing handler logic. Register handlers as normal, construct your own RuntimeClient (or a fake that implements the methods your handlers call), and dispatch events directly:

```python
import pytest

from workers.bridge import worker

@pytest.mark.asyncio
async def test_forward_on_turn_completed(fake_client, fake_ws):
    worker.state["ws"] = fake_ws
    envelope = make_envelope(kind="turn.completed", payload={"turn_id": "t1"})
    for handler in worker._event_handlers["turn.completed"]:
        await handler(envelope, fake_client)
    assert fake_ws.sent == [{"turn_id": "t1"}]
```
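The fake_client, fake_ws, and make_envelope names above are fixtures you write yourself, not SDK exports. A minimal hypothetical shape:

```python
import asyncio
from types import SimpleNamespace

class FakeWS:
    """Collects forwarded payloads instead of sending them anywhere."""
    def __init__(self):
        self.sent = []
    async def send(self, payload):
        self.sent.append(payload)

class FakeClient:
    """Stubs only the RuntimeClient surface the handlers under test touch."""
    def __init__(self):
        self.notifications = []
    async def notify(self, **kwargs):
        self.notifications.append(kwargs)

def make_envelope(kind, payload):
    # Stand-in for EventEnvelope with just the attributes handlers read.
    return SimpleNamespace(kind=kind, payload=payload)

# Quick self-check outside pytest:
ws = FakeWS()
asyncio.run(ws.send({"turn_id": "t1"}))
assert ws.sent == [{"turn_id": "t1"}]
```

Whether forward_result ends up calling ws.send with the raw payload depends on your own helper; adjust the fake to mirror whatever surface your handlers actually use.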

For end-to-end coverage — startup, schedule, shutdown — drive the full runner against a stop event. See Worker._run_until in the SDK source for the lifecycle harness used by the framework’s own tests.

Every handler receives a RuntimeClient — the worker’s channel back to the runtime. Use it to publish custom events, push notifications into the TUI, subscribe to event streams, drive agent turns, and inspect runtime state. See the Worker API reference for the full method surface.