Workers
Long-running background components bundled with a capability — in-process or subprocess, with decorator-based handlers and a supervised lifecycle.
A worker is a long-running background component shipped with a capability. It subscribes to runtime events, runs on a schedule, and maintains state across turns — the kind of work an agent can’t do because agents are request-response.
Here’s the smallest useful worker:
```python
from dreadnode.capabilities.worker import Worker, EventEnvelope, RuntimeClient

worker = Worker(name="notifier")


@worker.on_event("session.created")
async def announce(event: EventEnvelope, client: RuntimeClient) -> None:
    await client.notify(title=f"Session started: {event.session_id[:8]}")


if __name__ == "__main__":
    worker.run()
```

The runtime imports this module when the capability loads, delivers every session.created event to announce, and closes the worker when the capability reloads.
The if __name__ == "__main__" guard is the recommended scaffold for every worker file. It’s a no-op when the runtime imports the module in-process, and it’s the bootstrap when the same file runs as a subprocess — so switching topologies is a one-line manifest change with no edits to the worker code.
Three worker topologies
Workers run in one of three topologies. Every worker is declared in the manifest with either path: or command:; the topology follows from what you point at.

```yaml
workers:
  notifier: # 1. in-process Python: same event loop as the runtime
    path: workers/notifier.py

  bridge: # 2. Python subprocess: same decorators, separate process
    command: python
    args: ['${CAPABILITY_ROOT}/workers/bridge.py']
    when: [bridge-enabled]

  relay: # 3. non-Python subprocess: any executable
    command: ${CAPABILITY_ROOT}/bin/relay
    args: ['--addr=0.0.0.0:9090']
    env:
      LOG_LEVEL: info
```

In-process Python (path:). The runtime imports your module during capability load and dispatches decorator-based handlers on its own event loop. Fastest; no process boundary; a crash in your handler surfaces through the worker state machine. Use for anything pure-Python that doesn’t need isolation.

Python subprocess (command: python, args: [<your worker.py>]). Same decorator-based handlers, but the runtime spawns a new process and your worker file bootstraps the framework itself with worker.run() (see below). Best when you want crash isolation, a heavy workload, or a blocking library that can’t co-exist on the runtime’s event loop.

Non-Python subprocess (command:). Any executable. The runtime spawns it, supervises the process, and gives it the connection credentials in environment variables. Your executable speaks HTTP + WebSocket back to the runtime in whatever language you like. Use for Go/Node/Rust daemons, pre-built binaries, or services you don’t want to rewrite.
Workers are never auto-discovered — every worker must have an explicit manifest entry.
Handler decorators
In-process and Python-subprocess workers share the same Worker class. A Worker instance exposes five decorators; every handler must be async def.
@worker.on_startup
Runs once when the worker starts, before any events or schedules fire. Use it to open connections and seed state.

```python
@worker.on_startup
async def connect(client: RuntimeClient) -> None:
    worker.state["ws"] = await open_websocket("wss://events.example.com")
```

@worker.on_shutdown
Runs once during worker stop, in reverse registration order, before the runtime client closes. Use it to flush queues and release resources. An exception here is logged and attached to the worker’s health entry, but the worker still transitions to stopped and does not come back.

```python
@worker.on_shutdown
async def close(client: RuntimeClient) -> None:
    ws = worker.state.get("ws")
    if ws is not None:
        await ws.close()
```

@worker.on_event(kind)
Fires for every runtime event whose kind matches exactly. Multiple handlers can subscribe to the same kind; they all fire.

```python
@worker.on_event("turn.completed")
async def on_turn(event: EventEnvelope, client: RuntimeClient) -> None:
    await forward_result(worker.state["ws"], event.payload)
```

See the event kinds reference for the full list and payload shapes. Handlers for the same kind can be invoked concurrently if events arrive faster than the handler completes, so guard shared state with an asyncio.Lock yourself.
@worker.every(...)
Schedules a handler on a fixed interval or a cron schedule. Exactly one of seconds, minutes, or cron must be provided.

```python
@worker.every(seconds=30)
async def heartbeat(client: RuntimeClient) -> None:
    await worker.state["ws"].ping()


@worker.every(minutes=5)
async def sweep(client: RuntimeClient) -> None:
    await reconcile_state(client)


@worker.every(cron="0 * * * *")
async def hourly_sync(client: RuntimeClient) -> None:
    await reconcile_state(client)
```

Cron expressions use the standard 5-field format (minute, hour, day-of-month, month, day-of-week).
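The "exactly one of" rule and the 5-field cron layout can be sketched in plain Python. This is an illustrative check, not the SDK's implementation: `resolve_schedule` and the shape of its return value are hypothetical names chosen for this example.

```python
from __future__ import annotations

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def resolve_schedule(
    *,
    seconds: float | None = None,
    minutes: float | None = None,
    cron: str | None = None,
) -> dict:
    """Validate schedule arguments (hypothetical helper, not part of dreadnode)."""
    given = [v for v in (seconds, minutes, cron) if v is not None]
    if len(given) != 1:
        raise ValueError("exactly one of seconds, minutes, or cron must be provided")

    if cron is not None:
        fields = cron.split()
        if len(fields) != len(CRON_FIELDS):
            raise ValueError(f"expected 5 cron fields, got {len(fields)}")
        # Pair each field with its position name: minute, hour, ...
        return {"type": "cron", **dict(zip(CRON_FIELDS, fields))}

    # Normalize both interval forms to seconds.
    interval = seconds if seconds is not None else minutes * 60
    return {"type": "interval", "seconds": float(interval)}
```

With this sketch, `resolve_schedule(minutes=5)` normalizes to a 300-second interval, and passing both `seconds` and `cron` raises `ValueError`.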
@worker.task
Registers a supervised long-running task. The runtime keeps the coroutine running for the worker’s lifetime; if it returns or raises (other than CancelledError), it restarts with exponential backoff, starting at 1 s and capping at 5 minutes. The backoff counter resets after 60 seconds of stable running.

```python
@worker.task
async def reader(client: RuntimeClient) -> None:
    async for message in worker.state["ws"]:
        await process(message)
```

Use @worker.task for anything that runs its own long-lived loop: a socket reader, a queue consumer, a watcher. If every registered task exhausts its backoff cadence, the worker transitions to error.
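To make the restart cadence concrete, here is a minimal sketch of a capped exponential backoff with a stability reset. The 1 s start, 5-minute cap, and 60-second reset come from the description above; the doubling factor and the helper names `delay_for` and `next_attempt` are assumptions made for illustration, not the runtime's actual code.

```python
def delay_for(attempt: int, initial: float = 1.0, cap: float = 300.0, factor: float = 2.0) -> float:
    """Delay before restart number `attempt` (0-based). Doubling is an assumption."""
    # Clamp the exponent so very large attempt counts cannot overflow a float.
    exponent = min(attempt, 32)
    return min(initial * factor**exponent, cap)


def next_attempt(attempt: int, ran_for: float, stable_after: float = 60.0) -> int:
    """Reset the counter after a stable run, otherwise advance it."""
    return 0 if ran_for >= stable_after else attempt + 1


# Delays for ten consecutive quick crashes under these assumptions.
schedule = [delay_for(n) for n in range(10)]
```

Under these assumptions the delays grow from 1 s and reach the 300 s cap on the tenth consecutive crash; a task that then runs for a minute or more starts again from the first delay on its next failure.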
Running a Python worker as a subprocess
Any worker file with the worker.run() guard can run as a subprocess. Flip the manifest entry from path: to command: python with args:

```yaml
workers:
  notifier:
    command: python
    args: ['${CAPABILITY_ROOT}/workers/notifier.py']
```

worker.run() reads the injected DREADNODE_RUNTIME_* variables (below), opens a RuntimeClient against the local runtime, installs SIGTERM/SIGINT handlers, and drives the same decorator dispatch loop the in-process runner uses. The subprocess parent treats exit code 0 as a clean stop and any non-zero exit as an error state.
Declaring dependencies with uv
For anything beyond the Python standard library and dreadnode itself, ship the worker as a self-contained PEP 723 script and let uv resolve dependencies at spawn. This is the recommended pattern for Python subprocess workers: there is no shared venv to manage, dependencies live next to the code, and the same script runs identically in local dev and a sandbox.

```yaml
workers:
  notifier:
    command: uv
    args: ['run', '${CAPABILITY_ROOT}/workers/notifier.py']
```

```python
# /// script
# requires-python = ">=3.11"
# dependencies = [
#   "dreadnode>=2.0,<3.0",
#   "httpx>=0.27",
# ]
# ///

from dreadnode.capabilities.worker import Worker, EventEnvelope, RuntimeClient

worker = Worker(name="notifier")

# ... handlers ...

if __name__ == "__main__":
    worker.run()
```

uv run reads the /// script block, provisions an isolated environment on first spawn (cached across restarts), and execs the script. On subsequent spawns the environment is reused unless the dependency list changes.
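For a feel of what the `/// script` block carries, the sketch below pulls the embedded TOML text out of a script's source. It is a simplified line-based reader written for this page, not uv's parser and not the reference regular expression from PEP 723; `read_script_metadata` and `SAMPLE` are hypothetical names.

```python
from __future__ import annotations

SAMPLE = '''# /// script
# requires-python = ">=3.11"
# dependencies = [
#   "dreadnode>=2.0,<3.0",
#   "httpx>=0.27",
# ]
# ///

print("worker body goes here")
'''


def read_script_metadata(source: str) -> str | None:
    """Return the TOML text inside the first `# /// script` block, or None."""
    lines = source.splitlines()
    try:
        start = lines.index("# /// script")
    except ValueError:
        return None  # no metadata block at all

    body = []
    for line in lines[start + 1:]:
        if line == "# ///":
            return "\n".join(body)  # closing marker found
        if line == "#":
            body.append("")  # bare comment marker is an empty TOML line
        elif line.startswith("# "):
            body.append(line[2:])  # strip the "# " comment prefix
        else:
            return None  # a non-comment line interrupts the block
    return None  # block was never closed


metadata = read_script_metadata(SAMPLE)
```

The returned text is ordinary TOML, so on Python 3.11+ it could be passed to `tomllib.loads` to inspect `dependencies`.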
Prefer this over declaring dependencies.python in the manifest for anything a subprocess owns — dependencies.python is sandbox-only (see Dependencies), but a PEP 723 script works the same locally and in a sandbox.
Non-Python subprocess workers
Point command: at any executable. The runtime spawns it with the capability’s flag variables, your declared env:, and the runtime-connection variables (below). Your executable talks to the runtime over HTTP + WebSocket in whatever language you like.
The minimum contract:
- Read `DREADNODE_RUNTIME_URL` and `DREADNODE_RUNTIME_TOKEN` from the environment on startup.
- Send `Authorization: Bearer <token>` on every HTTP request and on the WebSocket handshake.
- Handle `SIGTERM`; the runtime waits 5 seconds before escalating to `SIGKILL`.
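The three steps of the contract are sketched below using only the standard library. Python is used here for consistency with the rest of the page; the same steps translate directly to Go, Node, or Rust. The helper names (`load_credentials`, `authorized_request`, `install_sigterm_handler`) are hypothetical, and no request is actually sent.

```python
import os
import signal
import threading
import urllib.request

# Set when the runtime asks the process to stop; the main loop should poll it.
stop_requested = threading.Event()


def load_credentials(env=None):
    """Step 1: read the runtime URL and token from the environment."""
    env = os.environ if env is None else env
    return env["DREADNODE_RUNTIME_URL"].rstrip("/"), env["DREADNODE_RUNTIME_TOKEN"]


def authorized_request(base_url, token, path, method="GET"):
    """Step 2: attach the bearer token to a request for `path`."""
    request = urllib.request.Request(f"{base_url}{path}", method=method)
    request.add_header("Authorization", f"Bearer {token}")
    return request


def install_sigterm_handler():
    """Step 3: exit promptly on SIGTERM (SIGKILL follows after 5 seconds)."""
    signal.signal(signal.SIGTERM, lambda signum, frame: stop_requested.set())
```

A worker built on this sketch would call `install_sigterm_handler()` once at startup from its main thread, then loop until `stop_requested.is_set()`, leaving itself well under 5 seconds to clean up.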
The endpoints that cover most worker use cases:
| Endpoint | Purpose |
|---|---|
| `POST /api/events` | Publish a runtime-scope event. Body: `{"kind": str, "payload": {}}`. |
| `POST /api/sessions/{session_id}/events` | Publish a session-scoped event. |
| `POST /api/events` with `kind: "notify"` | Push a TUI notification. Payload: `{source, title, body, severity}`. |
| `GET /api/runtime` | Read runtime health: capabilities, MCP, workers, with their states. |
| `GET /api/sessions` | List active sessions. |
Reserved kind prefixes (turn., prompt., session., transport., capabilities., component.) are rejected at ingress — use your own prefix (for example capability.<name>.<event>) for events you emit.
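A client can catch a reserved prefix before the request ever leaves the process. The sketch below builds the JSON body for `POST /api/events` and mirrors the prefix list above; `capability_kind` and `event_body` are hypothetical helpers, and the runtime's ingress check remains the authority.

```python
from __future__ import annotations

import json

# Prefixes the runtime rejects at ingress, copied from the list above.
RESERVED_PREFIXES = (
    "turn.", "prompt.", "session.", "transport.", "capabilities.", "component.",
)


def capability_kind(capability: str, event: str) -> str:
    """Build a kind under the suggested capability.<name>.<event> prefix."""
    return f"capability.{capability}.{event}"


def event_body(kind: str, payload: dict | None = None) -> bytes:
    """Serialize an event body, refusing reserved kinds early."""
    if kind.startswith(RESERVED_PREFIXES):
        raise ValueError(f"kind {kind!r} uses a reserved prefix")
    return json.dumps({"kind": kind, "payload": payload or {}}).encode("utf-8")
```

For example, `event_body(capability_kind("bridge", "callback_received"), {"callback_id": "c1"})` yields a body ready to send, while `event_body("session.created")` raises `ValueError`.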
See the Worker API reference for the full client surface. If the same code later wants to run in-process, write it in Python and use worker.run() instead — you get handler decorators for free.
Lifecycle
Workers move through a small state machine. The TUI capability manager exposes the current state; a crashed subprocess surfaces inline next to the worker name:

| State | When |
|---|---|
| `loading` | Runtime is importing the module or preparing the subprocess |
| `starting` | `on_startup` handlers are running, or the subprocess is spawning |
| `running` | Handlers are dispatched normally; the subprocess is alive |
| `stopping` | `on_shutdown` handlers are running, or the subprocess received `SIGTERM` |
| `stopped` | Clean exit (including `on_shutdown` exceptions; the error is attached to health) |
| `error` | Startup failed, all `@worker.task` handlers crashed, or subprocess exited non-zero |
| `gated_off` | `when:` predicate evaluated false; the worker was never started |
On capability reload
When a capability reloads (operator toggles a flag in the TUI, the CLI pushes a new version, the runtime re-discovers on-disk changes), every worker it owns is stopped through the full stopping sequence: on_shutdown handlers run, and subprocesses receive SIGTERM, then SIGKILL after 5 seconds. The worker is then re-loaded against the updated manifest with gates re-evaluated. worker.state does not survive a reload.
Restart semantics
The runtime does not auto-restart a subprocess worker that exits with a non-zero code. It transitions to error and stays there until an operator restarts it from the TUI capability manager or a peer worker calls client.restart_worker(capability, worker_name). In-process @worker.task handlers do auto-restart with backoff; only the worker as a whole stays down. A gated_off worker cannot be restarted until you flip the controlling flag.
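The exit-code and restart rules can be written down as a small model. This is a sketch for reasoning about the states, not the runtime's state machine; `WorkerState`, `state_after_exit`, `needs_operator`, and `blocked_by_gate` are names invented for this example.

```python
from enum import Enum


class WorkerState(str, Enum):
    """The seven lifecycle states listed in the table above."""
    LOADING = "loading"
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    ERROR = "error"
    GATED_OFF = "gated_off"


def state_after_exit(exit_code: int) -> WorkerState:
    """Exit code 0 is a clean stop; anything else is an error."""
    return WorkerState.STOPPED if exit_code == 0 else WorkerState.ERROR


def needs_operator(state: WorkerState) -> bool:
    """An errored worker stays down until someone restarts it."""
    return state is WorkerState.ERROR


def blocked_by_gate(state: WorkerState) -> bool:
    """A gated-off worker cannot restart until its flag is flipped."""
    return state is WorkerState.GATED_OFF
```

A monitoring worker could apply the same checks to the state strings returned by `GET /api/runtime` before deciding whether a restart call is worth making.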
Subprocess environment
Subprocess workers receive environment variables from four layers, composed in this order (later wins):

- The inherited `os.environ` of the runtime process: `PATH`, `HOME`, `SSL_CERT_FILE`, plus anything the operator exported.
- The capability’s flag variables: one `CAPABILITY_FLAG__<CAP>__<FLAG>` per declared flag, value `1` or `0`.
- Your manifest `env:` entries.
- The runtime-connection variables: `DREADNODE_RUNTIME_URL`, `DREADNODE_RUNTIME_TOKEN`, `DREADNODE_RUNTIME_ID`. These are authoritative: setting them in manifest `env:` is a parse-time error.
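The "later wins" layering can be illustrated with ordinary dict updates. This is a sketch of the composition order only; `compose_env` and `flag_variable` are hypothetical, and the upper-casing with hyphens mapped to underscores is an assumption inferred from the flag variable format, not a documented rule.

```python
from __future__ import annotations

RUNTIME_KEYS = frozenset(
    {"DREADNODE_RUNTIME_URL", "DREADNODE_RUNTIME_TOKEN", "DREADNODE_RUNTIME_ID"}
)


def flag_variable(capability: str, flag: str) -> str:
    """Name of the flag variable (normalization rule is an assumption)."""
    def normalize(name: str) -> str:
        return name.upper().replace("-", "_")

    return f"CAPABILITY_FLAG__{normalize(capability)}__{normalize(flag)}"


def compose_env(
    inherited: dict[str, str],
    flags: dict[str, bool],
    manifest_env: dict[str, str],
    runtime: dict[str, str],
    *,
    capability: str,
) -> dict[str, str]:
    """Merge the four layers in order; each later layer overrides earlier ones."""
    clash = RUNTIME_KEYS.intersection(manifest_env)
    if clash:
        # Mirrors the parse-time error for runtime keys set in manifest env:.
        raise ValueError(f"manifest env: may not set {sorted(clash)}")

    env = dict(inherited)
    env.update({flag_variable(capability, f): "1" if on else "0" for f, on in flags.items()})
    env.update(manifest_env)
    env.update(runtime)
    return env
```

If the operator exported `LOG_LEVEL=warn` and the manifest declares `LOG_LEVEL: info`, the subprocess sees `info`, because the manifest layer is applied after the inherited one.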
In practice, printenv inside a subprocess worker looks like:
```bash
PATH=/usr/local/bin:/usr/bin:...                  # inherited
HOME=/Users/operator                              # inherited
CAPABILITY_ROOT=/Users/operator/.dreadnode/capabilities/bridge
CAPABILITY_FLAG__BRIDGE__RELAY_ENABLED=1
LOG_LEVEL=info                                    # from manifest env:
DREADNODE_RUNTIME_URL=http://127.0.0.1:8787       # runtime
DREADNODE_RUNTIME_TOKEN=...                       # runtime
DREADNODE_RUNTIME_ID=...                          # runtime
```

CAPABILITY_ROOT is set to the absolute path of the capability directory and is also the working directory for the subprocess. Use ${CAPABILITY_ROOT} in command, args, or env: values to reference files inside the capability. See environment variables for the full catalog.
Subprocess worker stdout and stderr are merged and written to ~/.dreadnode/logs/worker-{capability}-{worker_name}.log. On every start the previous file is rotated to .log.prev — one level of history, no unbounded archive. The TUI capability detail panel shows the last 200 lines with the tail visible while the worker is alive, and the last 20 lines are attached to the error message when the subprocess exits non-zero. GET /api/workers/{cap}/{worker} returns the absolute path so you can open it by hand.
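One level of history means the previous log is overwritten on every start. A minimal sketch of that rotation, assuming a plain rename is all that happens (the runtime's actual implementation is not shown in this page), with hypothetical helpers `worker_log_path` and `rotate_previous`:

```python
import os
from pathlib import Path


def worker_log_path(log_dir: Path, capability: str, worker_name: str) -> Path:
    """Log file name following the worker-{capability}-{worker_name}.log pattern."""
    return log_dir / f"worker-{capability}-{worker_name}.log"


def rotate_previous(log_path: Path) -> Path:
    """Move the current log to .log.prev, replacing any older .prev file."""
    previous = log_path.with_name(log_path.name + ".prev")
    if log_path.exists():
        # os.replace overwrites the destination, so only one old log survives.
        os.replace(log_path, previous)
    return previous
```

Because each rotation replaces the `.log.prev` file, anything older than the previous run is gone; copy the file elsewhere if you need to keep a crash log across several restarts.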
State and concurrency
worker.state is a plain dict shared across every handler in the worker. Multiple on_event handlers for the same kind, @every schedules, and @task loops all run on the same event loop and will interleave across await points. Guard any non-trivial shared mutation with an asyncio.Lock:

```python
import asyncio


@worker.on_startup
async def init(client: RuntimeClient) -> None:
    worker.state["lock"] = asyncio.Lock()
    worker.state["seen"] = set()


@worker.on_event("turn.completed")
async def dedupe(event: EventEnvelope, client: RuntimeClient) -> None:
    async with worker.state["lock"]:
        if event.payload["turn_id"] in worker.state["seen"]:
            return
        worker.state["seen"].add(event.payload["turn_id"])
    await forward(event)
```

Driving agents from a worker
Workers have the full runtime client, so an event handler can open a session and run a turn. This is the pattern for acting on external signals: a webhook arrives, a worker picks it up, and a fresh agent session handles the decision.

```python
@worker.on_event("capability.bridge.callback_received")
async def triage(event: EventEnvelope, client: RuntimeClient) -> None:
    session = await client.create_session(
        capability="bridge",
        agent="triage",
        session_id=f"callback-{event.payload['callback_id']}",  # idempotent
    )
    async for _ in client.stream_chat(
        session_id=session.session_id,
        message=f"Investigate callback: {event.payload}",
    ):
        pass  # discard stream; the turn runs to completion regardless
```

create_session is idempotent on session_id, which makes “one session per external entity” trivial. stream_chat returns an async iterator of events; the turn runs to completion whether or not the iterator is drained. See the Worker API reference for the full session and turn surface.
Testing workers
Worker can be driven without the runtime, which is useful for unit tests over handler logic. Register handlers as normal, construct your own RuntimeClient (or a fake that implements the methods your handlers call), and dispatch events directly:

```python
import pytest
from workers.bridge import worker

# make_envelope, fake_client, and fake_ws are helpers and fixtures from your
# own test suite (not shown here).


@pytest.mark.asyncio
async def test_forward_on_turn_completed(fake_client, fake_ws):
    worker.state["ws"] = fake_ws
    envelope = make_envelope(kind="turn.completed", payload={"turn_id": "t1"})

    for handler in worker._event_handlers["turn.completed"]:
        await handler(envelope, fake_client)

    assert fake_ws.sent == [{"turn_id": "t1"}]
```

For end-to-end coverage (startup, schedule, shutdown), drive the full runner against a stop event. See Worker._run_until in the SDK source for the lifecycle harness used by the framework’s own tests.
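A fake client only needs the methods your handlers actually call. The sketch below is a stdlib-only stand-in that records `notify` calls and exercises a handler shaped like the announce example at the top of this page. `FakeClient`, `run_handler`, and the `SimpleNamespace` event are assumptions for illustration; they do not reproduce the real RuntimeClient or EventEnvelope types.

```python
import asyncio
from types import SimpleNamespace


class FakeClient:
    """Records notify() calls instead of talking to a runtime."""

    def __init__(self) -> None:
        self.notifications = []

    async def notify(self, **kwargs) -> None:
        self.notifications.append(kwargs)


async def announce(event, client) -> None:
    # Same body as the session.created handler shown earlier.
    await client.notify(title=f"Session started: {event.session_id[:8]}")


def run_handler(handler, event, client) -> None:
    """Drive a single async handler to completion outside any runtime."""
    asyncio.run(handler(event, client))


client = FakeClient()
run_handler(announce, SimpleNamespace(session_id="0123456789abcdef"), client)
```

After the call, `client.notifications` holds the single recorded notification, which a test can compare against the expected title.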
RuntimeClient
Every handler receives a RuntimeClient, the worker’s channel back to the runtime. Use it to publish custom events, push notifications into the TUI, subscribe to event streams, drive agent turns, and inspect runtime state. See the Worker API reference for the full method surface.