Pipes and Filters
also known as Pipeline, Streaming Pipeline, EIP Pipeline
Compose stream-shaped processing as a chain of small filters connected by pipes.
This pattern helps complete certain larger patterns:
- used-by Chat Chain ★: Decompose a long, multi-disciplinary task into ordered phases; within each phase, run a paired-role chat between two agents until the phase artefact is signed off; pass the artefact to the next phase.
Context
A team is building a data-transformation flow in which input passes through several distinct steps before becoming output — for example a document goes through PDF extraction, OCR cleanup, language detection, chunking, and embedding, or an inbound message goes through parsing, classification, transformation, validation, and formatting. Each stage has a single responsibility and could in principle be tested or reused on its own, but only if it has a clean boundary. The team is choosing how to structure the code.
Problem
If the whole transformation lives in one monolithic function, the stages are tangled together and none of them can be tested in isolation; a bug in the OCR step is only reachable by running the entire pipeline end to end. If the team writes a bespoke pipeline each time, every project reinvents the plumbing for connecting one stage to the next and the stages cannot be shared across pipelines. Both extremes block the reuse and isolated testing the team wants.
Forces
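- Filter granularity: filters that are too small add per-stage overhead; filters that are too big drift back toward the monolith.
- Pipe contracts: the filters at both ends of a pipe must agree on the typed messages it carries.
- Backpressure: a slow filter must be able to slow down the filters upstream of it, or the pipes between them grow without bound.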
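The backpressure force can be illustrated with bounded queues as pipes. The following is a minimal sketch, not a prescribed implementation: `run_stage`, `run_pipeline`, `SENTINEL`, and the `maxsize` default are hypothetical names chosen for illustration, and one thread per filter is assumed.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker passed down every pipe


def run_stage(fn, inbox, outbox):
    """Run one filter in its own thread: read from inbox, write to outbox."""
    while True:
        item = inbox.get()
        if item is SENTINEL:
            outbox.put(SENTINEL)  # propagate shutdown to the next stage
            return
        # put() blocks while the outbox is full, so a slow downstream
        # filter automatically slows this one down (backpressure).
        outbox.put(fn(item))


def run_pipeline(items, filters, maxsize=2):
    """Connect filters with bounded queues and collect the final output."""
    pipes = [queue.Queue(maxsize=maxsize) for _ in range(len(filters) + 1)]
    for i, fn in enumerate(filters):
        threading.Thread(
            target=run_stage, args=(fn, pipes[i], pipes[i + 1]), daemon=True
        ).start()

    def feed():
        # The source also blocks on a full pipe, so it cannot outrun stage one.
        for item in items:
            pipes[0].put(item)
        pipes[0].put(SENTINEL)

    threading.Thread(target=feed, daemon=True).start()

    results = []
    while True:
        item = pipes[-1].get()
        if item is SENTINEL:
            return results
        results.append(item)


doubled = run_pipeline(range(10), [lambda x: x + 1, lambda x: x * 2])
```

Because each pipe holds at most `maxsize` items, memory use stays bounded however slow the last filter is. The trade-off is that the whole pipeline runs at the speed of its slowest stage.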
Example
A document-processing agent has grown into a 1500-line monolith that does PDF extraction, OCR cleanup, language detection, chunking, and embedding all in one function — and is impossible to test in isolation. The team rebuilds it as pipes-and-filters: each stage becomes a small filter with a single responsibility, connected by typed pipes. The OCR-cleanup filter can now be tested against a fixture in isolation, the chunking filter is reused by another product, and a new language-detection filter is dropped in without touching the others.
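Testing the OCR-cleanup filter against a fixture might look like the sketch below. The filter body, the fixture, and the names `ocr_cleanup`, `FIXTURE`, and `EXPECTED` are assumptions made for illustration; the team's actual cleanup rules are not described here.

```python
import re
from typing import Iterable, Iterator


def ocr_cleanup(lines: Iterable[str]) -> Iterator[str]:
    """Hypothetical OCR-cleanup filter: normalise text line by line."""
    for line in lines:
        line = line.replace("\ufb01", "fi")        # expand the "fi" ligature
        line = re.sub(r"\s+", " ", line).strip()   # collapse whitespace runs
        if line:                                   # drop lines left empty
            yield line


# A small in-memory fixture stands in for real OCR output.
FIXTURE = ["  The \ufb01rst   page ", "", "second\tline"]
EXPECTED = ["The first page", "second line"]


def test_ocr_cleanup() -> None:
    # No PDF extraction, chunking, or embedding is needed to run this test.
    assert list(ocr_cleanup(FIXTURE)) == EXPECTED


test_ocr_cleanup()
```

The test touches only the filter's input and output types, which is what lets the same filter be lifted into another pipeline unchanged.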
Solution
Therefore:
Decompose the transformation into small filters with single responsibilities. Connect them via typed pipes (function call, queue, stream). Each filter is testable in isolation. Filters can be reused across pipelines.
What this pattern forbids. Filters must not call into one another or share state behind the pipes; they communicate only through pipes with typed contracts.
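As a minimal sketch of the solution, the pipes below are plain function calls over iterators, one of the three pipe kinds named above. The message types `Page` and `Chunk`, the filters `clean` and `chunk`, and the `pipeline` helper are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator


@dataclass(frozen=True)
class Page:
    """Hypothetical message type carried by the pipe into each filter."""
    text: str


@dataclass(frozen=True)
class Chunk:
    """Hypothetical message type carried by the pipe out of `chunk`."""
    text: str
    index: int


def clean(pages: Iterable[Page]) -> Iterator[Page]:
    """Filter: collapse runs of whitespace in each page."""
    for page in pages:
        yield Page(" ".join(page.text.split()))


def chunk(pages: Iterable[Page], size: int = 10) -> Iterator[Chunk]:
    """Filter: split pages into fixed-size chunks, numbered across the stream."""
    index = 0
    for page in pages:
        for start in range(0, len(page.text), size):
            yield Chunk(page.text[start:start + size], index)
            index += 1


def pipeline(source: Iterable, *filters: Callable[[Iterable], Iterator]) -> Iterable:
    """Connect filters in order; each one sees only the previous one's output."""
    stream = source
    for f in filters:
        stream = f(stream)
    return stream


result = list(pipeline([Page("hello   world,  this is  a test")], clean, chunk))
```

Each filter knows only its input and output message types, so `chunk` can be reused in a pipeline that has no `clean` stage, and a new filter can be inserted by adding one argument to `pipeline`.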
The smaller patterns that complete this one:
- generalises Prompt Chaining ★★: Decompose a task into a fixed sequence of LLM calls where each step's output becomes the next step's input.
And the patterns that stand alongside it, or against it:
- composes-with MapReduce for Agents ★: Split an oversize task into independent chunks, process each in parallel, then aggregate.
- alternative-to Topic-Based Routing ★: Route inter-agent messages through named topics that agents subscribe to, instead of having senders address each other by id.