Stream

class anyiostream.stream.Stream

Bases: ResultStages, Generic

A lazy, composable async pipeline.

Holds a list of Process descriptors and a source factory. Nothing runs until a terminal operation (collect(), count(), reduce(), first(), take(), or open()) is invoked.
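The "lazy recipe" idea can be illustrated with a toy model in plain asyncio. This is a minimal sketch, not anyiostream's implementation: `LazyPipeline` and `record_and_double` are hypothetical names. It only shows that building stages records them and that the source factory and stage functions run when a terminal operation is awaited.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Iterable


class LazyPipeline:
    """Toy model: a source factory plus a list of stages (hypothetical, not Stream)."""

    def __init__(self, source_factory: Callable[[], Iterable[Any]], stages=None):
        self.source_factory = source_factory
        self.stages: list[Callable[[Any], Any]] = list(stages or [])

    def map(self, func: Callable[[Any], Any]) -> "LazyPipeline":
        # Return a new pipeline with one more stage; nothing executes here.
        return LazyPipeline(self.source_factory, self.stages + [func])

    async def _run(self) -> AsyncIterator[Any]:
        # The source factory is only called once execution starts.
        for item in self.source_factory():
            for stage in self.stages:
                item = stage(item)
            yield item

    async def collect(self) -> list[Any]:
        # Terminal operation: this is what actually drives execution.
        return [item async for item in self._run()]


calls: list[int] = []

def record_and_double(x: int) -> int:
    calls.append(x)
    return x * 2


pipeline = LazyPipeline(lambda: range(3)).map(record_and_double)
calls_before_run = list(calls)  # still empty: building the pipeline ran nothing
result = asyncio.run(pipeline.collect())
print(calls_before_run, result)  # [] [0, 2, 4]
```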

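__init__(source_factory, processes=None)
Parameters:
  • source_factory – Callable that creates the source when the pipeline executes.

  • processes – Process descriptors that make up the pipeline.

Return type: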

None

classmethod from_iterable(items, *, buffer_size=0)

Create a stream from a sync or async iterable.

Parameters:
  • items (Iterable[T] | AsyncIterable[T]) – Source data.

  • buffer_size (float) – Buffer between the source and the first process. Not used directly by this method; the first process’s own buffer_size controls that buffer.

Return type:

Stream[T]

classmethod from_callable(factory, *, buffer_size=0)

Lazily create a stream: factory is called at execution time.

Useful when the source is expensive to create or can only be iterated once.

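Parameters:
  • factory (Callable[[], Iterable[T] | AsyncIterable[T]]) – Called at execution time to create the source.
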
Return type:

Stream[T]
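Why a factory helps with single-use sources can be shown without the library. In this plain-asyncio sketch (`numbers` and `drain` are hypothetical helpers), one async generator object is exhausted after its first run, while a zero-argument factory produces a fresh iterator each time. It assumes, as the "can only be iterated once" remark suggests, that a pipeline may be driven more than once.

```python
import asyncio
from typing import AsyncIterator, Callable


async def numbers(n: int) -> AsyncIterator[int]:
    # Each call returns a fresh, single-use async iterator.
    for i in range(n):
        yield i


async def drain(source: AsyncIterator[int]) -> list[int]:
    return [x async for x in source]


async def main() -> tuple[list[int], list[int], list[int], list[int]]:
    # Eager source: one generator object shared by two runs.
    shared = numbers(3)
    eager_first = await drain(shared)
    eager_second = await drain(shared)  # already exhausted

    # Lazy source: a zero-argument factory called once per run.
    factory: Callable[[], AsyncIterator[int]] = lambda: numbers(3)
    lazy_first = await drain(factory())
    lazy_second = await drain(factory())
    return eager_first, eager_second, lazy_first, lazy_second


eager1, eager2, lazy1, lazy2 = asyncio.run(main())
print(eager1, eager2)  # [0, 1, 2] []
print(lazy1, lazy2)    # [0, 1, 2] [0, 1, 2]
```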

map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)

1:1 transformation. func may be sync or async.

Parameters:
  • func (Callable[[T], U | Awaitable[U]]) – Transform function T -> U.

  • workers (int) – Concurrent workers for this process.

  • buffer_size (float) – Backpressure buffer to downstream (item count).

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.

  • name (str | None) – Label for tracing.

Return type:

Stream[U]
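The combination of "func may be sync or async" and `workers` can be sketched in plain asyncio as below. This is an illustration under stated assumptions, not the library's code: `call_maybe_async` and `concurrent_map` are hypothetical, and this sketch emits results in completion order, so with several workers the output order can differ from the input order (whether `Stream.map` preserves order is not stated here).

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Iterable, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")


async def call_maybe_async(func: Callable[[T], Union[U, Awaitable[U]]], item: T) -> U:
    # Works for sync and async functions: await only if the result is awaitable.
    result = func(item)
    if inspect.isawaitable(result):
        result = await result
    return result


async def concurrent_map(func, items: Iterable[T], workers: int = 1) -> list:
    """Apply func to every item with `workers` concurrent tasks (completion order)."""
    inbox: asyncio.Queue = asyncio.Queue()
    for item in items:
        inbox.put_nowait(item)
    results: list = []

    async def worker() -> None:
        while True:
            try:
                item = inbox.get_nowait()
            except asyncio.QueueEmpty:
                return  # no work left
            results.append(await call_maybe_async(func, item))

    await asyncio.gather(*(worker() for _ in range(workers)))
    return results


async def slow_square(x: int) -> int:
    await asyncio.sleep(0.01 * (5 - x))  # later items finish sooner
    return x * x


async_results = asyncio.run(concurrent_map(slow_square, range(5), workers=3))
sync_results = asyncio.run(concurrent_map(lambda x: x + 1, range(5)))
print(sorted(async_results))  # [0, 1, 4, 9, 16]
print(sync_results)           # [1, 2, 3, 4, 5]
```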

flat_map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)

1:N transformation. func returns an iterable or async iterable.

Parameters:
  • func (Callable[[T], AsyncIterable[U] | Iterable[U] | Awaitable[Any]]) – Transform function T -> Iterable[U] or T -> AsyncIterable[U].

  • workers (int) – Concurrent workers for this process.

  • buffer_size (float) – Backpressure buffer to downstream.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.

  • name (str | None) – Label for tracing.

Return type:

Stream[U]
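A 1:N transform must handle both return shapes listed for func. The plain-asyncio sketch below (`flat_map_items` and `async_repeat` are hypothetical) iterates a sync iterable with `for` and an async iterable with `async for`. Awaiting an awaitable result first is this sketch's guess at what `Awaitable[Any]` in the signature covers, not confirmed library behavior.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable


async def flat_map_items(func: Callable[[Any], Any], items) -> AsyncIterator[Any]:
    """Yield each sub-item of func(item); func may return a sync or async iterable."""
    for item in items:
        result = func(item)
        if inspect.isawaitable(result):  # e.g. an async def that returns a list
            result = await result
        if hasattr(result, "__aiter__"):  # async iterable
            async for sub in result:
                yield sub
        else:  # ordinary iterable
            for sub in result:
                yield sub


async def async_repeat(x: int) -> AsyncIterator[int]:
    for _ in range(x):
        yield x


async def main() -> tuple[list, list]:
    from_sync = [s async for s in flat_map_items(lambda x: [x, -x], [1, 2])]
    from_async = [s async for s in flat_map_items(async_repeat, [1, 2, 3])]
    return from_sync, from_async


from_sync, from_async = asyncio.run(main())
print(from_sync)   # [1, -1, 2, -2]
print(from_async)  # [1, 2, 2, 3, 3, 3]
```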

filter(predicate, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)

Keep only items for which predicate returns a truthy value.

Parameters:
  • predicate (Callable[[T], bool | Awaitable[bool]]) – Filter function T -> bool.

  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer to downstream.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.

  • name (str | None) – Label for tracing.

Return type:

Stream[T]
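"Truthy" matters when the predicate does not return a strict bool. In this plain-asyncio sketch (`filter_items` and `is_even_async` are hypothetical), an async predicate is awaited, and a sync predicate returning a string keeps the item only when that string is non-empty.

```python
import asyncio
import inspect


async def filter_items(predicate, items) -> list:
    """Keep items for which predicate(item) is truthy; predicate may be async."""
    kept = []
    for item in items:
        verdict = predicate(item)
        if inspect.isawaitable(verdict):
            verdict = await verdict
        if verdict:  # truthiness test, not `verdict is True`
            kept.append(item)
    return kept


async def is_even_async(x: int) -> bool:
    await asyncio.sleep(0)
    return x % 2 == 0


evens = asyncio.run(filter_items(is_even_async, range(6)))
words = asyncio.run(filter_items(lambda s: s.strip(), ["a", " ", "", "b"]))
print(evens)  # [0, 2, 4]
print(words)  # ['a', 'b']
```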

foreach(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)

Side-effect process: call func on each item without changing it.

Useful for logging, metrics, or caching.

Parameters:
  • func (Callable[[T], Any]) – Side-effect function T -> None.

  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer to downstream.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). When set, enables MemoryBuffer for memory-aware buffering.

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes. Only used when max_buffer_bytes is specified.

  • name (str | None) – Label for tracing.

Return type:

Stream[T]
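The pass-through contract of a side-effect stage can be sketched as follows (plain asyncio; `foreach_items` and `log_item` are hypothetical). The function is called for its effect, its return value is discarded, and each original item continues downstream unchanged.

```python
import asyncio
import inspect


async def foreach_items(func, items) -> list:
    """Call func(item) for its side effect; pass every item through unchanged."""
    passed = []
    for item in items:
        outcome = func(item)
        if inspect.isawaitable(outcome):  # async side-effect functions are awaited
            await outcome
        passed.append(item)  # func's return value is discarded
    return passed


log: list[str] = []

def log_item(x: int) -> str:
    log.append(f"saw {x}")
    return "this return value is ignored"


out = asyncio.run(foreach_items(log_item, [1, 2]))
print(out)  # [1, 2]
print(log)  # ['saw 1', 'saw 2']
```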

flatten(*, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)

Flatten iterables in the stream.

Converts a Stream[AsyncIterable[U] | Iterable[U]] into a Stream[U] by yielding each sub-item individually. This is equivalent to flat_map with the identity function, i.e. flat_map(lambda x: x).

Parameters:
  • workers (int) – Concurrent workers for this process.

  • buffer_size (float) – Backpressure buffer to downstream.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.

  • name (str | None) – Label for tracing.

Return type:

Stream[Any]
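Flattening is flat_map with the identity function, so a sketch only needs the iteration part. In this plain-asyncio illustration (`flatten_items` and `letters` are hypothetical), a mixed stream of lists, tuples, and async generators becomes one flat sequence of sub-items.

```python
import asyncio
from typing import Any, AsyncIterator


async def flatten_items(items) -> AsyncIterator[Any]:
    """Yield each sub-item of every item; items may be sync or async iterables."""
    for sub_iterable in items:
        if hasattr(sub_iterable, "__aiter__"):
            async for sub in sub_iterable:
                yield sub
        else:
            for sub in sub_iterable:
                yield sub


async def letters() -> AsyncIterator[str]:
    for ch in "ab":
        yield ch


async def main() -> list[Any]:
    nested = [[1, 2], (3,), letters()]
    return [x async for x in flatten_items(nested)]


flat = asyncio.run(main())
print(flat)  # [1, 2, 3, 'a', 'b']
```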

async collect(*, batch=False)

Execute the pipeline and collect all outputs into a list.

Parameters:

batch (bool) – If True, drain any AsyncIterable or Iterable items into sub-lists, so that a Stream[AsyncIterator[U]] produces list[list[U]] instead of list[AsyncIterator[U]].

Returns:

All items produced by the final process.

Return type:

list[T]
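The effect of batch=True can be approximated with the standard library alone. In the sketch below (`collect_batched` and `chunk` are hypothetical), each async or sync iterable item is drained into its own sub-list, so a list of iterator objects becomes a list of lists. Keeping str and bytes whole is an assumption of this sketch; the documentation does not say how the library treats them.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Iterable
from typing import Any


async def collect_batched(items: list[Any]) -> list[Any]:
    """Drain each (async) iterable item into a sub-list; other items pass through.

    Assumption for this sketch: str and bytes are kept whole, not split.
    """
    out: list[Any] = []
    for item in items:
        if isinstance(item, AsyncIterable):
            out.append([x async for x in item])
        elif isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            out.append(list(item))
        else:
            out.append(item)
    return out


async def chunk(start: int) -> AsyncIterator[int]:
    for i in range(start, start + 2):
        yield i


async def main() -> list[Any]:
    # Without draining, the result would hold the iterator objects themselves.
    return await collect_batched([chunk(0), chunk(10), range(2)])


batched = asyncio.run(main())
print(batched)  # [[0, 1], [10, 11], [0, 1]]
```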

async count()

Execute the pipeline, discarding outputs.

Returns:

Number of items processed.

Return type:

int

async reduce(func, initial)

Fold all items into a single value.

Parameters:
  • func (Callable[[U, T], U | Awaitable[U]]) – Reducer (acc, item) -> acc.

  • initial (U) – Starting accumulator value.

Returns:

Final accumulated value.

Return type:

U
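The fold described by reduce can be sketched in plain asyncio as a left-to-right loop (`reduce_items` and `add_async` are hypothetical). The reducer may be sync or async; in this sketch an empty input simply returns `initial`, which is the usual fold behavior but is not stated explicitly for the library.

```python
import asyncio
import inspect


async def reduce_items(func, items, initial):
    """Fold items left to right: acc = func(acc, item); func may be async."""
    acc = initial
    for item in items:
        acc = func(acc, item)
        if inspect.isawaitable(acc):
            acc = await acc
    return acc


async def add_async(acc: int, item: int) -> int:
    return acc + item


total = asyncio.run(reduce_items(add_async, [1, 2, 3, 4], 0))
char_count = asyncio.run(reduce_items(lambda acc, s: acc + len(s), ["ab", "c"], 0))
empty_total = asyncio.run(reduce_items(add_async, [], 42))
print(total, char_count, empty_total)  # 10 3 42
```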

async first()

Return the first item, or None if the stream is empty.

Return type:

T | None

async take(n)

Collect at most n items.

Parameters:

n (int) – Maximum number of items to collect.

Return type:

list[T]
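first() and take(n) only need part of the stream. The plain-asyncio sketch below (`take_items`, `first_item`, and `counting` are hypothetical) stops pulling from the source once it has enough items, and records which items the source actually produced. Note that a None result from first is ambiguous if the stream itself can contain None.

```python
import asyncio
from typing import AsyncIterator, Optional, TypeVar

T = TypeVar("T")


async def take_items(source: AsyncIterator[T], n: int) -> list[T]:
    """Collect at most n items, then stop pulling from the source."""
    out: list[T] = []
    if n <= 0:
        return out
    async for item in source:
        out.append(item)
        if len(out) >= n:
            break
    return out


async def first_item(source: AsyncIterator[T]) -> Optional[T]:
    """Return the first item, or None for an empty source."""
    items = await take_items(source, 1)
    return items[0] if items else None


pulled: list[int] = []  # every item the sources actually produced

async def counting(limit: int) -> AsyncIterator[int]:
    for i in range(limit):
        pulled.append(i)
        yield i


async def main():
    return (
        await take_items(counting(100), 3),  # stops after 3 of 100
        await take_items(counting(2), 5),    # source shorter than n
        await first_item(counting(0)),       # empty source
    )


taken, short, first_of_empty = asyncio.run(main())
print(taken, short, first_of_empty)  # [0, 1, 2] [0, 1] None
```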

open()

Context-manager API for manual iteration with structured concurrency.

Usage:

async with stream.open() as items:
    async for item in items:
        process(item)
Return type:

AsyncIterator[MemoryObjectReceiveStream[T]]
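The structured-concurrency shape of open() can be approximated in plain asyncio. This sketch is an analogy, not the library's code: the real method yields an anyio MemoryObjectReceiveStream, while here `open_items`, `_Failure`, and `numbers` are hypothetical and an asyncio.Queue stands in for the memory object stream. A producer task runs while the caller iterates, source errors are forwarded to the consumer, and the producer is cancelled whenever the block exits, including on an early break.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterable, AsyncIterator

_DONE = object()  # sentinel: source exhausted


class _Failure:
    """Carries an exception from the producer task to the consumer."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


@asynccontextmanager
async def open_items(source: AsyncIterable[Any], buffer_size: int = 1) -> AsyncIterator[AsyncIterator[Any]]:
    """Run a producer task while the caller iterates; cancel it when the block exits."""
    # buffer_size must be >= 1 here: asyncio.Queue(maxsize=0) would be unbounded.
    queue: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)

    async def produce() -> None:
        try:
            async for item in source:
                await queue.put(item)
        except Exception as exc:  # forward source errors instead of hanging the consumer
            await queue.put(_Failure(exc))
            return
        await queue.put(_DONE)

    async def receive() -> AsyncIterator[Any]:
        while True:
            item = await queue.get()
            if item is _DONE:
                return
            if isinstance(item, _Failure):
                raise item.exc
            yield item

    producer = asyncio.create_task(produce())
    try:
        yield receive()
    finally:
        # Whether the caller finished, broke out early, or raised, stop the producer.
        producer.cancel()
        try:
            await producer
        except asyncio.CancelledError:
            pass


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def main() -> list[int]:
    seen: list[int] = []
    async with open_items(numbers(1000)) as items:
        async for item in items:
            seen.append(item)
            if item == 2:
                break  # early exit: the producer is cancelled when the block ends
    return seen


seen_items = asyncio.run(main())
print(seen_items)  # [0, 1, 2]
```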

The Stream class is the core pipeline builder. It holds a lazy recipe of Process stages that execute only when a terminal operation is called.

Constructors

Stream.from_iterable(items)

Create a stream from a sync or async iterable.

from anyiostream.stream import Stream

# Sync iterable
stream = Stream.from_iterable([1, 2, 3])

# Async generator
async def gen():
    for i in range(10):
        yield i

stream = Stream.from_iterable(gen())

Parameters:
  • items (Iterable[T] | AsyncIterable[T]) – Items to stream.

Stream.from_callable(factory)

Lazy construction: factory is called at execution time.

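from anyiostream.stream import Stream

stream = Stream.from_callable(lambda: range(10))
stream = Stream.from_callable(async_generator_function)  # placeholder: a zero-argument async generator function, passed uncalled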

Parameters:
  • factory (Callable[[], Iterable[T] | AsyncIterable[T]]) – Called when the pipeline materializes.

Transform Stages

All stages return a new Stream. Stages are lazy: nothing executes until a terminal operation runs.

.map(func, *, workers=1, buffer_size=0, name=None)

1:1 transform. Accepts sync or async functions.

.flat_map(func, *, workers=1, buffer_size=0, name=None)

1:N transform. The function returns an iterable or async iterable.

.filter(predicate, *, workers=1, buffer_size=0, name=None)

Keep items for which the predicate is truthy.

.foreach(func, *, workers=1, buffer_size=0, name=None)

Side effect: calls func(item) and passes the item through unchanged.

Common Parameters:
  • func / predicate (Callable, required) – Transform or predicate function.

  • workers (int, default 1) – Number of concurrent workers.

  • buffer_size (float, default 0) – Channel buffer size (0 = rendezvous, where each send waits for a receiver; math.inf = unbounded).

  • name (str | None, default None) – Debug label.

  • max_buffer_bytes, size_func – Also accepted by these stages; see the API reference above.
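Backpressure from a bounded buffer can be observed with asyncio.Queue, which serves here as an analogy for buffer_size, not as the library's channel. One mismatch matters: asyncio.Queue(maxsize=0) means unbounded (roughly math.inf above), not rendezvous, so this sketch only models buffer sizes of 1 or more. `max_buffered` is a hypothetical helper that reports the most items ever waiting between a fast producer and a slow consumer.

```python
import asyncio


async def max_buffered(buffer_size: int, n_items: int = 10) -> int:
    """Return the largest number of items ever waiting in a bounded buffer."""
    # asyncio.Queue(maxsize=0) means UNBOUNDED, which is not the same as the
    # rendezvous meaning of buffer_size=0, so only sizes >= 1 are modeled.
    if buffer_size < 1:
        raise ValueError("this sketch only models buffer_size >= 1")
    queue: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)
    high_water = 0

    async def producer() -> None:
        nonlocal high_water
        for i in range(n_items):
            await queue.put(i)  # suspends while the buffer is full (backpressure)
            high_water = max(high_water, queue.qsize())
        await queue.put(None)  # end-of-stream marker

    async def consumer() -> None:
        while (await queue.get()) is not None:
            await asyncio.sleep(0.001)  # deliberately slow consumer

    await asyncio.gather(producer(), consumer())
    return high_water


print(asyncio.run(max_buffered(2)))  # 2
```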

Result-Aware Stages

See Result Types for Ok/Err details.

.try_map(fn, *, err=None, workers=1, buffer_size=0, name=None)

Map with Ok/Err wrapping. Exceptions become Err(PipelineError(...)).

.try_flat_map(fn, *, err=None, workers=1, buffer_size=0, name=None)

Flat map with Ok/Err wrapping.

.try_filter(pred, *, workers=1, buffer_size=0, name=None)

Filter Ok values. Err always passes through.

.try_foreach(fn, *, err=None, workers=1, buffer_size=0, name=None)

Side-effect on Ok values.

.recover(fn, *, workers=1, buffer_size=0, name=None)

Convert Err → value via fn(error), unwrap Ok. Exits Result mode.

.ok_only()

Keep and unwrap Ok values, drop Err.

.errors_only()

Keep Err values (unwrapped to error), drop Ok.
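The Ok/Err flow of these stages can be sketched synchronously over plain lists. Everything here is hypothetical: `Ok`, `Err`, and the helper functions are stand-ins defined for illustration (the library's own types are described in Result Types), and this sketch wraps the raw exception in Err rather than a PipelineError.

```python
from dataclasses import dataclass
from typing import Any, Callable, Generic, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err:
    error: Exception


Result = Union[Ok, Err]


def try_map(func: Callable[[Any], Any], items: list[Any]) -> list[Result]:
    """Wrap successes in Ok and raised exceptions in Err."""
    out: list[Result] = []
    for item in items:
        try:
            out.append(Ok(func(item)))
        except Exception as exc:
            out.append(Err(exc))
    return out


def ok_only(results: list[Result]) -> list[Any]:
    """Keep and unwrap Ok values, drop Err."""
    return [r.value for r in results if isinstance(r, Ok)]


def errors_only(results: list[Result]) -> list[Exception]:
    """Keep Err values unwrapped to the error, drop Ok."""
    return [r.error for r in results if isinstance(r, Err)]


def recover(func: Callable[[Exception], Any], results: list[Result]) -> list[Any]:
    """Unwrap Ok values and replace each Err with func(error)."""
    return [r.value if isinstance(r, Ok) else func(r.error) for r in results]


results = try_map(lambda s: 10 // int(s), ["5", "0", "x"])
print(ok_only(results))                                  # [2]
print([type(e).__name__ for e in errors_only(results)])  # ['ZeroDivisionError', 'ValueError']
print(recover(lambda e: -1, results))                    # [2, -1, -1]
```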

Terminal Operations

Terminals materialize the pipeline and return a result.

await stream.collect() → list[T]

Collect all items into a list.

await stream.count() → int

Consume all items and return the count.

await stream.reduce(func, initial) → U

Fold items: func(accumulator, item) → accumulator.

await stream.first() → T | None

Return the first item, or None if empty.

await stream.take(n) → list[T]

Collect at most n items.

async with stream.open() as recv

Context manager for manual iteration.

await stream.collect_split() → tuple[list, list]

Partition into (ok_values, errors).
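The partition performed by collect_split can be sketched as a single pass that sorts Result items into two lists while preserving order. `Ok`, `Err`, and `collect_split` below are hypothetical stand-ins for illustration, not the library's types or method.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Exception


def collect_split(results: list) -> tuple[list, list]:
    """Partition Result items into (ok_values, errors), preserving order."""
    ok_values: list = []
    errors: list = []
    for r in results:
        if isinstance(r, Ok):
            ok_values.append(r.value)
        else:
            errors.append(r.error)
    return ok_values, errors


oks, errs = collect_split([Ok(1), Err(ValueError("bad")), Ok(3)])
print(oks)   # [1, 3]
print(errs)  # [ValueError('bad')]
```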

Pipe Operator

stream | pipe.map(fn) | pipe.collect()

The __or__ method supports pipe composition. See Pipe Operators.
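The mechanism behind `stream | pipe.map(fn) | pipe.collect()` can be sketched with `__or__`: each pipe stage is a callable that receives the stream on its left. This is a hypothetical model (`MiniStream`, `map_`, and `collect_` are not the library's `pipe` module); in particular, here the terminal stage returns a coroutine that must be awaited or run, which is an assumption about how the real `pipe.collect()` behaves.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class MiniStream:
    """Hypothetical lazy stream supporting `stream | stage` composition."""
    items: tuple
    funcs: tuple = ()

    def __or__(self, stage: Callable[["MiniStream"], Any]) -> Any:
        # `a | b | c` evaluates left to right: ((a | b) | c).
        return stage(self)


def map_(func: Callable[[Any], Any]) -> Callable[[MiniStream], MiniStream]:
    """Stage factory: returns a new MiniStream with func appended (still lazy)."""
    return lambda s: MiniStream(s.items, s.funcs + (func,))


def collect_() -> Callable[[MiniStream], Any]:
    """Terminal stage factory: returns a coroutine that runs the recipe."""
    async def run(s: MiniStream) -> list:
        out = []
        for item in s.items:
            for f in s.funcs:
                item = f(item)
            out.append(item)
        return out
    return run


result = asyncio.run(MiniStream((1, 2, 3)) | map_(lambda x: x + 1) | map_(str) | collect_())
print(result)  # ['2', '3', '4']
```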