Pipe Operators
- class anyiostream.operators._Pipe
Namespace for pipe operators.
All methods return a _PipeOp (a callable Stream -> Stream) so they can be used with the | operator.
- static map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
| pipe.map(fn): 1:1 transform.
- Parameters:
func (Callable[[T], U | Awaitable[U]]) – Transform function.
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer (item count).
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.
name (str | None) – Debug label.
- Returns:
A pipe operator that can be used with |.
- Return type:
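The size_func parameter is documented as a callable that maps an item to its size in bytes. The following is a minimal sketch of such a callable, assuming items are bytes, strings, or lists of those. The names approx_size and fits_budget are hypothetical and not part of anyiostream, and how the library enforces max_buffer_bytes internally is not described here.

```python
from typing import Any

MAX_BUFFER_BYTES = 10_000_000  # the documented default for max_buffer_bytes


def approx_size(item: Any) -> int:
    """Hypothetical size_func: estimate an item's payload size in bytes."""
    if isinstance(item, (bytes, bytearray)):
        return len(item)
    if isinstance(item, str):
        return len(item.encode("utf-8"))
    if isinstance(item, (list, tuple)):
        return sum(approx_size(x) for x in item)
    # Fallback for other types: byte length of the repr, a rough stand-in.
    return len(repr(item).encode("utf-8"))


def fits_budget(buffered_bytes: int, item: Any, limit: int = MAX_BUFFER_BYTES) -> bool:
    """Illustrative check only: would adding this item stay within the byte budget?"""
    return buffered_bytes + approx_size(item) <= limit
```

A function like this would be passed as pipe.map(fn, size_func=approx_size).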
- static flat_map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
| pipe.flat_map(fn): 1:N transform.
- Parameters:
func (Callable[[T], AsyncIterable[U] | Iterable[U] | Awaitable[Any]]) – Function returning iterable or async iterable.
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.
name (str | None) – Debug label.
- Returns:
A pipe operator.
- Return type:
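The func parameter of flat_map may return an iterable, an async iterable, or an awaitable. The sketch below shows one way these three shapes can be normalized into a single stream of items. It assumes that an awaitable resolves to an iterable; iter_results and the three sample functions are hypothetical names, not anyiostream APIs.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable, Iterable
from typing import Any


async def iter_results(result: Any):
    """Hypothetical helper: yield items from whatever a flat_map function returned."""
    if inspect.isawaitable(result):
        result = await result  # assumption: the awaitable resolves to an iterable
    if isinstance(result, AsyncIterable):
        async for item in result:
            yield item
    elif isinstance(result, Iterable):
        for item in result:
            yield item
    else:
        raise TypeError(f"expected an iterable, got {type(result).__name__}")


def split_words(line: str):
    return line.split()  # plain iterable


async def repeat_twice(line: str):
    yield line  # async generator, i.e. an async iterable
    yield line


async def fetch_chars(line: str):
    await asyncio.sleep(0)  # coroutine that resolves to a list
    return list(line)


async def demo():
    out = []
    for func in (split_words, repeat_twice, fetch_chars):
        async for item in iter_results(func("a b")):
            out.append(item)
    return out


flat = asyncio.run(demo())
```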
- static filter(predicate, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
| pipe.filter(pred): keep items where predicate is truthy.
- Parameters:
predicate (Callable[[T], bool | Awaitable[bool]]) – Filter function.
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.
name (str | None) – Debug label.
- Returns:
A pipe operator.
- Return type:
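The predicate may be synchronous or asynchronous (it returns bool or Awaitable[bool]). A minimal sketch of how a caller can support both forms follows. call_predicate, is_even, and is_small are hypothetical names used only for illustration, and this is not the library's implementation.

```python
import asyncio
import inspect


async def call_predicate(pred, item) -> bool:
    """Call pred(item); await the result if it is awaitable, then coerce to bool."""
    result = pred(item)
    if inspect.isawaitable(result):
        result = await result
    return bool(result)


def is_even(n: int) -> bool:
    return n % 2 == 0  # synchronous predicate


async def is_small(n: int) -> bool:
    await asyncio.sleep(0)  # asynchronous predicate
    return n < 3


async def demo():
    kept = []
    for n in range(6):
        if await call_predicate(is_even, n) and await call_predicate(is_small, n):
            kept.append(n)
    return kept


kept = asyncio.run(demo())
```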
- static foreach(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
| pipe.foreach(fn): run a side effect and pass items through unchanged.
- Parameters:
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.
name (str | None) – Debug label.
- Returns:
A pipe operator.
- Return type:
- static flatten(*, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
| pipe.flatten(): flatten iterables in the stream.
- Parameters:
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.
name (str | None) – Debug label.
- Returns:
A pipe operator.
- Return type:
- static count()
| pipe.count(): terminal; consumes all items and returns the count.
- Returns:
Sentinel that triggers Stream.__or__ to call .count().
- Return type:
- static try_map(func, *, err=None, workers=1, buffer_size=0, name=None)
| pipe.try_map(fn, err=handler): map with Ok/Err wrapping.
- static try_flat_map(func, *, err=None, workers=1, buffer_size=0, name=None)
| pipe.try_flat_map(fn, err=handler): flat_map with Ok/Err wrapping.
- static try_filter(predicate, *, workers=1, buffer_size=0, name=None)
| pipe.try_filter(pred): filter Ok values; Err passes through.
- static try_foreach(func, *, err=None, workers=1, buffer_size=0, name=None)
| pipe.try_foreach(fn, err=handler): side effect on Ok values.
- static recover(func, *, workers=1, buffer_size=0, name=None)
| pipe.recover(fn): convert Err to value, unwrap Ok.
The pipe singleton provides static methods that return callables for use with the | operator.
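from anyiostream import Stream, pipe  # assumes Stream is importable from the package root

# items, fn, and pred are placeholders; run this inside an async function.
result = await (
    Stream.from_iterable(items)
    | pipe.map(fn, workers=4)
    | pipe.filter(pred)
    | pipe.collect()
)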
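To illustrate how callables can be chained with the | operator, here is a self-contained toy model. ToyStream, toy_map, and toy_filter are hypothetical stand-ins written for this sketch; they mirror the idea of a callable Stream -> Stream but are not anyiostream's implementation and ignore workers and buffering.

```python
import asyncio


class ToyStream:
    """Hypothetical stand-in for Stream: wraps an async iterator."""

    def __init__(self, source):
        self._source = source

    @classmethod
    def from_iterable(cls, items):
        async def gen():
            for item in items:
                yield item
        return cls(gen())

    def __or__(self, op):
        # `stream | op` simply calls op(stream) and returns the new stream.
        return op(self)

    def __aiter__(self):
        return self._source


def toy_map(func):
    """Return a callable ToyStream -> ToyStream that applies func to each item."""
    def op(stream):
        async def gen():
            async for item in stream:
                yield func(item)
        return ToyStream(gen())
    return op


def toy_filter(pred):
    """Return a callable ToyStream -> ToyStream that keeps items where pred is truthy."""
    def op(stream):
        async def gen():
            async for item in stream:
                if pred(item):
                    yield item
        return ToyStream(gen())
    return op


async def demo():
    stream = (
        ToyStream.from_iterable(range(6))
        | toy_map(lambda x: x * 10)
        | toy_filter(lambda x: x >= 30)
    )
    return [item async for item in stream]


piped = asyncio.run(demo())
```

Because each operator is just a function from stream to stream, the pipeline reads left to right and new operators can be added without touching the stream class.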
Transform Operators
pipe.map(fn, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Returns a callable that applies .map() to the stream.
pipe.flat_map(fn, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Returns a callable that applies .flat_map() to the stream.
pipe.filter(pred, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Returns a callable that applies .filter() to the stream.
pipe.foreach(fn, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Returns a callable that applies .foreach() to the stream.
Terminal Operators
pipe.collect()
Returns a sentinel that triggers .collect() via __or__.
pipe.count()
Returns a sentinel that triggers .count() via __or__.
pipe.collect_split()
Returns a sentinel that triggers .collect_split() via __or__.
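The sentinel mechanism can be pictured with a small toy model: __or__ checks whether the right-hand operand is a sentinel and, if so, calls the matching terminal method instead of treating the operand as a callable. MiniStream, _Terminal, toy_collect, and toy_count are hypothetical names for this sketch, and it assumes terminal methods are coroutines (consistent with awaiting the whole pipeline expression).

```python
import asyncio


class _Terminal:
    """Hypothetical sentinel: records which terminal method to call."""

    def __init__(self, method: str) -> None:
        self.method = method


class MiniStream:
    """Hypothetical stand-in for Stream backed by a list."""

    def __init__(self, items):
        self._items = list(items)

    async def collect(self):
        return list(self._items)

    async def count(self):
        return len(self._items)

    def __or__(self, op):
        if isinstance(op, _Terminal):
            # Terminal: return the coroutine so the caller can await the expression.
            return getattr(self, op.method)()
        # Otherwise treat op as a callable MiniStream -> MiniStream.
        return op(self)


def toy_collect():
    return _Terminal("collect")


def toy_count():
    return _Terminal("count")


def evens(stream):
    return MiniStream(x for x in stream._items if x % 2 == 0)


async def demo():
    collected = await (MiniStream(range(5)) | evens | toy_collect())
    counted = await (MiniStream(range(5)) | evens | toy_count())
    return collected, counted


collected, counted = asyncio.run(demo())
```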
Result-Aware Operators
pipe.try_map(fn, *, err=None, workers=1, buffer_size=0, name=None)
Map with Ok/Err wrapping.
pipe.try_flat_map(fn, *, err=None, workers=1, buffer_size=0, name=None)
Flat map with Ok/Err wrapping.
pipe.try_filter(pred, *, workers=1, buffer_size=0, name=None)
Filter Ok values; Err passes through.
pipe.try_foreach(fn, *, err=None, workers=1, buffer_size=0, name=None)
Side-effect on Ok values.
pipe.recover(fn, *, workers=1, buffer_size=0, name=None)
Convert Err → value, unwrap Ok.
pipe.ok_only()
Keep and unwrap Ok, drop Err.
pipe.errors_only()
Keep Err (unwrapped), drop Ok.
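The Ok/Err flow described above can be sketched with plain Python. The Ok and Err classes, try_apply, and recover_value below are hypothetical illustrations of the pattern, not anyiostream's types, and the sketch assumes the recovery function receives the captured exception.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Exception


def try_apply(func, item):
    """Wrap the outcome of func(item): Ok on success, Err if it raises."""
    try:
        return Ok(func(item))
    except Exception as exc:
        return Err(exc)


def recover_value(result, fallback):
    """Convert Err to a plain value via fallback(error); unwrap Ok."""
    if isinstance(result, Err):
        return fallback(result.error)
    return result.value


results = [try_apply(int, s) for s in ["1", "x", "3"]]

# ok_only: keep and unwrap Ok, drop Err.
ok_values = [r.value for r in results if isinstance(r, Ok)]

# errors_only: keep Err (unwrapped), drop Ok.
error_values = [r.error for r in results if isinstance(r, Err)]

# recover: every item becomes a plain value again.
recovered = [recover_value(r, lambda exc: -1) for r in results]
```

Wrapping failures as values keeps one bad item from aborting the whole stream; the error can be inspected, recovered, or dropped at a later stage.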