Rigging’s pipelines (ChatPipeline, CompletionPipeline) offer powerful ways to control the generation flow and process results. This includes passive monitoring, adding post-processing steps, and even creating complex iterative generation loops.

Watch Callbacks

The simplest way to observe the pipeline is using watch callbacks. These are passive listeners that receive Chat or Completion objects as they are finalized within a pipeline run, but they don’t modify the pipeline’s execution. They are ideal for logging, monitoring, or triggering external actions based on generated content. Register watch callbacks using the .watch() method on Generators, Pipelines, or Prompts. Rigging also provides pre-built watchers in the rigging.watchers module.
  • Generator.watch()
  • ChatPipeline.watch()
  • CompletionPipeline.watch()
  • Prompt.watch()
import rigging as rg

# Use a pre-built watcher to log chats to a file
log_to_file = rg.watchers.write_chats_to_jsonl("chats.jsonl")

# Define a custom async watch callback
async def print_chat_ids(chats: list[rg.Chat]) -> None:
    print(f"Watched {len(chats)} chats: {[chat.uuid for chat in chats]}")

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Explain why the sky is blue")
    .watch(log_to_file, print_chat_ids) # Register multiple watchers
)

# Watchers will be called during the run_many execution
chats = await pipeline.run_many(3)
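
Watch callbacks can also be registered directly on a generator so that every pipeline created from it is observed without re-registering the callbacks. A minimal sketch, assuming Generator.watch() returns the generator so it can be chained:
import rigging as rg

# Sketch: a generator-level watcher observes chats from every pipeline
# created from this generator (assumes .watch() returns the generator)
async def count_messages(chats: list[rg.Chat]) -> None:
    for chat in chats:
        print(f"{chat.uuid}: {len(chat.all)} messages")

generator = rg.get_generator("openai/gpt-4o-mini").watch(count_messages)

first = await generator.chat("Name three prime numbers.").run()
second = await generator.chat("Name three colors.").run()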

Controlling Flow with then and map Callbacks

To actively modify chats or influence the generation process after a generation step completes, use the then() and map() callback methods.
  • ChatPipeline.then(): Processes each Chat object individually
  • ChatPipeline.map(): Processes all generated chats together as a list[Chat] (useful for batch operations)
  • These methods can also be called directly on Prompt objects

Basic Post-Processing

The simplest use case is to modify a chat after generation. Your callback receives the Chat (for then) or list[Chat] (for map) and can return the modified chat(s) or None (for then) to keep the original.
import rigging as rg

# Example: Add metadata based on content
async def add_sentiment_tag(chat: rg.Chat) -> rg.Chat | None:
    content = chat.last.content.lower()
    if "positive" in content:
        chat.meta(sentiment="positive")
    elif "negative" in content:
        chat.meta(sentiment="negative")
    # Return the modified chat (or None to keep original)
    return chat

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Generate a positive sentence.")
    .then(add_sentiment_tag)
)
chat = await pipeline.run()

print(chat.metadata.get("sentiment"))
# > positive
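
The map() callback works the same way but receives every chat in the batch at once, which is useful for cross-chat operations. A minimal sketch, assuming the callback returns the (possibly modified) list of chats:
import rigging as rg

# Sketch: tag each chat in a batch with its position and the batch size
# (assumes a map callback receives and returns the full list of chats)
async def tag_batch(chats: list[rg.Chat]) -> list[rg.Chat]:
    for index, chat in enumerate(chats):
        chat.meta(batch_index=index, batch_size=len(chats))
    return chats

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Share a one-line fun fact.")
    .map(tag_batch)
)

chats = await pipeline.run_many(3)
print([chat.metadata["batch_index"] for chat in chats])
# > [0, 1, 2]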

Iterative Generation and Validation

Callbacks can also drive further generation steps, enabling complex validation loops, conditional branching, or agent-like behavior. To achieve this, a then or map callback can do either of the following:
  1. Return or yield a PipelineStepGenerator or PipelineStepContextManager. This is typically done by calling .step() on a new or restarted pipeline.
  2. Call run() or derivatives like run_many() or run_batch() directly inside the callback to execute new generation steps and return the final result.
Option 1 is generally preferred because it gives the pipeline control over iterative execution when generations are nested; without it, the pipelines above cannot properly track the depth of nested calls. (A sketch of Option 2 follows the example below.)
import rigging as rg
from rigging.chat import PipelineStepContextManager

# Example: Ensure the model mentions a specific animal
TARGET_ANIMAL = "cat"

async def ensure_animal_mentioned(chat: rg.Chat) -> PipelineStepContextManager | None:
    if TARGET_ANIMAL in chat.last.content.lower():
        return None # Condition met, stop iterating

    # Condition not met, ask the model to try again
    print(f"-> Assistant didn't mention '{TARGET_ANIMAL}', asking for revision.")
    follow_up_pipeline = chat.continue_(f"Please revise your previous response to include the animal '{TARGET_ANIMAL}'.")

    # Return the context manager from .step() to trigger another generation round
    return follow_up_pipeline.step()

# Limit recursion depth to prevent infinite loops
MAX_RECURSION = 3

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat(f"Tell me a short story about an animal.")
    .then(ensure_animal_mentioned, max_depth=MAX_RECURSION) # Control recursion depth
)

final_chat = await pipeline.run()

print("\n--- Final Conversation ---")
print(final_chat.conversation)

if TARGET_ANIMAL in final_chat.last.content.lower():
    print(f"\nSuccess: Final response mentions '{TARGET_ANIMAL}'.")
else:
    print(f"\nFailed: Final response did not mention '{TARGET_ANIMAL}' after {MAX_RECURSION} attempts.")

  • Recursion Control: The max_depth parameter on then() and map() is crucial. It limits how many nested pipeline steps can be triggered from within a callback, preventing infinite loops. If this depth is exceeded, a MaxDepthError is raised (or handled based on on_failed mode).
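
For comparison, Option 2 performs the follow-up generation directly inside the callback and returns the finished chat. It is simpler to write, but nested depth is not tracked the way it is with .step(). A minimal sketch:
import rigging as rg

TARGET_ANIMAL = "cat"

# Sketch of Option 2: run the follow-up pipeline inside the callback and
# return the resulting chat (nested depth is not tracked this way)
async def ensure_animal_inline(chat: rg.Chat) -> rg.Chat | None:
    if TARGET_ANIMAL in chat.last.content.lower():
        return None  # Keep the original chat

    follow_up = chat.continue_(
        f"Please revise your previous response to include the animal '{TARGET_ANIMAL}'."
    )
    return await follow_up.run()

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Tell me a short story about an animal.")
    .then(ensure_animal_inline)
)

final_chat = await pipeline.run()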

Output Parsing and Validation

A common use case for iterative generation is ensuring the model’s output successfully parses into a specific Model. Rigging provides the convenient ChatPipeline.until_parsed_as() method for this. Internally, this method uses the then callback mechanism described above, attempting to parse the required model(s) and triggering regeneration with validation feedback if parsing fails.
import rigging as rg
from rigging.model import YesNoAnswer

# Use the built-in YesNoAnswer model as the target output
pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat(f"Answer yes or no. Is the sky blue? Respond within {YesNoAnswer.xml_tags()} tags.")
    # Ensure the output parses as YesNoAnswer, retrying up to 3 times if needed
    .until_parsed_as(YesNoAnswer, max_depth=3)
)

chat = await pipeline.run()

if not chat.failed:
    parsed_answer = chat.last.parse(YesNoAnswer)
    print(f"Parsed answer: {parsed_answer.boolean}")
else:
    print(f"Failed to get a valid YesNoAnswer after multiple attempts. Error: {chat.error}")

  • Parameter Change: Note that max_rounds from v2 is replaced by max_depth. The attempt_recovery and drop_dialog parameters are removed as recovery is implicit and dialog is preserved.
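
As an illustrative before/after sketch (the v2 call is shown only for comparison, using the old parameter names from this note):
from rigging.model import YesNoAnswer

# v2 (illustrative, old parameter names):
# pipeline.until_parsed_as(YesNoAnswer, max_rounds=3, attempt_recovery=True, drop_dialog=True)

# Current: recovery is implicit and the dialog is preserved
pipeline.until_parsed_as(YesNoAnswer, max_depth=3)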

Prompt Caching

Rigging supports prompt caching with compatible providers to save tokens and reduce API costs. This feature lets you mark parts of a conversation to tell the inference provider where caching is best applied. Under the hood, caching works through cache_control entries on messages, which signal to the generator which parts of a conversation are eligible for caching. Rigging manages these entries automatically when you enable caching.
We do our best to gracefully handle cases where the underlying provider does not support caching, but some interactions might fail with these cache_control markers in place. Providers are also still defining how caching should work, and these mechanics may change in the future.

Enabling Caching

Caching can be enabled at two levels:
  1. Pipeline level - Apply caching policy to an entire conversation:
import rigging as rg
from rigging import Message

pipeline = (
    rg.get_generator("claude-3-7-sonnet-latest")
    .chat([
        Message(role="system", content="You are a helpful assistant."),
        Message(role="user", content="What is machine learning?")
    ])
    .cache("latest")  # Or just .cache() as "latest" is the default
)

# Run the pipeline with caching enabled
await rg.interact(pipeline)
  2. Message level - Control caching for individual messages:
import rigging as rg
from rigging import Message

message = Message(role="user", content="Please explain this document: [document]")

# Set `cache_control` to {"type": "ephemeral"}
message.cache()

# Or provide a custom cache control value
message.cache({"type": "custom"})

chat = await (
    rg.get_generator("claude-3-7-sonnet-latest")
    .chat(message)
    .run()
)

Caching Modes

Currently, Rigging supports one caching mode for pipelines:
  • latest (default): Applies cache_control markers to the last 2 non-assistant messages before inference. This is effective for long-running conversations where you want to both reuse the latest cache entry and establish a new one at each step.

Disabling Caching

You can disable caching by passing False to the cache method on pipelines or messages.
# At message level
message.cache(False)

# At pipeline level
pipeline.cache(False)

Fine-grained Step Control

For maximum control and introspection, you can use the ChatPipeline.step() (or step_many, step_batch) async context manager. This yields PipelineStep objects representing each stage of the execution (generated, callback, final). This allows you to examine intermediate states, inject custom logic between steps, or build highly complex generation flows beyond the standard callback system.
import rigging as rg

pipeline = rg.get_generator("openai/gpt-4o-mini").chat("Hello there!")

print("Stepping through pipeline execution:")
async with pipeline.step() as steps:
    async for step in steps:
        print(f"- Step State: {step.state}")
        print(f"  Chats ({len(step.chats)}): {[chat.uuid for chat in step.chats]}")
        if step.callback:
            print(f"  Callback: {rg.util.get_qualified_name(step.callback)}")
        if step.parent:
            print(f"  Parent Pipeline ID: {id(step.parent.pipeline)}")
        # Add custom logic here based on step state or content
print("Pipeline finished.")
Anywhere you see a max_depth parameter, we use this context manager under the hood to track how many recursive generations have occurred.

Handling Failures

Pipelines provide robust ways to handle errors during generation, tool execution, parsing, and callback execution. Understanding the different levels of error handling helps you control when and how errors are surfaced versus handled gracefully.

Error Handling Hierarchy

Rigging has multiple layers of error handling, each with different scopes and behaviors:
  1. Tool-level error handling - Controls how individual tool call errors are handled
  2. Pipeline-level error handling - Controls how pipeline execution errors are handled
  3. Parsing-level error handling - Controls how model parsing errors are handled
Any exceptions not handled inside then or map callbacks will propagate up to the pipeline level, where they can be caught by the pipeline’s error handling.
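
For example, an exception raised inside a then callback surfaces at the pipeline level, where it can be registered as catchable. A minimal sketch using a hypothetical custom exception:
import rigging as rg

# ContentTooShortError is a hypothetical exception used for illustration
class ContentTooShortError(Exception):
    pass

# An exception raised here is not handled in the callback, so it propagates
# up to the pipeline's error handling
async def require_minimum_length(chat: rg.Chat) -> None:
    if len(chat.last.content) < 20:
        raise ContentTooShortError("Response was too short")

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Reply with a single word.")
    .then(require_minimum_length)
    .catch(ContentTooShortError, on_failed="include")
)

chat = await pipeline.run()
if chat.failed:
    print(f"Chat failed: {chat.error}")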

Pipeline-Level Error Handling

The on_failed parameter determines behavior when catchable errors occur during pipeline execution:
  • raise (Default): The exception is raised, halting execution.
  • skip: The chat where the error occurred is discarded. run_many/run_batch/run_over will return only the successful chats. (Not valid for single .run()).
  • include: The chat is marked with .failed = True and the exception stored in .error. The chat is included in the results, but may be incomplete or invalid.
import rigging as rg

pipeline = (
    rg.get_generator("model-that-fails-often")
    .chat("Create an answer")
    .catch(on_failed="include")
)

# Example: run_many might succeed for some, fail for others
chats = await pipeline.run_many(5)

successful_chats = [c for c in chats if not c.failed]
failed_chats = [c for c in chats if c.failed]

print(f"Succeeded: {len(successful_chats)}, Failed: {len(failed_chats)}")
for failed_chat in failed_chats:
    print(f"  Error in chat {failed_chat.uuid}: {failed_chat.error}")

Defining Catchable Errors

By default, pipelines catch critical internal errors like ValidationError and MaxDepthError when on_failed is skip or include. You can specify additional exception types to be treated as non-fatal errors using ChatPipeline.catch().
import rigging as rg
# Assume SomeCustomAPIError exists
from some_api import SomeCustomAPIError

pipeline = (
    rg.get_generator(...)
    .chat(...)
    # Treat SomeCustomAPIError as a non-fatal error
    .catch(SomeCustomAPIError, on_failed="skip")
)

# Now, if SomeCustomAPIError occurs, the chat will be skipped instead of raising
chats = await pipeline.run_many(10)

Handling Tool Errors

By default, tools catch common errors like json.JSONDecodeError and ValidationError and return them as error messages rather than raising exceptions. This allows the conversation to continue with error feedback to the model.
import rigging as rg
from pydantic import ValidationError

@rg.tool()  # Default: catch={json.JSONDecodeError, ValidationError}
async def add(a: int, b: int) -> int:
    """Adds two numbers together."""
    return a + b

# The tool will gracefully handle validation errors
pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Use the add tool with only argument 'a' set to 1, no 'b'")
    .using(add)
)

chat = await pipeline.run()
# Model receives error feedback and can retry or acknowledge the error
Important: Tool argument validation errors are handled at the tool level and, without additional configuration, are not caught by pipeline-level error handling. The error becomes part of the conversation flow as a tool response message. You can override tool error handling when adding tools to a pipeline:
# Let pipeline handle all tool errors
pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Use the add tool incorrectly")
    .using(add, catch=False)  # Override: catch no errors
    .catch(on_failed="include")  # Now pipeline can catch them
)

# Or configure at tool definition
@rg.tool(catch=False)  # This tool won't catch any errors
async def strict_add(a: int, b: int) -> int:
    return a + b
When tools don’t catch errors, they flow up to the pipeline level where you can handle them with .catch(), otherwise the pipeline will raise the exception. See more in the tool documentation.

Handling Parsing Errors

When using until_parsed_as(), you can control whether parsing errors trigger regeneration or flow up to the pipeline. By default, parsing errors are caught and trigger a regeneration attempt (up to max_depth).
import rigging as rg
from rigging.model import YesNoAnswer

# Default: catch=True (parsing errors trigger regeneration)
pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat(f"Answer yes or no within {YesNoAnswer.xml_tags()} tags.")
    .until_parsed_as(YesNoAnswer, max_depth=3)
)

# Alternative: let parsing errors flow to pipeline level
pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat(f"Answer yes or no within {YesNoAnswer.xml_tags()} tags.")
    .until_parsed_as(YesNoAnswer, catch=False)  # Don't catch parsing errors
    .catch(on_failed="include")  # Handle at pipeline level
)

Error Metadata and Debugging

When errors are handled gracefully (rather than raised), Rigging attempts to preserve error information in message metadata for debugging:
# Tool errors become part of conversation with metadata
chat = await pipeline.run()
for message in chat.all:
    if message.metadata.get("failed"):
        print(f"Message failed: {message.metadata['error']}")

# Pipeline errors are stored on the chat object
if chat.failed:
    print(f"Chat failed: {chat.error}")