Pipe Operators

class anyiostream.operators._Pipe[source]

Namespace for pipe operators.

All methods return a _PipeOp (a callable Stream -> Stream) so they can be used with the | operator.

static map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)[source]

| pipe.map(fn) — 1:1 transform.

Parameters:
  • func (Callable[[T], U | Awaitable[U]]) – Transform function.

  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer (item count).

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.

  • name (str | None) – Debug label.

Returns:

A pipe operator that can be used with |.

Return type:

Callable[[Stream[T]], Stream[U]]

static flat_map(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)[source]

| pipe.flat_map(fn) — 1:N transform.

Parameters:
  • func (Callable[[T], AsyncIterable[U] | Iterable[U] | Awaitable[Any]]) – Function returning iterable or async iterable.

  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.

  • name (str | None) – Debug label.

Returns:

A pipe operator.

Return type:

Callable[[Stream[T]], Stream[U]]

static filter(predicate, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)[source]

| pipe.filter(pred) — keep items where predicate is truthy.

Parameters:
  • predicate (Callable[[T], bool | Awaitable[bool]]) – Filter function.

  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.

  • name (str | None) – Debug label.

Returns:

A pipe operator.

Return type:

Callable[[Stream[T]], Stream[T]]

static foreach(func, *, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)[source]

| pipe.foreach(fn) — side effect, passes items through unchanged.

Parameters:
  • func (Callable[[T], Any]) – Side-effect function.

  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.

  • name (str | None) – Debug label.

Returns:

A pipe operator.

Return type:

Callable[[Stream[T]], Stream[T]]

static flatten(*, workers=1, buffer_size=0, max_buffer_bytes=10000000, size_func=None, name=None)[source]

| pipe.flatten() — flatten iterables in the stream.

Parameters:
  • workers (int) – Concurrent workers.

  • buffer_size (float) – Backpressure buffer.

  • max_buffer_bytes (int) – Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes).

  • size_func (Callable[[Any], int] | None) – Optional function to calculate item size in bytes.

  • name (str | None) – Debug label.

Returns:

A pipe operator.

Return type:

Callable[[Stream[Any]], Stream[Any]]

static collect(*, batch=False)[source]

| pipe.collect() — terminal: collect all items into a list.

Parameters:

batch (bool) – If True, drain any AsyncIterable/Iterable items into sub-lists.

Returns:

Sentinel that triggers Stream.__or__ to call .collect().

Return type:

Any

static count()[source]

| pipe.count() — terminal: consume all items, return count.

Returns:

Sentinel that triggers Stream.__or__ to call .count().

Return type:

Any

static try_map(func, *, err=None, workers=1, buffer_size=0, name=None)[source]

| pipe.try_map(fn, err=handler) — map with Ok/Err wrapping.

Parameters:
Return type:

Callable[[Stream[Any]], Stream[Any]]

static try_flat_map(func, *, err=None, workers=1, buffer_size=0, name=None)[source]

| pipe.try_flat_map(fn, err=handler) — flat_map with Ok/Err wrapping.

Parameters:
Return type:

Callable[[Stream[Any]], Stream[Any]]

static try_filter(predicate, *, workers=1, buffer_size=0, name=None)[source]

| pipe.try_filter(pred) — filter Ok values, Err passes through.

Parameters:
Return type:

Callable[[Stream[Any]], Stream[Any]]

static try_foreach(func, *, err=None, workers=1, buffer_size=0, name=None)[source]

| pipe.try_foreach(fn, err=handler) — side effect on Ok values.

Parameters:
Return type:

Callable[[Stream[Any]], Stream[Any]]

static recover(func, *, workers=1, buffer_size=0, name=None)[source]

| pipe.recover(fn) — convert Err to value, unwrap Ok.

Parameters:
Return type:

Callable[[Stream[Any]], Stream[Any]]

static ok_only()[source]

| pipe.ok_only() — keep and unwrap Ok, drop Err.

Return type:

Callable[[Stream[Any]], Stream[Any]]

static errors_only()[source]

| pipe.errors_only() — keep and unwrap Err, drop Ok.

Return type:

Callable[[Stream[Any]], Stream[Any]]

static collect_split()[source]

| pipe.collect_split() — terminal: partition into (oks, errs).

Return type:

Any

The pipe singleton provides static methods that return callables for use with the | operator.

from anyiostream import pipe

result = await (
    Stream.from_iterable(items)
    | pipe.map(fn, workers=4)
    | pipe.filter(pred)
    | pipe.collect()
)

Transform Operators

pipe.map(fn, *, workers=1, buffer_size=0, name=None)

Returns a callable that applies .map() to the stream.

pipe.flat_map(fn, *, workers=1, buffer_size=0, name=None)

Returns a callable that applies .flat_map() to the stream.

pipe.filter(pred, *, workers=1, buffer_size=0, name=None)

Returns a callable that applies .filter() to the stream.

pipe.foreach(fn, *, workers=1, buffer_size=0, name=None)

Returns a callable that applies .foreach() to the stream.

Terminal Operators

pipe.collect()

Returns a sentinel that triggers .collect() via __or__.

pipe.count()

Returns a sentinel that triggers .count() via __or__.

pipe.collect_split()

Returns a sentinel that triggers .collect_split() via __or__.

Result-Aware Operators

pipe.try_map(fn, *, err=None, workers=1, buffer_size=0, name=None)

Map with Ok/Err wrapping.

pipe.try_flat_map(fn, *, err=None, workers=1, buffer_size=0, name=None)

Flat map with Ok/Err wrapping.

pipe.try_filter(pred, *, workers=1, buffer_size=0, name=None)

Filter Ok values; Err passes through.

pipe.try_foreach(fn, *, err=None, workers=1, buffer_size=0, name=None)

Side-effect on Ok values.

pipe.recover(fn, *, workers=1, buffer_size=0, name=None)

Convert Err → value, unwrap Ok.

pipe.ok_only()

Keep and unwrap Ok, drop Err.

pipe.errors_only()

Keep Err (unwrapped), drop Ok.