Source code for anyiostream.operators

"""
Pipe operators — aiostream-style ``|`` composition.

Usage::

	from anyiostream import Stream, pipe

	result = await (
		Stream.from_iterable(range(10))
		| pipe.map(lambda x: x * 2, workers=3)
		| pipe.filter(lambda x: x > 5)
		| pipe.collect()
	)

Each ``pipe.*`` call returns a callable that accepts a ``Stream``
and returns a new ``Stream`` (or a coroutine for terminals).
"""

from __future__ import annotations

from collections.abc import AsyncIterable, Awaitable, Callable, Iterable
from typing import Any, TypeVar

from anyiostream.stream import (
	_COLLECT_BATCH_SENTINEL,
	_COLLECT_SENTINEL,
	_COLLECT_SPLIT_SENTINEL,
	_COUNT_SENTINEL,
	Stream,
)

T = TypeVar("T")
U = TypeVar("U")


# ---------------------------------------------------------------------------
# Pipe namespace
# ---------------------------------------------------------------------------


[docs] class _Pipe: """ Namespace for pipe operators. All methods return a ``_PipeOp`` (a callable ``Stream -> Stream``) so they can be used with the ``|`` operator. """ __slots__ = ()
[docs] @staticmethod def map( func: Callable[[T], U | Awaitable[U]], *, workers: int = 1, buffer_size: float = 0, max_buffer_bytes: int = 10_000_000, size_func: Callable[[Any], int] | None = None, name: str | None = None, ) -> Callable[[Stream[T]], Stream[U]]: """ ``| pipe.map(fn)`` — 1:1 transform. Args: func: Transform function. workers: Concurrent workers. buffer_size: Backpressure buffer (item count). max_buffer_bytes: Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). size_func: Optional function to calculate item size in bytes. name: Debug label. Returns: A pipe operator that can be used with ``|``. """ def _apply(stream: Stream[T]) -> Stream[U]: return stream.map( func, workers=workers, buffer_size=buffer_size, max_buffer_bytes=max_buffer_bytes, size_func=size_func, name=name, ) return _apply
[docs] @staticmethod def flat_map( func: Callable[[T], AsyncIterable[U] | Iterable[U] | Awaitable[Any]], *, workers: int = 1, buffer_size: float = 0, max_buffer_bytes: int = 10_000_000, size_func: Callable[[Any], int] | None = None, name: str | None = None, ) -> Callable[[Stream[T]], Stream[U]]: """ ``| pipe.flat_map(fn)`` — 1:N transform. Args: func: Function returning iterable or async iterable. workers: Concurrent workers. buffer_size: Backpressure buffer. max_buffer_bytes: Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). size_func: Optional function to calculate item size in bytes. name: Debug label. Returns: A pipe operator. """ def _apply(stream: Stream[T]) -> Stream[U]: return stream.flat_map( func, workers=workers, buffer_size=buffer_size, max_buffer_bytes=max_buffer_bytes, size_func=size_func, name=name, ) return _apply
[docs] @staticmethod def filter( predicate: Callable[[T], bool | Awaitable[bool]], *, workers: int = 1, buffer_size: float = 0, max_buffer_bytes: int = 10_000_000, size_func: Callable[[Any], int] | None = None, name: str | None = None, ) -> Callable[[Stream[T]], Stream[T]]: """ ``| pipe.filter(pred)`` — keep items where predicate is truthy. Args: predicate: Filter function. workers: Concurrent workers. buffer_size: Backpressure buffer. max_buffer_bytes: Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). size_func: Optional function to calculate item size in bytes. name: Debug label. Returns: A pipe operator. """ def _apply(stream: Stream[T]) -> Stream[T]: return stream.filter( predicate, workers=workers, buffer_size=buffer_size, max_buffer_bytes=max_buffer_bytes, size_func=size_func, name=name, ) return _apply
[docs] @staticmethod def foreach( func: Callable[[T], Any], *, workers: int = 1, buffer_size: float = 0, max_buffer_bytes: int = 10_000_000, size_func: Callable[[Any], int] | None = None, name: str | None = None, ) -> Callable[[Stream[T]], Stream[T]]: """ ``| pipe.foreach(fn)`` — side effect, passes items through unchanged. Args: func: Side-effect function. workers: Concurrent workers. buffer_size: Backpressure buffer. max_buffer_bytes: Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). size_func: Optional function to calculate item size in bytes. name: Debug label. Returns: A pipe operator. """ def _apply(stream: Stream[T]) -> Stream[T]: return stream.foreach( func, workers=workers, buffer_size=buffer_size, max_buffer_bytes=max_buffer_bytes, size_func=size_func, name=name, ) return _apply
[docs] @staticmethod def flatten( *, workers: int = 1, buffer_size: float = 0, max_buffer_bytes: int = 10_000_000, size_func: Callable[[Any], int] | None = None, name: str | None = None, ) -> Callable[[Stream[Any]], Stream[Any]]: """ ``| pipe.flatten()`` — flatten iterables in the stream. Args: workers: Concurrent workers. buffer_size: Backpressure buffer. max_buffer_bytes: Memory-based buffer limit in bytes. Defaults to 10MB (10_000_000 bytes). size_func: Optional function to calculate item size in bytes. name: Debug label. Returns: A pipe operator. """ def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.flatten( workers=workers, buffer_size=buffer_size, max_buffer_bytes=max_buffer_bytes, size_func=size_func, name=name, ) return _apply
[docs] @staticmethod def collect(*, batch: bool = False) -> Any: """ ``| pipe.collect()`` — terminal: collect all items into a list. Args: batch: If ``True``, drain any ``AsyncIterable``/``Iterable`` items into sub-lists. Returns: Sentinel that triggers ``Stream.__or__`` to call ``.collect()``. """ if batch: return _COLLECT_BATCH_SENTINEL return _COLLECT_SENTINEL
[docs] @staticmethod def count() -> Any: """ ``| pipe.count()`` — terminal: consume all items, return count. Returns: Sentinel that triggers ``Stream.__or__`` to call ``.count()``. """ return _COUNT_SENTINEL
# ------------------------------------------------------------------ # Result-aware operators # ------------------------------------------------------------------
[docs] @staticmethod def try_map( func: Callable[..., Any], *, err: Callable[..., Any] | None = None, workers: int = 1, buffer_size: float = 0, name: str | None = None, ) -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.try_map(fn, err=handler)`` — map with Ok/Err wrapping.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.try_map( func, err=err, workers=workers, buffer_size=buffer_size, name=name, ) return _apply
[docs] @staticmethod def try_flat_map( func: Callable[..., Any], *, err: Callable[..., Any] | None = None, workers: int = 1, buffer_size: float = 0, name: str | None = None, ) -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.try_flat_map(fn, err=handler)`` — flat_map with Ok/Err wrapping.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.try_flat_map( func, err=err, workers=workers, buffer_size=buffer_size, name=name, ) return _apply
[docs] @staticmethod def try_filter( predicate: Callable[..., Any], *, workers: int = 1, buffer_size: float = 0, name: str | None = None, ) -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.try_filter(pred)`` — filter Ok values, Err passes through.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.try_filter( predicate, workers=workers, buffer_size=buffer_size, name=name, ) return _apply
[docs] @staticmethod def try_foreach( func: Callable[..., Any], *, err: Callable[..., Any] | None = None, workers: int = 1, buffer_size: float = 0, name: str | None = None, ) -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.try_foreach(fn, err=handler)`` — side effect on Ok values.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.try_foreach( func, err=err, workers=workers, buffer_size=buffer_size, name=name, ) return _apply
[docs] @staticmethod def recover( func: Callable[..., Any], *, workers: int = 1, buffer_size: float = 0, name: str | None = None, ) -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.recover(fn)`` — convert Err to value, unwrap Ok.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.recover( func, workers=workers, buffer_size=buffer_size, name=name, ) return _apply
[docs] @staticmethod def ok_only() -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.ok_only()`` — keep and unwrap Ok, drop Err.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.ok_only() return _apply
[docs] @staticmethod def errors_only() -> Callable[[Stream[Any]], Stream[Any]]: """``| pipe.errors_only()`` — keep and unwrap Err, drop Ok.""" def _apply(stream: Stream[Any]) -> Stream[Any]: return stream.errors_only() return _apply
[docs] @staticmethod def collect_split() -> Any: """``| pipe.collect_split()`` — terminal: partition into (oks, errs).""" return _COLLECT_SPLIT_SENTINEL
# Module-level singleton pipe = _Pipe()