anyiostream

Composable async pipelines with structured concurrency

CI PyPI Python License


anyiostream provides lazy, composable async pipelines with true inter-stage concurrency, backpressure, and Rust-inspired error handling — all built on anyio for seamless asyncio + trio support.

Quick Example

from anyiostream import Stream

result = await (
    Stream.from_iterable(range(100))
    .map(lambda x: x * 2, workers=4)
    .filter(lambda x: x > 50)
    .collect()
)
from anyiostream import Stream, pipe

result = await (
    Stream.from_iterable(urls)
    | pipe.map(fetch, workers=10)
    | pipe.flat_map(extract_links, workers=5)
    | pipe.filter(is_valid)
    | pipe.map(normalize)
    | pipe.collect()
)
from anyiostream import Stream, pipe

oks, errs = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5)
    | pipe.try_map(parse)
    | pipe.collect_split()
)

Key Features

  • True inter-stage concurrency — each stage runs in its own task via TaskGroup

  • Fan-out workersworkers=N for load-balanced parallelism per stage

  • Backpressure — bounded channels between stages prevent resource exhaustion

  • Rust-inspired Ok/Err — railway-oriented error handling with try_map, recover, collect_split

  • Backend-portable — asyncio + trio via anyio

  • Structured concurrency — automatic cleanup, no leaked tasks

Installation

pip install anyiostream

Or with uv:

uv add anyiostream

Next Steps

More