
dreadnode.storage

API reference for the dreadnode.storage module.

AzureBlobCredentials(
account_name: str,
account_key: str | None = None,
sas_token: str | None = None,
connection_string: str | None = None,
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
use_managed_identity: bool = False,
)

Azure Blob Storage / ADLS Gen2 credentials.

Supports multiple authentication methods:

  • Connection string
  • Account key
  • SAS token
  • Service principal (client credentials)
  • Managed identity (when running on Azure)
account_key: str | None = None

Storage account access key.

account_name: str

Azure storage account name.

client_id: str | None = None

Azure AD client/application ID.

client_secret: str | None = None

Azure AD client secret.

connection_string: str | None = None

Full connection string (overrides other auth).

sas_token: str | None = None

Shared Access Signature token.

tenant_id: str | None = None

Azure AD tenant ID for service principal auth.

use_managed_identity: bool = False

Use Azure Managed Identity for auth.

to_storage_options() -> dict[str, Any]

Convert to adlfs storage options.

GCSCredentials(
project: str | None = None,
token: str | None = None,
access: str = "full_control",
use_anonymous: bool = False,
)

Google Cloud Storage credentials.

Supports multiple authentication methods:

  • Service account JSON key file
  • Service account JSON key content
  • Application Default Credentials (ADC)
  • Anonymous access (for public buckets)
access: str = 'full_control'

Access level: read_only, read_write, full_control.

project: str | None = None

GCP project ID.

token: str | None = None

Path to service account JSON key file, or the JSON content itself.

use_anonymous: bool = False

Use anonymous access (public buckets only).

to_storage_options() -> dict[str, Any]

Convert to gcsfs storage options.

MinioCredentials(
access_key_id: str,
secret_access_key: str,
session_token: str | None = None,
endpoint_url: str | None = None,
region: str | None = None,
)

MinIO credentials.

S3Credentials(
access_key_id: str,
secret_access_key: str,
session_token: str | None = None,
endpoint_url: str | None = None,
region: str | None = None,
)

AWS S3 / S3-compatible (R2, MinIO) credentials.

SessionStore(path: Path)

SQLite-backed session metadata and message index with FTS5 search.

first_user_message(
session_id: str, *, max_len: int = 200
) -> str | None

Return the content of the first user message in a session, truncated.

first_user_messages(
session_ids: list[str], *, max_len: int = 200
) -> dict[str, str]

Batch-fetch first user message for multiple sessions.

persist_session(
*,
session_id: str,
model: str,
project: str | None,
capability: str | None,
agent: str | None,
title: str | None,
created_at: datetime,
updated_at: datetime | None = None,
message_count: int = 0,
trajectory: dict[str, Any] | None = None,
messages: Sequence[Message] | None = None,
) -> None

Atomically persist session metadata and messages in one transaction.

Storage(
profile: Profile | None = None,
cache: Path | None = None,
api: ApiClient | None = None,
provider: StorageProvider | None = None,
*,
default_project: str | None = None,
)

Storage manager for local and remote storage.

Directory structure

~/.dreadnode/
  packages/
    datasets/
    agents/
    models/
    tools/
    environments/
  capabilities/
    /
      capability.yaml
  cas/
    sha256/
      ab/cd/…
  artifacts/
  projects/
    /
      /
        spans.jsonl
        metrics.jsonl
  sessions/
    sessions.sqlite3
    /
      chat_.json
      runtime_.json
      spans_.jsonl

When running in a sandbox, ~/.dreadnode is mounted via s3fs to the user’s workspace storage, with the S3 prefix already scoped to {org_id}/workspaces/{workspace_id}.

Create storage manager.

Parameters:

  • profile (Profile | None, default: None) – Authenticated profile for RBAC context.
  • cache (Path | None, default: None) – Root cache directory. Defaults to ~/.dreadnode.
  • api (ApiClient | None, default: None) – API client for remote operations (blob credentials + registry uploads).
  • provider (StorageProvider | None, default: None) – Storage provider for remote operations (s3, r2, minio).
  • default_project (str | None, default: None) – Default project key.
api: ApiClient | None

Get the API client.

artifacts_path: Path

Path to artifacts CAS.

can_sync: bool

Whether remote sync is possible (has API client and profile).

capabilities_path: Path

Path to capabilities directory.

cas_path: Path

Path to CAS directory.

local_capability_state_path: Path

Path to persisted local capability state.

oci_registry_url: str

Get the OCI Distribution v2 registry URL.

packages_path: Path

Path to packages directory.

profile: Profile | None

Get the current profile.

project_key: str

Get the project key.

project_path: Path

Path to current project directory.

projects_path: Path

Path to projects directory.

remote_bucket: str

Get the remote storage bucket from credentials.

remote_prefix: str

Get the remote storage prefix from credentials.

session_db_path: Path

Path to the local SQLite session index.

session_store: SessionStore

Lazy SQLite-backed session metadata and message store.

sessions_path: Path

Path to sessions directory.

workspace_capabilities_path: Path

Path to workspace capability cache directory (CAP-LOAD-007).

artifact_blob_path(oid: str) -> Path

Path to artifact blob in workspace CAS.

blob_exists(oid: str) -> bool

Check if blob exists in local CAS.

blob_path(oid: str) -> Path

Path to blob in CAS.
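The directory structure above shows the CAS fanning out as cas/sha256/ab/cd/…, which suggests a layout like the sketch below. The exact split is an assumption; the idea is that the first byte pairs of the digest become subdirectories so no single directory grows too large.

```python
from pathlib import Path

def blob_path(cas_root: Path, oid: str) -> Path:
    """Sketch of the CAS fan-out implied by cas/sha256/ab/cd/… above.

    An oid like "sha256:abcd…" is split so the first two byte pairs of
    the digest become subdirectories.
    """
    algo, digest = oid.split(":", 1)
    return cas_root / algo / digest[:2] / digest[2:4] / digest

p = blob_path(Path("~/.dreadnode/cas"), "sha256:abcd1234")
print(p.as_posix())  # ~/.dreadnode/cas/sha256/ab/cd/abcd1234
```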

download_blob(oid: str) -> Path

Download blob from remote to local CAS.

download_blobs(
oids: list[str], *, skip_existing: bool = True
) -> tuple[int, int]

Download multiple blobs from remote storage.

Parameters:

  • oids (list[str]) – Object IDs to download.
  • skip_existing (bool, default: True) – Skip blobs that already exist locally.

Returns:

  • tuple[int, int] – Tuple of (downloaded_count, skipped_count).
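The (downloaded, skipped) bookkeeping can be sketched with stdlib code only; here a plain dict stands in for the remote filesystem, which is an illustration rather than the actual transfer logic:

```python
from pathlib import Path
import tempfile

def download_blobs(remote: dict[str, bytes], cas: Path, oids: list[str],
                   skip_existing: bool = True) -> tuple[int, int]:
    # Sketch of the counting contract; `remote` is a stand-in for the
    # real remote filesystem (s3fs/gcsfs/adlfs).
    downloaded = skipped = 0
    for oid in oids:
        target = cas / oid.replace(":", "_")
        if skip_existing and target.exists():
            skipped += 1
            continue
        target.write_bytes(remote[oid])
        downloaded += 1
    return downloaded, skipped

cas = Path(tempfile.mkdtemp())
remote = {"sha256:aa": b"x", "sha256:bb": b"y"}
print(download_blobs(remote, cas, ["sha256:aa", "sha256:bb"]))  # (2, 0)
print(download_blobs(remote, cas, ["sha256:aa", "sha256:bb"]))  # (0, 2)
```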
get_artifact(oid: str) -> Path

Get artifact from workspace CAS, downloading if needed.

get_blob(oid: str) -> Path

Get blob from local CAS.

get_manifest(
package_type: PackageType, name: str, version: str
) -> str

Get manifest.json content.

hash_files(
paths: list[Path], algo: str = "sha256"
) -> dict[Path, str]

Compute hashes for multiple files.

Parameters:

  • paths (list[Path]) – Files to hash.
  • algo (str, default: 'sha256') – Hash algorithm.

Returns:

  • dict[Path, str] – Mapping of path to hash.
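A minimal sketch of this signature using stdlib hashlib; the chunked read is an implementation choice here, not necessarily what the library does:

```python
import hashlib
from pathlib import Path
import tempfile

def hash_files(paths: list[Path], algo: str = "sha256") -> dict[Path, str]:
    """Sketch of batch file hashing; reads in chunks to bound memory."""
    hashes: dict[Path, str] = {}
    for path in paths:
        h = hashlib.new(algo)
        with path.open("rb") as f:
            for chunk in iter(lambda: f.read(1 << 20), b""):
                h.update(chunk)
        hashes[path] = h.hexdigest()
    return hashes

tmp = Path(tempfile.mkdtemp()) / "a.txt"
tmp.write_text("hello")
print(hash_files([tmp])[tmp])
# 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
```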
latest_version(
package_type: PackageType, name: str
) -> str | None

Get latest version.

list_local_runs() -> list[str]

List locally cached run IDs for the current project.

list_versions(
package_type: PackageType, name: str
) -> list[str]

List available versions.

manifest_exists(
package_type: PackageType, name: str, version: str
) -> bool

Check if manifest exists.

manifest_path(
package_type: PackageType, name: str, version: str
) -> Path

Path to manifest.json.

oci_client() -> OCIRegistryClient

Create an OCI registry client for push/pull operations.

package_path(
package_type: PackageType,
name: str,
version: str | None = None,
) -> Path

Path to package directory.

Returns: ~/.dreadnode/packages///[version/]

remote_artifact_path(oid: str) -> str

Remote path for artifact blob.

remote_blob_exists(oid: str) -> bool

Check if blob exists in remote storage.

remote_blob_path(oid: str) -> str

Remote path for blob (includes bucket for s3fs).

resolve(
uri: str, **storage_options: Any
) -> tuple[AbstractFileSystem, str]

Resolve URI to filesystem and path.
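In fsspec terms this is what `fsspec.core.url_to_fs` does. The stdlib-only sketch below shows just the protocol/path split, with a plain tuple standing in for the `AbstractFileSystem` instance the real method returns:

```python
from urllib.parse import urlsplit

def split_uri(uri: str) -> tuple[str, str]:
    """Sketch of URI resolution: derive the fsspec protocol and the path.

    The real resolve() returns a configured AbstractFileSystem; this
    only illustrates how protocol and path are separated.
    """
    parts = urlsplit(uri)
    if not parts.scheme:
        # Bare paths fall back to the local filesystem.
        return "file", uri
    return parts.scheme, f"{parts.netloc}{parts.path}"

print(split_uri("s3://bucket/key.txt"))  # ('s3', 'bucket/key.txt')
print(split_uri("/local/path.txt"))      # ('file', '/local/path.txt')
```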

run_path(run_id: str | UUID) -> Path

Path to run directory for trace data.

session_chat_path(
session_id: str | UUID, ext: str = "json"
) -> Path

Path to a session chat file.

Parameters:

  • session_id (str | UUID) – The session identifier.
  • ext (str, default: 'json') – File extension without the leading dot.

Returns:

  • Path – Full path to the session chat file.
session_path(session_id: str | UUID) -> Path

Path to a session directory.

session_runtime_path(
session_id: str | UUID, ext: str = "json"
) -> Path

Path to a session runtime manifest file.

Parameters:

  • session_id (str | UUID) – The session identifier.
  • ext (str, default: 'json') – File extension without the leading dot.

Returns:

  • Path – Full path to the session runtime manifest file.
session_spans_path(
session_id: str | UUID, ext: str = "jsonl"
) -> Path

Path to a session-scoped tracing file.

store_artifact(source: Path, *, upload: bool = True) -> str

Store artifact in workspace CAS and optionally upload to remote.

Parameters:

  • source (Path) – Path to the file to store.
  • upload (bool, default: True) – Whether to upload to remote storage immediately.

Returns:

  • str – The oid (sha256:) of the stored artifact.
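The content-addressed flow — hash, copy to the digest-derived path, return the oid — can be sketched as below. The fan-out layout follows the cas/sha256/ab/cd/… structure shown earlier; everything else (dedup check, copy semantics) is an assumption, and the remote upload step is omitted:

```python
import hashlib
import shutil
import tempfile
from pathlib import Path

def store_artifact(cas: Path, source: Path) -> str:
    """Sketch of content-addressed storage: hash first, then copy the file
    to its digest-derived path and return the "sha256:<hex>" oid."""
    digest = hashlib.sha256(source.read_bytes()).hexdigest()
    target = cas / "sha256" / digest[:2] / digest[2:4] / digest
    if not target.exists():  # identical content is stored only once
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, target)
    return f"sha256:{digest}"

cas = Path(tempfile.mkdtemp())
src = Path(tempfile.mkdtemp()) / "model.bin"
src.write_bytes(b"weights")
oid = store_artifact(cas, src)
print(oid.startswith("sha256:"))  # True
```

Because the path is derived from the content hash, storing the same file twice is a no-op, which is the property that makes blob uploads safely retryable.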
store_blob(oid: str, source: Path) -> Path

Store blob in local CAS.

store_manifest(
package_type: PackageType,
name: str,
version: str,
content: str,
) -> Path

Store manifest.json.

trace_path(
run_id: str | UUID, filename: str = "spans.jsonl"
) -> Path

Path to trace file within a run directory.

Parameters:

  • run_id (str | UUID) – The run identifier.
  • filename (str, default: 'spans.jsonl') – Full filename with extension (e.g., 'spans.jsonl', 'spans.parquet').

Returns:

  • Path – Full path to the trace file.
upload_artifact(oid: str) -> None

Upload artifact from workspace CAS to remote storage.

upload_blob(oid: str) -> None

Upload blob from local CAS to remote.

upload_blobs(
files: dict[Path, str], *, skip_existing: bool = True
) -> tuple[int, int]

Upload multiple blobs to remote storage.

Parameters:

  • files (dict[Path, str]) – Mapping of local path to oid.
  • skip_existing (bool, default: True) – Skip blobs that already exist remotely.

Returns:

  • tuple[int, int] – Tuple of (uploaded_count, skipped_count).
from_provider(
provider: StorageProvider,
credentials: dict[str, Any] | None = None,
) -> AbstractFileSystem

Create filesystem from provider and credentials.

Parameters:

  • provider (StorageProvider) – Storage provider type.
  • credentials (dict[str, Any] | None, default: None) – Provider-specific credentials dict.

Returns:

  • AbstractFileSystem – Configured filesystem instance.

Raises:

  • ValueError – If provider is unsupported or credentials missing.
  • ImportError – If required package not installed.