Runs give you complete visibility into your research workflows, from model training to security assessments to agent evaluations. Think of runs as your research sessions: each run captures the full context of a complete operation, including all the data, metrics, and artifacts you log, making it easy to reproduce results and share findings with your team.

Basic Usage

The simplest and most common way to create a run is the context manager syntax:
import dreadnode as dn

dn.configure()

with dn.run("my-work"):
    # Everything in this block is tracked
    model = train_model(training_data)
    dn.log_metric("accuracy", model.evaluate())
The run starts when you enter the with block and ends when you exit. All data you log—inputs, outputs, metrics, artifacts—gets automatically associated with this run.
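For example, here is a minimal sketch that attaches parameters, a metric, and an output to one run, reusing the logging calls shown later on this page (train_model and training_data are placeholders):
import dreadnode as dn

dn.configure()

with dn.run("my-work"):
    dn.log_params(learning_rate=1e-3, epochs=10)    # inputs recorded on the run
    model = train_model(training_data)              # placeholder training step
    dn.log_metric("accuracy", model.evaluate())     # metric attached to the run
    dn.log_output("weights", model.state_dict())    # output attached to the run
# After the block exits, the run is closed and nothing further is logged to it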

Naming Your Runs

Use descriptive names to identify your research sessions:
Named runs
with dn.run("transformer-training"):
    train_transformer_model()

with dn.run("agent-evaluation"):
    test_agent_performance()

with dn.run("vulnerability-scan"):
    scan_for_vulnerabilities()
You’ll find prefixed names particularly useful when running multiple related experiments or when iterating on different approaches.
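For example, a shared prefix keeps related runs grouped together when you sort or filter them (the finetune- prefix and train_variant helper below are illustrative):
# A shared prefix groups related iterations of the same experiment
for variant in ["baseline", "lora", "qlora"]:
    with dn.run(f"finetune-{variant}"):
        train_variant(variant)  # hypothetical training helper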

Organizing with Tags

Tags help you categorize and filter your research. Add them when creating runs or dynamically during execution:
Model Training
with dn.run("llm-training", tags=["transformer", "fine-tuning", "production"]):

    # Add tags based on training progress
    if convergence_detected:
        dn.tag("converged")

    # Tag based on results
    if validation_score > threshold:
        dn.tag("high-performance")
        dn.tag("deploy-candidate")
Tags make it easy to:
  • Find related work: Group all model training runs or filter by experiment type
  • Track methodologies: Separate different training approaches or evaluation methods
  • Organize by performance: Tag high-performing models or successful experiments
  • Filter exports: Export only runs from specific projects or time periods

Project Organization

Runs belong to projects—your way of organizing different research areas or engagements:
# Organize runs by research area
with dn.run("transformer-training", project="large-language-models"):
    train_transformer(dataset)

with dn.run("agent-evaluation", project="reasoning-agents"):
    evaluate_agent_performance(test_suite)

with dn.run("vulnerability-scan", project="security-assessment"):
    scan_application_security(target_app)
If you don’t specify a project, runs use the default project from dn.configure() or get placed in “Default”.
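For example, assuming dn.configure() accepts a project argument as described above, you can set the default once and only override it when needed:
import dreadnode as dn

# Set a default project once; later runs inherit it
dn.configure(project="large-language-models")

with dn.run("tokenizer-benchmark"):    # recorded under "large-language-models"
    ...

# An explicit project still overrides the default for a single run
with dn.run("prompt-injection-sweep", project="security-assessment"):
    ...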

Common Execution Patterns

You can execute runs independently or in parallel depending on your research needs.

Sequential Testing

Run multiple independent experiments to compare different approaches:
Hyperparameter Search
# Test different model configurations
configs = [
    {"learning_rate": 0.001, "batch_size": 32},
    {"learning_rate": 0.01, "batch_size": 64},
    {"learning_rate": 0.1, "batch_size": 128}
]

for i, config in enumerate(configs):
    with dn.run(f"training-run-{i+1}"):
        dn.log_params(**config)
        model = train_model(**config)
        accuracy = evaluate_model(model)
        dn.log_metric("accuracy", accuracy)
        if accuracy > best_threshold:
            dn.log_output("model_checkpoint", model.state_dict())
Each run captures separate test results with complete isolation.

Parallel Execution

For efficient experimentation, run multiple operations simultaneously:
import asyncio

async def train_model_variant(config):
    with dn.run(f"model-{config['architecture']}"):
        dn.log_params(**config)
        model = await async_train_model(**config)
        metrics = await evaluate_model(model)
        dn.log_metrics({
            "accuracy": metrics.accuracy,
            "training_time": metrics.duration
        })
        return metrics

# Define different model architectures
configs = [
    {"architecture": "transformer", "layers": 12, "hidden_size": 768},
    {"architecture": "lstm", "layers": 3, "hidden_size": 512},
    {"architecture": "cnn", "filters": 128, "kernel_size": 3}
]

# Train all variants concurrently (requires an async context, e.g. asyncio.run)
results = await asyncio.gather(*[train_model_variant(config) for config in configs])
This approach is essential for large-scale hyperparameter searches and model comparisons.

Distributed Operations

For complex workflows spanning multiple systems, you can transfer run context across distributed environments:
import dreadnode as dn

# On the coordination system
with dn.run("distributed-training") as run:
    dn.log_params(model_type="transformer", dataset="large-corpus")

    # Capture run context for worker systems
    context = dn.get_run_context()

    # Distribute context to training workers
    # (via message queue, API calls, shared storage, etc.)
    dispatch_to_workers(context, data_shards)

# On remote training workers
def worker_training(run_context, data_shard):
    # Continue the coordinated run
    with dn.continue_run(run_context):
        # All training metrics associate with the main run
        results = train_on_shard(data_shard)
        dn.log_metric("shard_loss", results.loss)
        dn.log_output("shard_weights", results.weights)
The get_run_context() function captures essential state including:
  • Run ID and metadata
  • OpenTelemetry trace context for distributed tracing
  • Project association
This enables advanced research patterns:
  • Distributed training: Coordinate model training across multiple GPUs/nodes
  • Containerized workflows: Move runs between Docker containers or Kubernetes pods
  • Cloud deployments: Continue runs across different cloud instances
  • Agent coordination: Parallel execution across multiple reasoning agents
  • Multi-stage pipelines: Track complex workflows across different systems
# Example: Distributed hyperparameter search
import dreadnode as dn
from concurrent.futures import ProcessPoolExecutor

def training_worker(run_context, hyperparams):
    """Worker that trains model with specific hyperparameters"""
    with dn.continue_run(run_context):
        # Train model with these hyperparameters
        model = train_model(**hyperparams)
        accuracy = evaluate_model(model)

        dn.log_params(**hyperparams)
        dn.log_metric("accuracy", accuracy)
        dn.log_output("model_checkpoint", model.state_dict())
        return accuracy

def distributed_search():
    with dn.run("hyperparameter-search") as run:
        context = dn.get_run_context()

        # Define search space
        param_combinations = [
            {"lr": 0.1, "batch_size": 32, "dropout": 0.1},
            {"lr": 0.01, "batch_size": 64, "dropout": 0.2},
            {"lr": 0.001, "batch_size": 128, "dropout": 0.3}
        ]

        # Run training in parallel across workers
        with ProcessPoolExecutor() as executor:
            futures = [
                executor.submit(training_worker, context, params)
                for params in param_combinations
            ]

            accuracies = [future.result() for future in futures]

        # All worker results automatically part of coordinated run
        dn.log_metric("best_accuracy", max(accuracies))

distributed_search()

Real-time Monitoring

Strikes provides live visibility into your research operations. Updates are automatically batched and sent to the server, letting you monitor progress in real-time through the UI:
with dn.run("model-training"):
    for epoch in range(num_epochs):
        # Training step
        train_loss = train_epoch(model, train_loader)
        val_accuracy = validate_model(model, val_loader)

        # Data appears in UI automatically
        dn.log_metric("train_loss", train_loss, step=epoch)
        dn.log_metric("val_accuracy", val_accuracy, step=epoch)

        # Continue training - UI updates in background
You’ll see your training progress update automatically without any extra configuration.

Immediate Updates

For critical results that need immediate visibility, force an update:
with dn.run("model-evaluation"):
    for checkpoint in model_checkpoints:
        result = evaluate_checkpoint(checkpoint)

        if result.accuracy > best_so_far:
            dn.log_metric("new_best_accuracy", result.accuracy)
            dn.log_output("best_model", checkpoint.state_dict())

            # Push important milestones immediately
            dn.push_update()  # (1)!
  1. Forces immediate delivery of breakthrough results to your team

Error Handling

Runs automatically capture exceptions and mark failed operations. Handle errors gracefully to continue logging even when individual operations fail:
with dn.run("agent-evaluation"):
    for test_case in evaluation_suite:
        try:
            # Run agent on test case that might fail
            response = agent.process_task(test_case)
            dn.log_metric("task_success", response.success)
            if response.success:
                dn.log_output("agent_response", response.output)
        except TimeoutError:
            dn.log_metric("task_timeout", 1.0)
        except Exception as e:
            dn.log_metric("task_error", 1.0)
            dn.log_output("error_details", str(e))

Advanced Patterns

Task Hierarchy Analysis

Every run maintains relationships with its tasks—the individual operations executed within the run context:
with dn.run("model-pipeline") as run:
    # These tasks are tracked as part of the run
    processed_data = await data_preprocessing.run(raw_dataset)
    trained_model = await model_training.run(processed_data)
    eval_results = await model_evaluation.run(trained_model)

# Analyze the run's task execution
print(f"Pipeline included {len(run.tasks)} primary stages")

for task in run.tasks:
    print(f"Stage: {task.name}")
    print(f"Duration: {task.duration}ms")
    print(f"Subtasks: {len(task.tasks)}")
You can analyze all tasks recursively to understand operation success rates:
with dn.run("multi-model-evaluation") as run:
    await evaluate_model_suite(model_collection)

    # Get complete task hierarchy
    all_tasks = run.all_tasks
    print(f"Evaluation '{run.run_id}' executed {len(all_tasks)} total tasks")

    # Calculate success metrics
    successful_tasks = [t for t in all_tasks if not t.failed]
    success_rate = len(successful_tasks) / len(all_tasks) if all_tasks else 0.0

    dn.log_metric("evaluation_success_rate", success_rate)
    print(f"Overall success rate: {success_rate * 100:.1f}%")

Best Practices

Consistent naming makes your research searchable. Use patterns like {model}-{dataset}-{iteration} or {experiment_type}-{variant} for easy filtering.
  1. Use descriptive run names: "transformer-training-v2" instead of "experiment-1"
  2. Log parameters extensively: Track hyperparameters, data configurations, model architectures—anything that affects results
  3. Separate runs by scope: Create distinct runs for training, evaluation, and deployment phases
  4. Organize with projects: Group related work like "language-models-2024" or "security-research"
  5. Standardize metrics: Use consistent metric names across similar experiments to enable comparison
  6. Structure complex workflows: Use hierarchical tasks to organize multi-stage pipelines and maintain clear execution flow
Never log sensitive data like API keys, credentials, or personal information in run parameters or outputs. Use references, hashed identifiers, or environment variables instead.
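Putting several of these practices together, here is an illustrative sketch (all names and helpers below are hypothetical) that follows a {model}-{dataset}-{iteration} naming pattern, logs its configuration, and records a hashed dataset reference instead of a raw internal path:
import hashlib

import dreadnode as dn

model_name, dataset, iteration = "transformer", "wikitext", 3
dataset_path = "/mnt/internal/datasets/wikitext"    # sensitive path, not logged directly

with dn.run(f"{model_name}-{dataset}-v{iteration}", project="language-models-2024"):
    dn.log_params(
        model=model_name,
        dataset=dataset,
        dataset_ref=hashlib.sha256(dataset_path.encode()).hexdigest()[:12],  # hashed identifier
        learning_rate=3e-4,
    )
    accuracy = evaluate_model(model_name)           # placeholder evaluation step
    dn.log_metric("val_accuracy", accuracy)         # consistent metric name across experiments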