Process & Pipeline Internals
Core building blocks that power the pipeline execution engine.
ProcessKind
Enum describing the transformation semantics of a pipeline stage.
from anyiostream import ProcessKind
| Value | Meaning | Input → Output |
|---|---|---|
| `MAP` | 1:1 transform | Each input produces exactly one output |
| `FLAT_MAP` | 1:N expand | Each input produces zero or more outputs |
| `FILTER` | 1:0\|1 predicate | Each input is kept or dropped |
| `FOREACH` | Side-effect | Passes input through unchanged |
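The four semantics in the table above can be illustrated with a small stand-alone sketch. `Kind` and `apply_stage` are hypothetical stand-ins written for illustration, not part of the `anyiostream` API. The sketch assumes that a `FLAT_MAP` function returns an iterable and that a `FILTER` function returns a truthy or falsy value.

```python
from enum import Enum, auto
from typing import Any, Callable


class Kind(Enum):
    """Hypothetical stand-in for ProcessKind (illustration only)."""
    MAP = auto()
    FLAT_MAP = auto()
    FILTER = auto()
    FOREACH = auto()


def apply_stage(kind: Kind, fn: Callable[[Any], Any], item: Any) -> list:
    """Return the outputs that a single input yields under each kind."""
    if kind is Kind.MAP:
        return [fn(item)]                   # exactly one output
    if kind is Kind.FLAT_MAP:
        return list(fn(item))               # zero or more outputs
    if kind is Kind.FILTER:
        return [item] if fn(item) else []   # kept or dropped
    fn(item)                                # FOREACH: run the side effect
    return [item]                           # input passes through unchanged
```

Returning a list for every kind keeps the four cases uniform: the length of the list is the "Input → Output" ratio from the table.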
ProcessConfig
Per-stage tunables. Frozen dataclass with slots=True.
from anyiostream import ProcessConfig
cfg = ProcessConfig(workers=4, buffer_size=8, name="fetch")
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| `workers` | | | Concurrent fan-out workers. |
| `buffer_size` | | | Backpressure buffer between stages. |
| `name` | | | Human-readable label for debugging / tracing. |
Validation
- `workers` must be ≥ 1 (raises `ValueError`)
- `buffer_size` must be ≥ 0 (raises `ValueError`)
Process
A single unit of work in the pipeline graph.
from anyiostream import Process, ProcessKind, ProcessConfig
Fields
| Field | Type | Description |
|---|---|---|
| | | The transformation function |
| | | Transformation semantics |
| `config` | | Concurrency & buffering tunables |
Execution Model
Each Process runs inside an anyio TaskGroup:
1. Input channel: receives items from the previous stage
2. Worker tasks: `config.workers` concurrent tasks pull from the input
3. Output channel: bounded by `config.buffer_size`, feeds the next stage
Workers respect structured concurrency: if any worker raises, the entire pipeline cancels cleanly.
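The channel/worker layout described above can be approximated with the standard library. The sketch below swaps in `asyncio.Queue` and `asyncio.gather` for anyio's channels and `TaskGroup`, so it shows only the shape of the data flow. `run_stage` and `double` are hypothetical names, and unlike a task group, `gather` does not cancel sibling workers when one of them raises.

```python
import asyncio

_DONE = object()  # sentinel marking the end of a channel


async def run_stage(fn, items, *, workers: int, buffer_size: int) -> list:
    """Hypothetical helper: run one stage over `items` and collect outputs."""
    inbox: asyncio.Queue = asyncio.Queue()
    # asyncio treats maxsize=0 as unbounded, so clamp to 1 to stay bounded.
    outbox: asyncio.Queue = asyncio.Queue(maxsize=max(buffer_size, 1))
    results: list = []

    async def worker() -> None:
        while (item := await inbox.get()) is not _DONE:
            await outbox.put(await fn(item))  # waits while outbox is full

    async def collector() -> None:
        # Plays the role of the next stage draining the output channel.
        while (item := await outbox.get()) is not _DONE:
            results.append(item)

    for item in items:
        inbox.put_nowait(item)
    for _ in range(workers):
        inbox.put_nowait(_DONE)  # one stop signal per worker

    drain = asyncio.create_task(collector())
    await asyncio.gather(*(worker() for _ in range(workers)))
    await outbox.put(_DONE)      # all workers finished: close the output
    await drain
    return results


async def double(x: int) -> int:
    await asyncio.sleep(0)       # yield so workers can interleave
    return x * 2
```

Calling `asyncio.run(run_stage(double, [1, 2, 3, 4], workers=2, buffer_size=1))` returns the doubled values. With more than one worker the output order is not guaranteed to match the input order.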
ResultStages (mixin)
Mixed into Stream to provide Result-aware pipeline stages. You don’t instantiate this directly — use it through Stream.
Stages
| Method | Kind | Description |
|---|---|---|
| | MAP | Wraps output in |
| | FLAT_MAP | Like |
| | FILTER | Drops items where predicate fails; catches exceptions as |
| | FOREACH | Side-effect with error capture |
All try_* stages accept:
- `workers=` / `buffer_size=` / `name=`: same as regular stages
- `err=handler`: optional callback `(PipelineError) → T` invoked on error instead of emitting `Err`
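The error-capture behaviour of an `err=` handler can be sketched without the library. `Ok`, `Err`, and `try_call` below are hypothetical stand-ins, and the handler receives the raw exception rather than a `PipelineError`. The sketch also assumes the handler's return value is emitted as a normal `Ok` item, which the source does not confirm.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Ok:
    """Hypothetical success wrapper."""
    value: Any


@dataclass(frozen=True)
class Err:
    """Hypothetical failure wrapper."""
    error: Exception


def try_call(fn: Callable[[Any], Any], item: Any,
             err: Optional[Callable[[Exception], Any]] = None):
    """Hypothetical helper: call fn(item) and capture failures as values."""
    try:
        return Ok(fn(item))
    except Exception as exc:
        if err is not None:
            # Assumption: the handler's result replaces the Err.
            return Ok(err(exc))
        return Err(exc)
```

For example, `try_call(int, "x")` yields an `Err` holding the `ValueError`, while `try_call(int, "x", err=lambda e: -1)` yields `Ok(-1)`.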
Exit Ramps
| Method | Input | Output | Description |
|---|---|---|---|
| | | | Unwrap |
| | | | Keep only |
| | | | Keep only |
| | | | Terminal: collect into two lists |
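The terminal that collects into two lists might behave like the sketch below. `Ok`, `Err`, and `partition` are hypothetical names, and the sketch assumes the two lists hold the unwrapped values and the captured exceptions respectively.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Tuple


@dataclass(frozen=True)
class Ok:
    """Hypothetical success wrapper."""
    value: Any


@dataclass(frozen=True)
class Err:
    """Hypothetical failure wrapper."""
    error: Exception


def partition(results: Iterable[Any]) -> Tuple[List[Any], List[Exception]]:
    """Split a sequence of Ok/Err items into (values, errors)."""
    values: List[Any] = []
    errors: List[Exception] = []
    for result in results:
        if isinstance(result, Ok):
            values.append(result.value)
        else:
            errors.append(result.error)
    return values, errors
```

Keeping only the successes or only the failures corresponds to taking one of the two lists, so the filtering exit ramps can be seen as special cases of this split.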