Tasks are a fundamental building block in Strikes that help you structure and track your code execution. A task is a powerful primitive that exists inside a run and lets you scope inputs, outputs, and metrics to a smaller unit of work. Tasks keep track of when and where they are called, both within each other and inside the run. You can write your code the way you’d like, and Strikes will track the flow.

We’ll cover some advanced use cases, but tasks work just like functions and should feel familiar from any workflow framework you’ve used. You might use a task to represent one of your agents, data-loading code, a tool call, or the processing of a sample batch from a dataset.

What is a Task?

In Strikes, a task is a unit of work with:

  • A well-defined input/output contract
  • Tracing with execution time and relationships to other tasks
  • The ability to scope and report metrics
  • Storage for input and output objects

Creating Tasks

The most common way to create a task is by decorating a function:

import dreadnode as dn

@dn.task()
async def analyze_file(path: str) -> dict:
    """Analyze a file and return results."""
    # Your analysis code here
    return {"vulnerabilities": 2, "score": 0.85}

Once decorated, your function will automatically:

  • Track its execution time
  • Store its input arguments
  • Store its return value
  • Create spans in the OpenTelemetry trace
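
Once decorated, calling the function inside a run is all it takes for the task to be recorded. A minimal sketch (the run name and file path here are placeholders):

import dreadnode as dn

with dn.run("file-audit"):
    # Looks like a normal async call; the span, inputs, and output
    # are captured automatically.
    result = await analyze_file("src/app.py")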

When you need more flexible task boundaries or don’t want to refactor existing code, you can use the task span context manager:

import dreadnode as dn

with dn.run("my-experiment"):
    with dn.task_span("data-processing") as task:
        # Load data
        data = load_data()

        # Process data
        result = process_data(data)

        # Log the output manually
        task.log_output("processed_data", result)

This approach gives you more control over when the task starts and ends, and lets you manually log inputs and outputs.
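
Multiple task spans can also sit side by side within a run to break a pipeline into traceable steps. A minimal sketch, where load_data and transform stand in for your own code:

import dreadnode as dn

with dn.run("pipeline"):
    with dn.task_span("load"):
        data = load_data()        # placeholder loader
        dn.log_metric("rows", len(data))

    with dn.task_span("transform") as task:
        result = transform(data)  # placeholder transform
        task.log_output("result", result)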

Task Configuration

Tasks can be configured with several options:

@dn.task(
    name="File Analysis",          # Human-readable name (default: function name)
    label="file_analysis",         # Machine-readable label for grouping (default: function name)
    log_params=False,              # Do not log any arguments as parameters
    log_inputs=["path"],           # Log specific arguments as inputs (True for all, False for none)
    log_output=True,               # Log the return value as an output
    tags=["security", "static"],   # Tags to categorize this task later
    scorers=[score_vuln]           # Functions to score the output
)
async def analyze_file(path: str) -> dict:
    ...

Working with Task Results

When you call a task directly (task()), you get its return value. When you call task.run(), you get a TaskSpan object that provides access to the task’s context, metrics, and output.

import dreadnode as dn

@dn.task()
async def add(a: int, b: int) -> int:
    return a + b

with dn.run("math-operations"):
    # Call the task directly to get its return value
    result = await add(2, 3)
    print(result)  # 5

    # Call .run() to get the task span with more information
    span = await add.run(3, 4)
    print(span.output)  # 7
    print(span.span_id)  # unique span ID

Logging Data within Tasks

Within tasks, you can explicitly log data using several methods:

@dn.task()
async def process_document(doc_id: str) -> dict:
    # Log parameters (key-value pairs for configuration)
    dn.log_param("batch_size", 32)

    # Log input objects (structured data used by the task)
    document = fetch_document(doc_id)
    dn.log_input("document", document)

    # Log metrics (measurements of performance or behavior)
    dn.log_metric("document_size", len(document))

    # Process the document
    result = analyze_document(document)

    # Log output objects (results produced by the task)
    dn.log_output("analysis_result", result)

    return result

Data logged within a task is automatically associated with that task’s span, making it easy to track the flow of data through your system.

Task Execution Patterns

Tasks support several execution patterns to handle different workflows. The simplest is chaining calls sequentially, where each task consumes the output of the previous one:

result1 = await task1()
result2 = await task2(result1)
result3 = await task3(result2)
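
Because tasks are regular async callables, independent calls can also be fanned out concurrently with standard asyncio. A minimal sketch (the tasks and inputs are placeholders):

import asyncio

# Each concurrent call should still record its own span inside the active run.
results = await asyncio.gather(
    task1(input_a),
    task1(input_b),
    task1(input_c),
)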

Error Handling

Any task that raises an exception will be marked as failed in the UI. You can handle errors using the try_() and try_map() methods:

# Try to run a task, return None if it fails
result = await task.try_()

# Try to run a task multiple times, skip failures
results = await task.try_map(5)  # Run 5 times, return list of successes
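
If you need custom recovery logic, you can also wrap the call in ordinary exception handling; the task is still marked as failed in the UI:

try:
    result = await task()
except Exception:
    # Recover however you like; the failed span is still recorded.
    result = None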

Measuring Task Performance

One of the most powerful features of tasks is their ability to measure and track performance:

import time

@dn.task()
async def classify_image(image_path: str) -> str:
    # Log a metric when something interesting happens
    dn.log_metric("image_loaded", 1)

    # Log metrics with values
    start = time.time()
    result = run_classification(image_path)
    duration = time.time() - start
    dn.log_metric("classification_time", duration)

    return result

If you attach scorers to a task (via the scorers argument shown in Task Configuration), they run automatically when the task completes, evaluating the output and logging a metric with the score.
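
As a sketch, assuming a scorer can be a plain function that takes the task output and returns a numeric score (check the scorers reference for the exact signature expected), the score_vuln scorer referenced earlier might look like:

# Hypothetical scorer: receives the task output and returns a float,
# which is logged as a metric on the task.
def score_vuln(result: dict) -> float:
    return float(result["score"])

@dn.task(scorers=[score_vuln])
async def analyze_file(path: str) -> dict:
    return {"vulnerabilities": 2, "score": 0.85}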

Finding the Best Results

Tasks also provide methods to filter and sort results based on metrics:

# Run the task 10 times and get all results
spans = await task.map_run(10, input_data)

# Sort the results by their average metric value
sorted_spans = spans.sorted()

# Get the top 3 results
top_spans = spans.top_n(3)

# Get just the outputs of the top 3 results
top_outputs = spans.top_n(3, as_outputs=True)

This pattern is particularly useful for generative tasks where you want to generate multiple candidates and pick the best ones.
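
For instance, with a hypothetical generate_summary task whose attached scorer rates each output, a best-of-N selection could look like this:

# generate_summary and prompt are placeholders for your own task and input.
spans = await generate_summary.map_run(10, prompt)

# Keep only the three highest-scoring outputs.
best = spans.top_n(3, as_outputs=True)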

Understanding Labels

Every task in Strikes has both a name and a label:

@dn.task(
    name="Process Document",   # Human-readable display name
    label="process_document"   # Machine-readable identifier
)
async def process_document(doc_id: str) -> dict:
    ...

How Labels Work

Labels play an important role in organizing and identifying metrics within your tasks, as outlined below:

  • Default Derivation: If you don’t specify a label, it’s automatically derived from the function name by converting it to lowercase and replacing spaces with underscores.
  • Label Usage: Labels are used internally to:
    • Prefix metrics logged within the task
    • Create namespaces for data organization
    • Enable filtering in the UI and exports

Label Impact on Data

The most important thing to understand about labels is how they affect metrics:

@dn.task(label="tokenize")
async def tokenize_text(text: str) -> list:
    # This metric is namespaced under "tokenize.token_count"
    dn.log_metric("token_count", len(tokens))
    return tokens

When this task logs a metric named token_count, that metric is:

  1. Stored with the task span as token_count
  2. Mirrored at the run level as tokenize.token_count
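
For example, two hypothetical tasks that log the same metric name stay distinguishable at the run level because of their labels:

@dn.task(label="tokenize")
async def tokenize_text(text: str) -> list:
    tokens = text.split()
    dn.log_metric("output_size", len(tokens))   # run level: "tokenize.output_size"
    return tokens

@dn.task(label="chunk")
async def chunk_text(text: str) -> list:
    chunks = [text[i:i + 200] for i in range(0, len(text), 200)]
    dn.log_metric("output_size", len(chunks))   # run level: "chunk.output_size"
    return chunks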

Best Practices

  1. Keep tasks focused: Each task should do one thing well, making it easier to trace and debug.
  2. Use meaningful names: Task names appear in the UI, so make them human-readable.
  3. Log relevant data: Be intentional about what you log as inputs, outputs, and metrics.
  4. Handle errors appropriately: Use try_() and similar methods to handle task failures gracefully.
  5. Use tasks to structure your code: Tasks help create natural boundaries in your application.
  6. Combine with Rigging tools: Tasks work seamlessly with Rigging tools for LLM agents.