Stream
- class anyiostream.stream.Stream
Bases: ResultStages, Generic
A lazy, composable async pipeline.
Holds a list of Process descriptors and a source factory. Nothing runs until a terminal operation (collect, count, or open()) is invoked.
- classmethod from_iterable(items, *, buffer_size=0)
Create a stream from a sync or async iterable.
- Parameters:
items (Iterable[T] | AsyncIterable[T]) – Source data.
buffer_size (float) – Buffer between source and first process (not used directly here; the first process's own buffer_size controls it).
- Return type:
Stream[T]
- classmethod from_callable(factory, *, buffer_size=0)
Lazily create a stream: factory is called at execution time.
Useful when the source is expensive to create or can only be iterated once.
- Parameters:
factory (Callable[[], Iterable[T] | AsyncIterable[T]])
buffer_size (float)
- Return type:
Stream[T]
- map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
1:1 transformation. func may be sync or async.
- Parameters:
func (Callable[[T], U | Awaitable[U]]) – Transform function T -> U.
workers (int) – Concurrent workers for this process.
buffer_size (float) – Backpressure buffer to downstream (item count).
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.
name (str | None) – Label for tracing.
- Return type:
Stream[U]
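The item-count backpressure that buffer_size describes can be illustrated without the library. The following is a minimal sketch using the standard library's asyncio.Queue as a stand-in for the channel between two stages; it is an assumption for illustration, not how Stream is implemented. Note that asyncio.Queue treats maxsize=0 as unbounded, so the sketch uses a bound of 2 rather than 0.

```python
import asyncio


async def main():
    # Stand-in channel between an upstream and a downstream stage.
    queue = asyncio.Queue(maxsize=2)
    depths = []    # queue depth observed after each put
    consumed = []  # items seen by the slow downstream stage

    async def producer():
        for i in range(5):
            await queue.put(i)  # suspends while the queue is full
            depths.append(queue.qsize())
        await queue.put(None)   # sentinel: no more items

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.01)  # simulate slow downstream work
            consumed.append(item)

    await asyncio.gather(producer(), consumer())
    return max(depths), consumed


max_depth, consumed = asyncio.run(main())
```

The producer never gets more than two items ahead of the consumer, which is the effect a small buffer has on a fast upstream stage.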
- flat_map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
1:N transformation. func returns an iterable or async iterable.
- Parameters:
func (Callable[[T], AsyncIterable[U] | Iterable[U] | Awaitable[Any]]) – Transform function T -> Iterable[U] or T -> AsyncIterable[U].
workers (int) – Concurrent workers for this process.
buffer_size (float) – Backpressure buffer to downstream.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.
name (str | None) – Label for tracing.
- Return type:
Stream[U]
- filter(predicate, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Keep only items where predicate returns truthy.
- Parameters:
predicate (Callable[[T], bool | Awaitable[bool]]) – Filter function T -> bool.
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer to downstream.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.
name (str | None) – Label for tracing.
- Return type:
Stream[T]
- foreach(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Side-effect process: call func on each item without changing it.
Useful for logging, metrics, or caching.
- Parameters:
workers (int) – Concurrent workers.
buffer_size (float) – Backpressure buffer to downstream.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.
name (str | None) – Label for tracing.
- Return type:
Stream[T]
- flatten(*, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)
Flatten iterables in the stream.
Converts a Stream[AsyncIterable[U] | Iterable[U]] into a Stream[U] by yielding each sub-item individually, equivalent to flat_map(identity).
- Parameters:
workers (int) – Concurrent workers for this process.
buffer_size (float) – Backpressure buffer to downstream.
max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).
size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.
name (str | None) – Label for tracing.
- Return type:
Stream[U]
- async count()
Execute the pipeline, discarding outputs.
- Returns:
Number of items processed.
- Return type:
int
The Stream class is the core pipeline builder. It holds a lazy recipe of Process stages that execute only when a terminal operation is called.
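To make the "lazy recipe" idea concrete, here is a minimal sketch of a pipeline that only records stages and runs them when a terminal is awaited. LazyPipeline is a hypothetical stand-in written for this illustration; it is not the library's Stream and it omits workers, buffers, and async sources.

```python
import asyncio
from typing import Any, Callable, Iterable


class LazyPipeline:
    """Hypothetical stand-in: records stages, runs them only in collect()."""

    def __init__(self, source_factory: Callable[[], Iterable[Any]], stages=()):
        self._source_factory = source_factory
        self._stages = tuple(stages)

    def map(self, func: Callable[[Any], Any]) -> "LazyPipeline":
        # Return a new pipeline; the original recipe is left untouched.
        return LazyPipeline(self._source_factory, self._stages + (func,))

    async def collect(self) -> list:
        # The source factory is only called here, at execution time.
        out = []
        for item in self._source_factory():
            for func in self._stages:
                item = func(item)
            out.append(item)
        return out


calls = []


def double(x):
    calls.append(x)  # record that the stage actually ran
    return x * 2


pipeline = LazyPipeline(lambda: [1, 2, 3]).map(double)
calls_before_collect = list(calls)        # still empty: nothing has run
result = asyncio.run(pipeline.collect())  # the terminal triggers execution
```

Building the pipeline has no side effects; `double` is first called inside `collect()`.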
Constructors
Stream.from_iterable(items, *, buffer_size=0)
Create a stream from a sync or async iterable.
    from anyiostream.stream import Stream

    # Sync
    stream = Stream.from_iterable([1, 2, 3])

    # Async generator
    async def gen():
        for i in range(10):
            yield i

    stream = Stream.from_iterable(gen())
Parameters:
| Parameter | Type | Description |
|---|---|---|
| items | Iterable[T] \| AsyncIterable[T] | Items to stream |
Stream.from_callable(factory)
Lazy construction — factory is called at execution time.
    stream = Stream.from_callable(lambda: range(10))
    stream = Stream.from_callable(async_generator_function)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| factory | Callable[[], Iterable[T] \| AsyncIterable[T]] | Called when pipeline materializes |
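The reason a factory helps with sources that can only be iterated once can be shown in plain Python, independent of the library. In this small sketch, `numbers` is a hypothetical generator function: a generator object is exhausted after one pass, whereas calling the factory again yields a fresh one.

```python
def numbers():
    # A generator function: each call returns a new one-shot generator.
    yield from range(3)


# Passing the generator object itself: usable for a single pass only.
one_shot = numbers()
first_pass = list(one_shot)
second_pass = list(one_shot)  # already exhausted

# Passing the factory: every execution can obtain a fresh source.
factory = numbers
fresh_first = list(factory())
fresh_second = list(factory())
```

A stream built from a factory can therefore, in principle, be executed more than once, while one built from an already-created generator cannot.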
Transform Stages
All stages return a new Stream (lazy — nothing executes yet).
.map(fn, *, workers=1, buffer_size=0, name=None)
1:1 transform. Accepts sync or async functions.
.flat_map(fn, *, workers=1, buffer_size=0, name=None)
1:N transform. Function returns an iterable or async iterable.
.filter(pred, *, workers=1, buffer_size=0, name=None)
Keep items where predicate is truthy.
.foreach(fn, *, workers=1, buffer_size=0, name=None)
Side-effect — calls fn(item), passes item through unchanged.
Common Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| fn / pred | Callable | — | Transform or predicate function |
| workers | int | 1 | Number of concurrent workers |
| buffer_size | float | 0 | Channel buffer (0=rendezvous, |
| name | str \| None | None | Debug label |
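The semantics of the four transform stages can be sketched with plain async generators. The helper names below (`map_stage`, `flat_map_stage`, `filter_stage`, `foreach_stage`) are hypothetical and sequential; they illustrate what each stage does to the items and ignore workers, buffer_size, and name entirely.

```python
import asyncio
import inspect


async def _call(func, item):
    # Accept both sync and async callables.
    result = func(item)
    if inspect.isawaitable(result):
        result = await result
    return result


async def from_iterable(items):
    for item in items:
        yield item


async def map_stage(source, func):
    async for item in source:  # 1:1
        yield await _call(func, item)


async def flat_map_stage(source, func):
    async for item in source:  # 1:N
        result = func(item)
        if hasattr(result, "__aiter__"):
            async for sub in result:
                yield sub
        else:
            for sub in result:
                yield sub


async def filter_stage(source, predicate):
    async for item in source:
        if await _call(predicate, item):
            yield item


async def foreach_stage(source, func):
    async for item in source:
        await _call(func, item)  # side effect only
        yield item               # item passes through unchanged


async def main():
    seen = []

    async def add_one(x):
        return x + 1

    stream = from_iterable([1, 2, 3])
    stream = map_stage(stream, add_one)
    stream = flat_map_stage(stream, lambda x: [x, x * 10])
    stream = filter_stage(stream, lambda x: x % 2 == 0)
    stream = foreach_stage(stream, seen.append)
    return [item async for item in stream], seen


out, seen = asyncio.run(main())
```

As with the real stages, nothing runs until the final async comprehension starts pulling items.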
Result-Aware Stages
See Result Types for Ok/Err details.
.try_map(fn, *, err=None, workers=1, buffer_size=0, name=None)
Map with Ok/Err wrapping. Exceptions become Err(PipelineError(...)).
.try_flat_map(fn, *, err=None, workers=1, buffer_size=0, name=None)
Flat map with Ok/Err wrapping.
.try_filter(pred, *, workers=1, buffer_size=0, name=None)
Filter Ok values. Err always passes through.
.try_foreach(fn, *, err=None, workers=1, buffer_size=0, name=None)
Side-effect on Ok values.
.recover(fn, *, workers=1, buffer_size=0, name=None)
Convert Err → value via fn(error), unwrap Ok. Exits Result mode.
.ok_only()
Keep and unwrap Ok values, drop Err.
.errors_only()
Keep Err values (unwrapped to error), drop Ok.
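The Ok/Err flow described above can be sketched with two small dataclasses. Everything here is a hypothetical stand-in operating on plain lists: `Ok`, `Err`, and the helper functions are defined locally, and the sketch stores the raw exception in `Err`, whereas the text above says the library wraps it in a PipelineError.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Ok:
    value: Any


@dataclass
class Err:
    error: Exception


def try_map(items, func):
    # Wrap each outcome instead of letting an exception abort the run.
    out = []
    for item in items:
        try:
            out.append(Ok(func(item)))
        except Exception as exc:
            out.append(Err(exc))
    return out


def ok_only(results):
    # Keep and unwrap Ok values, drop Err.
    return [r.value for r in results if isinstance(r, Ok)]


def errors_only(results):
    # Keep Err values (unwrapped to the error), drop Ok.
    return [r.error for r in results if isinstance(r, Err)]


def recover(results, func):
    # Unwrap Ok; turn each Err into a plain value via func(error).
    return [r.value if isinstance(r, Ok) else func(r.error) for r in results]


results = try_map(["1", "x", "3"], int)
values = ok_only(results)
errors = errors_only(results)
recovered = recover(results, lambda error: -1)
```

After `recover`, the items are plain values again, which is what "exits Result mode" refers to.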
Terminal Operations
Terminals materialize the pipeline and return a result.
await stream.collect() → list[T]
Collect all items into a list.
await stream.count() → int
Consume all items, return the count.
await stream.reduce(fn, initial) → U
Fold items: fn(accumulator, item) → accumulator.
await stream.first() → T | None
Return the first item, or None if empty.
await stream.take(n) → list[T]
Collect at most n items.
async with stream.open() as recv
Context manager for manual iteration.
await stream.collect_split() → tuple[list, list]
Partition into (ok_values, errors).
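For readers who want the terminal semantics spelled out, the following sketch shows synchronous standard-library equivalents of count, reduce, first, and take on a plain list. It is an analogy only: the real terminals are awaited and run the pipeline.

```python
from functools import reduce
from itertools import islice

items = [3, 1, 4, 1, 5]

# count: consume everything, return how many items there were.
count = sum(1 for _ in items)

# reduce: fn(accumulator, item) -> accumulator, starting from `initial`.
total = reduce(lambda acc, item: acc + item, items, 0)

# first: the first item, or None when the source is empty.
first = next(iter(items), None)
first_of_empty = next(iter([]), None)

# take: at most n items.
taken = list(islice(items, 2))
taken_more_than_available = list(islice(items, 10))
```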
Pipe Operator
    stream | pipe.map(fn) | pipe.collect()
The __or__ method supports pipe composition. See Pipe Operators.
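As a rough illustration of how `|` composition can be built on `__or__`, here is a self-contained sketch. `Numbers`, `pipe_map`, and `pipe_collect` are hypothetical names invented for this example, and the sketch is synchronous; the library's pipe helpers may behave differently.

```python
class Numbers:
    """Hypothetical stand-in for a stream that supports `|`."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, step):
        # `stream | step` hands the stream to the step and returns its result.
        return step(self)


def pipe_map(func):
    # A step that produces another Numbers, so chaining can continue.
    return lambda stream: Numbers(func(x) for x in stream.items)


def pipe_collect():
    # A terminal step: returns a plain list and ends the chain.
    return lambda stream: list(stream.items)


result = Numbers([1, 2, 3]) | pipe_map(lambda x: x + 1) | pipe_collect()
```

Because `|` is left-associative, the expression evaluates as `(Numbers | pipe_map(...)) | pipe_collect()`, so each step receives the output of the previous one.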