Pipelines
Control the generation process and react to outputs using callbacks and pipeline steps.
Rigging’s pipelines (`ChatPipeline`, `CompletionPipeline`) offer powerful ways to control the generation flow and process results. This includes passive monitoring, adding post-processing steps, and even creating complex iterative generation loops.
Watch Callbacks
The simplest way to observe the pipeline is with watch callbacks. These are passive listeners that receive `Chat` or `Completion` objects as they are finalized within a pipeline run, but they don’t modify the pipeline’s execution. They are ideal for logging, monitoring, or triggering external actions based on generated content.
Register watch callbacks using the `.watch()` method on Generators, Pipelines, or Prompts. Rigging also provides pre-built watchers in the `rigging.watchers` module.

- `Generator.watch()`
- `ChatPipeline.watch()`
- `CompletionPipeline.watch()`
- `Prompt.watch()`
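The watch contract can be sketched with a minimal stand-in pipeline (the classes below are illustrative, not Rigging’s actual implementation): callbacks are stored, invoked with the finalized chat, and never allowed to replace the result.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Chat:
    # Minimal stand-in for rigging's Chat object
    content: str


@dataclass
class MiniPipeline:
    # Illustrative stand-in for a pipeline's watch mechanics
    watch_callbacks: list = field(default_factory=list)

    def watch(self, *callbacks):
        # Passive listeners: stored, never allowed to alter results
        self.watch_callbacks.extend(callbacks)
        return self  # chainable, mirroring the fluent style above

    async def run(self) -> Chat:
        chat = Chat(content="generated text")  # pretend generation
        for cb in self.watch_callbacks:
            await cb(chat)  # observers see the chat but can't replace it
        return chat


seen: list[str] = []


async def log_chat(chat: Chat) -> None:
    seen.append(chat.content)  # e.g. write to a log or metrics system


chat = asyncio.run(MiniPipeline().watch(log_chat).run())
print(seen)  # ['generated text']
```

Because the callback’s return value is ignored, watchers are safe to attach anywhere without affecting generation results.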
Controlling Flow with `then` and `map` Callbacks
To actively modify chats or influence the generation process after a generation step completes, use the `then()` and `map()` callback methods.

- `ChatPipeline.then()`: Processes each `Chat` object individually
- `ChatPipeline.map()`: Processes a `list[Chat]` all at once (useful for batch operations)
- These methods can also be called directly on `Prompt` objects
Basic Post-Processing
The simplest use case is to modify a chat after generation. Your callback receives the `Chat` (for `then`) or `list[Chat]` (for `map`) and can return the modified chat(s), or `None` (for `then`) to keep the original.
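The return-value contract can be sketched in isolation (a simplified model of the behavior described above, not Rigging’s real code): a returned `Chat` replaces the original, while `None` keeps it.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Chat:
    # Minimal stand-in for rigging's Chat
    content: str


async def apply_then(chat: Chat, callback) -> Chat:
    # Sketch of the then() contract: a returned Chat replaces the
    # original, while returning None keeps the original unchanged.
    result = await callback(chat)
    return result if result is not None else chat


async def strip_whitespace(chat: Chat) -> Chat:
    # Post-processing: return a modified chat to replace the original
    return Chat(content=chat.content.strip())


async def observe_only(chat: Chat) -> None:
    return None  # no replacement: the original chat is kept


chat = Chat(content="  hello  ")
modified = asyncio.run(apply_then(chat, strip_whitespace))
kept = asyncio.run(apply_then(chat, observe_only))
print(modified.content)  # hello
print(kept is chat)      # True
```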
Iterative Generation and Validation
Callbacks can also drive further generation steps, enabling complex validation loops, conditional branching, or agent-like behavior.
To achieve this, a `then` or `map` callback can do either of the following:

- Return or yield a `PipelineStepGenerator` or `PipelineStepContextManager`. This is typically done by calling `.step()` on a new or restarted pipeline.
- Call `run()` or derivatives like `run_many()` or `run_batch()` directly inside the callback to execute new generation steps and return the final result.
The first option is generally preferred, as it gives the outer pipeline more control over iterative execution when generations are nested; without it, the pipeline cannot properly track the depth of the nested calls.
- Recursion Control: The `max_depth` parameter on `then()` and `map()` is crucial. It limits how many nested pipeline steps can be triggered from within a callback, preventing infinite loops. If this depth is exceeded, a `MaxDepthError` is raised (or handled based on the `on_failed` mode).
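The recursion guard can be sketched as a bounded retry loop (a simplified model, with a stand-in `MaxDepthError`): each failed validation triggers another generation step, and exceeding `max_depth` raises instead of looping forever.

```python
class MaxDepthError(Exception):
    # Stand-in for rigging's MaxDepthError
    pass


def run_with_depth(generate, validate, max_depth: int = 3) -> str:
    # Each failed validation triggers a nested generation step;
    # exceeding max_depth raises instead of looping forever.
    output = generate()
    depth = 0
    while not validate(output):
        depth += 1
        if depth > max_depth:
            raise MaxDepthError(f"exceeded max_depth={max_depth}")
        output = generate()
    return output


# Simulated model outputs: two invalid attempts, then a valid one
attempts = iter(["bad", "bad", "good"])
result = run_with_depth(lambda: next(attempts), lambda o: o == "good")
print(result)  # good
```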
Output Parsing and Validation
A common use case for iterative generation is ensuring the model’s output successfully parses into a specific `Model`. Rigging provides the convenient `ChatPipeline.until_parsed_as()` method for this.
Internally, this method uses the `then` callback mechanism described above, attempting to parse the required model(s) and triggering regeneration with validation feedback if parsing fails.
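The parse-validate-retry loop behind this can be sketched as follows (a simplified illustration using a regex in place of Rigging’s model parsing; the tag name and feedback text are made up for the example):

```python
import re


def until_parsed(generate, pattern: str, max_depth: int = 3):
    # Try to extract the structured value; on failure, feed an error
    # message back into the next generation step as validation feedback.
    feedback = None
    for _ in range(max_depth + 1):
        text = generate(feedback)
        match = re.search(pattern, text)
        if match:
            return match.group(1)
        feedback = "Output did not match <answer>...</answer>; try again."
    raise ValueError("exceeded max_depth without a valid parse")


# Simulated model replies: one unparseable, then a valid one
replies = iter(["no tags here", "<answer>42</answer>"])
value = until_parsed(lambda fb: next(replies), r"<answer>(.*?)</answer>")
print(value)  # 42
```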
- Parameter Change: Note that `max_rounds` from v2 is replaced by `max_depth`. The `attempt_recovery` and `drop_dialog` parameters are removed, as recovery is implicit and the dialog is preserved.
Prompt Caching
Rigging supports prompt caching with compatible providers to save tokens and reduce API costs. This feature lets you mark parts of a conversation with cache markers, telling the inference provider where caching is best applied.
Under the hood, caching works through `cache_control` entries on messages, which signal to the generator which parts of a conversation are eligible for caching. Rigging manages these entries automatically when you enable caching.
We do our best to gracefully handle cases where the underlying provider does not support caching, but some interactions might fail with these `cache_control` markers in place. Providers are also still defining how caching should work, and these mechanics may change in the future.
Enabling Caching
Caching can be enabled at two levels:

- Pipeline level: apply a caching policy to an entire conversation
- Message level: control caching for individual messages
Caching Modes
Currently, Rigging supports one caching mode for pipelines:

- `latest` (default): Applies `cache_control` markers to the last 2 non-assistant messages before inference. This is effective for long-running conversations where you want to both reuse the latest cache entry and establish a new one at every step.
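The `latest` policy can be sketched over plain message dicts (an illustration of the marker placement described above; the `{"type": "ephemeral"}` entry shape is an assumption modeled on common provider conventions, not necessarily what Rigging emits):

```python
def apply_latest_cache(messages: list[dict]) -> list[dict]:
    # Mark the last two non-assistant messages with a cache_control
    # entry before inference, walking backwards from the newest message.
    marked = 0
    for message in reversed(messages):
        if marked == 2:
            break
        if message["role"] != "assistant":
            message["cache_control"] = {"type": "ephemeral"}  # assumed shape
            marked += 1
    return messages


conversation = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "First question"},
    {"role": "assistant", "content": "First answer"},
    {"role": "user", "content": "Second question"},
]
apply_latest_cache(conversation)
cached = [m["content"] for m in conversation if "cache_control" in m]
print(cached)  # the two most recent non-assistant messages
```

At the next step, the previously marked prefix hits the established cache entry while the new marker extends it, which is why two markers are kept live at once.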
Disabling Caching
You can disable caching by passing `False` to the cache method on pipelines or messages.
Fine-grained Step Control
For maximum control and introspection, you can use the `ChatPipeline.step()` (or `step_many`, `step_batch`) async context manager. This yields `PipelineStep` objects representing each stage of the execution (`generated`, `callback`, `final`).
This allows you to examine intermediate states, inject custom logic between steps, or build highly complex generation flows beyond the standard callback system.
Anywhere you see `max_depth` as a parameter, we are using this context manager under the hood to track how many recursive generations are occurring.
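The shape of this context manager can be sketched with a stand-in (illustrative only; the real `PipelineStep` carries far more state than the `state` string shown here):

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class PipelineStep:
    # Minimal stand-in for rigging's PipelineStep
    state: str


@asynccontextmanager
async def step():
    # Yields an async iterator of intermediate steps so callers can
    # inspect state or intervene between stages of a pipeline run.
    async def steps():
        for state in ("generated", "callback", "final"):
            yield PipelineStep(state=state)

    yield steps()


async def main() -> list[str]:
    states = []
    async with step() as steps:
        async for s in steps:
            states.append(s.state)  # examine each intermediate state
    return states


states = asyncio.run(main())
print(states)  # ['generated', 'callback', 'final']
```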
Handling Failures
Pipelines provide robust ways to handle errors during generation or callback execution. You might want to avoid certain errors from halting the entire pipeline, especially in batch processing scenarios.
Setting Error Behavior
The `on_failed` parameter (set with `.catch()` or on `run_many`/`run_batch`/`run_over`) determines behavior when a catchable error occurs:
- `raise` (default): The exception is raised, halting execution.
- `skip`: The chat where the error occurred is discarded. `run_many`/`run_batch`/`run_over` will return only the successful chats. (Not valid for a single `run()`.)
- `include`: The chat is marked with `.failed = True` and the exception is stored in `.error`. The chat is included in the results, but may be incomplete or invalid.
Defining Catchable Errors
By default, pipelines catch critical internal errors like `ValidationError` and `MaxDepthError` when `on_failed` is `skip` or `include`. You can specify additional exception types to be treated as non-fatal using `ChatPipeline.catch()`.
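The three `on_failed` modes and the catchable-error set can be sketched together (a simplified model of the behavior described above, not Rigging’s implementation):

```python
class Chat:
    # Minimal stand-in for rigging's Chat failure flags
    def __init__(self, content=None):
        self.content = content
        self.failed = False
        self.error = None


def run_many(tasks, on_failed="raise", catch=(ValueError,)):
    # raise re-throws the exception, skip drops the failed chat, and
    # include keeps it with .failed/.error set. Only exception types in
    # `catch` are treated as non-fatal; anything else always raises.
    results = []
    for task in tasks:
        chat = Chat()
        try:
            chat.content = task()
        except catch as exc:
            if on_failed == "raise":
                raise
            if on_failed == "skip":
                continue
            chat.failed, chat.error = True, exc  # include mode
        results.append(chat)
    return results


def ok():
    return "fine"


def bad():
    raise ValueError("parse failure")


skipped = run_many([ok, bad], on_failed="skip")
included = run_many([ok, bad], on_failed="include")
print(len(skipped))        # 1: the failing chat was discarded
print(included[1].failed)  # True: kept, but marked failed
```

Widening the `catch` tuple corresponds to registering extra exception types as non-fatal, as described above.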