Rigging’s pipelines (ChatPipeline, CompletionPipeline) offer powerful ways to control the generation flow and process results. This includes passive monitoring, adding post-processing steps, and even creating complex iterative generation loops.

Watch Callbacks

The simplest way to observe the pipeline is using watch callbacks. These are passive listeners that receive Chat or Completion objects as they are finalized within a pipeline run, but they don’t modify the pipeline’s execution. They are ideal for logging, monitoring, or triggering external actions based on generated content.

Register watch callbacks using the .watch() method on Generators, Pipelines, or Prompts. Rigging also provides pre-built watchers in the rigging.watchers module.

  • Generator.watch()
  • ChatPipeline.watch()
  • CompletionPipeline.watch()
  • Prompt.watch()

import rigging as rg

# Use a pre-built watcher to log chats to a file
log_to_file = rg.watchers.write_chats_to_jsonl("chats.jsonl")

# Define a custom async watch callback
async def print_chat_ids(chats: list[rg.Chat]) -> None:
    print(f"Watched {len(chats)} chats: {[chat.uuid for chat in chats]}")

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Explain why the sky is blue")
    .watch(log_to_file, print_chat_ids) # Register multiple watchers
)

# Watchers will be called during the run_many execution
chats = await pipeline.run_many(3)

Controlling Flow with then and map Callbacks

To actively modify chats or influence the generation process after a generation step completes, use the then() and map() callback methods.

  • ChatPipeline.then(): Processes each Chat object individually
  • ChatPipeline.map(): Processes a list[Chat] all at once, useful for batch operations (see the sketch after the example below)
  • These methods can also be called directly on Prompt objects

Basic Post-Processing

The simplest use case is to modify a chat after generation. Your callback receives a Chat (for then) or a list[Chat] (for map) and can return the modified chat(s), or return None (for then) to keep the original.

import rigging as rg

# Example: Add metadata based on content
async def add_sentiment_tag(chat: rg.Chat) -> rg.Chat | None:
    content = chat.last.content.lower()
    if "positive" in content:
        chat.meta(sentiment="positive")
    elif "negative" in content:
        chat.meta(sentiment="negative")
    # Return the modified chat (or None to keep original)
    return chat

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Generate a positive sentence.")
    .then(add_sentiment_tag)
)
chat = await pipeline.run()

print(chat.metadata.get("sentiment"))
# > positive
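
The same post-processing can be applied to a whole batch at once with map(), which receives and returns the full list of chats. Below is a minimal sketch that tags each chat from a run_many() call with its index (the tag_batch name is illustrative):

import rigging as rg

# Tag every chat in the batch with its position
async def tag_batch(chats: list[rg.Chat]) -> list[rg.Chat]:
    for index, chat in enumerate(chats):
        chat.meta(batch_index=index)
    return chats

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Generate a positive sentence.")
    .map(tag_batch)
)
chats = await pipeline.run_many(3)

print([chat.metadata.get("batch_index") for chat in chats])
# > [0, 1, 2]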

Iterative Generation and Validation

Callbacks can also drive further generation steps, enabling complex validation loops, conditional branching, or agent-like behavior.

To achieve this, a then or map callback can do either of the following:

  1. Return or yield a PipelineStepGenerator or PipelineStepContextManager. This is typically done by calling .step() on a new or restarted pipeline.
  2. Call run() or derivatives like run_many() or run_batch() directly inside the callback to execute new generation steps and return the final result (a sketch of this appears after the example below).

Option 1 is generally preferred because it gives Rigging more control over iterative pipeline execution when generations are nested. Without it, the outer pipeline cannot properly track the depth of nested calls.

import rigging as rg
from rigging.chat import PipelineStepContextManager

# Example: Ensure the model mentions a specific animal
TARGET_ANIMAL = "cat"

async def ensure_animal_mentioned(chat: rg.Chat) -> PipelineStepContextManager | None:
    if TARGET_ANIMAL in chat.last.content.lower():
        return None # Condition met, stop iterating

    # Condition not met, ask the model to try again
    print(f"-> Assistant didn't mention '{TARGET_ANIMAL}', asking for revision.")
    follow_up_pipeline = chat.continue_(f"Please revise your previous response to include the animal '{TARGET_ANIMAL}'.")

    # Return the context manager from .step() to trigger another generation round
    return follow_up_pipeline.step()

# Limit recursion depth to prevent infinite loops
MAX_RECURSION = 3

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Tell me a short story about an animal.")
    .then(ensure_animal_mentioned, max_depth=MAX_RECURSION) # Control recursion depth
)

final_chat = await pipeline.run()

print("\n--- Final Conversation ---")
print(final_chat.conversation)

if TARGET_ANIMAL in final_chat.last.content.lower():
    print(f"\nSuccess: Final response mentions '{TARGET_ANIMAL}'.")
else:
    print(f"\nFailed: Final response did not mention '{TARGET_ANIMAL}' after {MAX_RECURSION} attempts.")

  • Recursion Control: The max_depth parameter on then() and map() is crucial. It limits how many nested pipeline steps can be triggered from within a callback, preventing infinite loops. If this depth is exceeded, a MaxDepthError is raised (or handled based on on_failed mode).
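
For comparison, a minimal sketch of option 2: the callback runs the follow-up pipeline to completion itself and returns the finished chat. Note that this pattern does not propagate depth tracking to the outer pipeline.

import rigging as rg

TARGET_ANIMAL = "cat"

async def ensure_animal_mentioned_inline(chat: rg.Chat) -> rg.Chat | None:
    if TARGET_ANIMAL in chat.last.content.lower():
        return None  # Condition met, keep the original chat

    # Option 2: run the follow-up generation here and return its result
    return await chat.continue_(
        f"Please revise your previous response to include the animal '{TARGET_ANIMAL}'."
    ).run()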

Output Parsing and Validation

A common use case for iterative generation is ensuring the model’s output successfully parses into a specific Model. Rigging provides the convenient ChatPipeline.until_parsed_as() method for this.

Internally, this method uses the then callback mechanism described above, attempting to parse the required model(s) and triggering regeneration with validation feedback if parsing fails.

import rigging as rg
from rigging.model import YesNoAnswer

# Define the desired output model
pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat(f"Answer yes or no. Is the sky blue? Respond within {YesNoAnswer.xml_tags()} tags.")
    # Ensure the output parses as YesNoAnswer, retrying up to 3 times if needed
    .until_parsed_as(YesNoAnswer, max_depth=3)
)

chat = await pipeline.run()

if not chat.failed:
    parsed_answer = chat.last.parse(YesNoAnswer)
    print(f"Parsed answer: {parsed_answer.boolean}")
else:
    print(f"Failed to get a valid YesNoAnswer after multiple attempts. Error: {chat.error}")

  • Parameter Change: Note that max_rounds from v2 is replaced by max_depth. The attempt_recovery and drop_dialog parameters are removed as recovery is implicit and dialog is preserved.
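
For intuition, here is a rough sketch of an equivalent then-based loop built from the pieces shown earlier. This is illustrative only, not the actual implementation, and recover_parse_failure is a hypothetical name:

import rigging as rg
from rigging.model import YesNoAnswer

# Retry with feedback until the response parses as YesNoAnswer
async def recover_parse_failure(chat: rg.Chat):
    try:
        chat.last.parse(YesNoAnswer)
        return None  # Parsed successfully, stop iterating
    except Exception as error:
        # Feed the validation error back and trigger another generation round
        return chat.continue_(
            f"Your response could not be parsed ({error}). "
            f"Please answer again within {YesNoAnswer.xml_tags()} tags."
        ).step()

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat(f"Is the sky blue? Respond within {YesNoAnswer.xml_tags()} tags.")
    .then(recover_parse_failure, max_depth=3)
)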

Prompt Caching

Rigging supports prompt caching with compatible providers to save tokens and reduce API costs. This feature lets you mark parts of a conversation with cache markers that tell the inference provider where caching is best applied.

Under the hood, caching works through cache_control entries on messages, which signal to the generator which parts of a conversation are eligible for caching. Rigging manages these entries automatically when you enable caching.

We do our best to gracefully handle cases where the underlying provider does not support caching, but some interactions might fail with these cache_control markers in place. Providers are also still defining how caching should work, and these mechanics may change in the future.

Enabling Caching

Caching can be enabled at two levels:

  1. Pipeline level - Apply caching policy to an entire conversation:
import rigging as rg
from rigging import Message

pipeline = (
    rg.get_generator("claude-3-7-sonnet-latest")
    .chat([
        Message(role="system", content="You are a helpful assistant."),
        Message(role="user", content="What is machine learning?")
    ])
    .cache("latest")  # Or just .cache() as "latest" is the default
)

# Run the pipeline with caching enabled
await rg.interact(pipeline)

  2. Message level - Control caching for individual messages:
import rigging as rg
from rigging import Message

message = Message(role="user", content="Please explain this document: [document]")

# Set `cache_control` to {"type": "ephemeral"}
message.cache()

# Or provide custom cache control settings
message.cache({"type": "custom"})

chat = await (
    rg.get_generator("claude-3-7-sonnet-latest")
    .chat(message)
    .run()
)

Caching Modes

Currently, Rigging supports one caching mode for pipelines:

  • latest (default): Applies cache_control markers to the last 2 non-assistant messages before inference. This is effective for long-running conversations where you want to both re-use the latest cache entry and establish a new one at each step.
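
For illustration, a sketch of how the latest mode would place markers on a short conversation, assuming the placement described above:

import rigging as rg
from rigging import Message

pipeline = (
    rg.get_generator("claude-3-7-sonnet-latest")
    .chat([
        Message(role="system", content="You are a helpful assistant."),     # skipped (not in the last 2 non-assistant)
        Message(role="user", content="Here is a long document: ..."),       # marked (2nd-to-last non-assistant)
        Message(role="assistant", content="Got it, what would you like?"),  # skipped (assistant)
        Message(role="user", content="Summarize the key points."),          # marked (last non-assistant)
    ])
    .cache("latest")
)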

Disabling Caching

You can disable caching by passing False to the cache method on pipelines or messages.

# At message level
message.cache(False)

# At pipeline level
pipeline.cache(False)

Fine-grained Step Control

For maximum control and introspection, you can use the ChatPipeline.step() (or step_many, step_batch) async context manager. This yields PipelineStep objects representing each stage of the execution (generated, callback, final).

This allows you to examine intermediate states, inject custom logic between steps, or build highly complex generation flows beyond the standard callback system.

import rigging as rg

pipeline = rg.get_generator("openai/gpt-4o-mini").chat("Hello there!")

print("Stepping through pipeline execution:")
async with pipeline.step() as steps:
    async for step in steps:
        print(f"- Step State: {step.state}")
        print(f"  Chats ({len(step.chats)}): {[chat.uuid for chat in step.chats]}")
        if step.callback:
            print(f"  Callback: {rg.util.get_qualified_name(step.callback)}")
        if step.parent:
            print(f"  Parent Pipeline ID: {id(step.parent.pipeline)}")
        # Add custom logic here based on step state or content
print("Pipeline finished.")

Anywhere you see max_depth as a parameter, Rigging is using this context manager under the hood to track how many recursive generations are occurring.

Handling Failures

Pipelines provide robust ways to handle errors during generation or callback execution. You may want to prevent certain errors from halting the entire pipeline, especially in batch-processing scenarios.

Setting Error Behavior

The on_failed parameter (set with .catch() or on run_many/run_batch/run_over) determines behavior when a catchable error occurs:

  • raise (Default): The exception is raised, halting execution.
  • skip: The chat where the error occurred is discarded. run_many/run_batch/run_over will return only the successful chats. (Not valid for single .run()).
  • include: The chat is marked with .failed = True, and the exception is stored in .error. The chat is included in the results but may be incomplete or invalid.

import rigging as rg

pipeline = (
    rg.get_generator("openai/gpt-4o-mini")
    .chat("Potentially problematic prompt")
    .catch(on_failed="include")
)

# Example: run_many might succeed for some, fail for others
chats = await pipeline.run_many(5)

successful_chats = [c for c in chats if not c.failed]
failed_chats = [c for c in chats if c.failed]

print(f"Succeeded: {len(successful_chats)}, Failed: {len(failed_chats)}")
for failed_chat in failed_chats:
    print(f"  Error in chat {failed_chat.uuid}: {failed_chat.error}")

Defining Catchable Errors

By default, pipelines catch critical internal errors like ValidationError and MaxDepthError when on_failed is skip or include. You can specify additional exception types to be treated as non-fatal errors using ChatPipeline.catch().

import rigging as rg
# Assume SomeCustomAPIError exists
from some_api import SomeCustomAPIError

pipeline = (
    rg.get_generator(...)
    .chat(...)
    # Treat SomeCustomAPIError as a non-fatal error
    .catch(SomeCustomAPIError, on_failed="skip") 
)

# Now, if SomeCustomAPIError occurs, the chat will be skipped instead of raising
chats = await pipeline.run_many(10)