Pipeline Basics

Stream Constructors

Stream.from_iterable(items)

Create a stream from any sync or async iterable:

# Sync iterable
stream = Stream.from_iterable([1, 2, 3])
stream = Stream.from_iterable(range(100))

# Async generator
async def generate():
    for i in range(10):
        yield i

stream = Stream.from_iterable(generate())
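Accepting both kinds of input usually means converting everything to an async iterator before any stage runs. The sketch below shows one way to do that with plain asyncio. `to_async_iter` is a hypothetical helper, not part of the Stream API, and the library's real implementation may differ.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Iterable
from typing import TypeVar, Union

T = TypeVar("T")


async def to_async_iter(items: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterator[T]:
    """Hypothetical helper: yield from a sync or async iterable uniformly."""
    if isinstance(items, AsyncIterable):
        # Async generators and other async iterables are consumed with `async for`.
        async for item in items:
            yield item
    else:
        # Plain iterables (lists, ranges, sync generators) are wrapped.
        for item in items:
            yield item


async def generate():
    for i in range(3):
        yield i


async def main():
    sync_items = [x async for x in to_async_iter([1, 2, 3])]
    async_items = [x async for x in to_async_iter(generate())]
    return sync_items, async_items


print(asyncio.run(main()))  # ([1, 2, 3], [0, 1, 2])
```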

Stream.from_callable(factory)

Lazy construction — the factory is called each time the pipeline executes:

stream = Stream.from_callable(lambda: range(10))

# Useful for async generators: pass the function, not the result,
# so each execution starts a fresh generator
stream = Stream.from_callable(generate)  # not generate()
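The difference between passing `generate` and `generate()` comes from Python itself. A generator object can be consumed only once, while a factory can produce a fresh one for every run. The following is a plain-Python illustration with no Stream API involved. `drain` is a hypothetical stand-in for one pipeline execution.

```python
import asyncio


async def generate():
    for i in range(3):
        yield i


async def drain(source):
    """Hypothetical stand-in for one pipeline execution: consume everything."""
    return [x async for x in source]


async def run_twice_with_object():
    gen = generate()  # one generator object, shared by both runs
    return await drain(gen), await drain(gen)


async def run_twice_with_factory():
    # The factory is called once per run, so each run gets a fresh generator.
    return await drain(generate()), await drain(generate())


print(asyncio.run(run_twice_with_object()))   # ([0, 1, 2], [])
print(asyncio.run(run_twice_with_factory()))  # ([0, 1, 2], [0, 1, 2])
```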

Transform Stages

All stages are lazy — they return a new Stream without executing anything.
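To show what "lazy" means in practice, here is a toy pipeline class. `LazyStream` is hypothetical and much simpler than Stream: it has sync functions only and no workers. It illustrates only the idea that each stage returns a new object and nothing runs until a terminal is called.

```python
import asyncio
from typing import Any, Callable, Iterable


class LazyStream:
    """Hypothetical toy pipeline: stages are recorded, not executed."""

    def __init__(self, source: Iterable[Any], stages: tuple = ()):
        self._source = source
        self._stages = stages

    def map(self, fn: Callable[[Any], Any]) -> "LazyStream":
        # Return a NEW stream with one more stage; self is left unchanged.
        return LazyStream(self._source, self._stages + (fn,))

    async def collect(self) -> list:
        # Only the terminal iterates the source and applies the stages.
        out = []
        for item in self._source:
            for fn in self._stages:
                item = fn(item)
            out.append(item)
        return out


calls = []
base = LazyStream([1, 2, 3])
doubled = base.map(lambda x: calls.append(x) or x * 2)

print(calls)                           # [] (nothing has run yet)
print(asyncio.run(doubled.collect()))  # [2, 4, 6]
print(asyncio.run(base.collect()))     # [1, 2, 3] (base is unaffected)
```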

.map(fn, *, workers=1, buffer_size=0, name=None)

1:1 transform. Works with sync or async functions:

# Sync
stream.map(lambda x: x * 2)

# Async
stream.map(async_fetch, workers=5)
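Setting `workers` lets several calls to an async function run at once. The following is a minimal sketch of bounded concurrency using `asyncio.Semaphore`, assuming an I/O-bound function like `async_fetch`. `concurrent_map` and `fake_fetch` are hypothetical. This version returns results in input order because `asyncio.gather` does. Whether `.map(workers=N)` itself preserves order is not stated here, so treat that as an assumption.

```python
import asyncio


async def concurrent_map(fn, items, workers=1):
    """Hypothetical sketch: apply async `fn` to `items`, at most `workers` at once."""
    sem = asyncio.Semaphore(workers)

    async def run_one(item):
        async with sem:  # waits while `workers` calls are already in flight
            return await fn(item)

    # gather keeps results in input order, regardless of completion order.
    return await asyncio.gather(*(run_one(x) for x in items))


active = 0  # calls currently running
peak = 0    # highest number of simultaneous calls observed


async def fake_fetch(x):
    """Stand-in for an I/O-bound function such as async_fetch."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return x * 10


results = asyncio.run(concurrent_map(fake_fetch, range(12), workers=5))
print(results[:3])  # [0, 10, 20]
print(peak)         # 5
```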

.flat_map(fn, *, workers=1, buffer_size=0, name=None)

1:N transform. The function returns an iterable (sync or async):

# Each URL → multiple links
stream.flat_map(extract_links, workers=5)

# Sync expansion
stream.flat_map(lambda x: [x, x + 1])
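The following is a sequential sketch of the 1:N semantics. It assumes the function may be sync, async (returning an iterable), or an async generator. `flat_map` and `fake_extract_links` are hypothetical stand-ins, not the library's implementation.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable


async def flat_map(fn, source):
    """Hypothetical sequential flat_map: each item expands to 0..N outputs."""
    for item in source:
        result = fn(item)
        if inspect.isawaitable(result):
            # An async function that returns a list or other iterable.
            result = await result
        if isinstance(result, AsyncIterable):
            # An async generator function was passed.
            async for out in result:
                yield out
        else:
            for out in result:
                yield out


async def fake_extract_links(url):
    """Stand-in for an async extract_links: one page yields two links."""
    return [f"{url}/a", f"{url}/b"]


async def main():
    expanded = [x async for x in flat_map(lambda x: [x, x + 1], [1, 10])]
    links = [x async for x in flat_map(fake_extract_links, ["https://example.com"])]
    return expanded, links


print(asyncio.run(main()))
# ([1, 2, 10, 11], ['https://example.com/a', 'https://example.com/b'])
```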

.filter(pred, *, workers=1, buffer_size=0, name=None)

Keep items where the predicate is truthy:

stream.filter(lambda x: x > 0)
stream.filter(is_valid, workers=4)
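Because the predicate only needs to be truthy, it can return any value, not just a bool. The following sequential sketch also accepts an async predicate. `afilter` and this version of `is_valid` are hypothetical.

```python
import asyncio
import inspect


async def afilter(pred, source):
    """Hypothetical sequential filter supporting sync or async predicates."""
    for item in source:
        keep = pred(item)
        if inspect.isawaitable(keep):
            keep = await keep
        if keep:  # any truthy value keeps the item, not just True
            yield item


async def is_valid(s):
    """Stand-in async predicate: returns the stripped string (truthy if non-blank)."""
    await asyncio.sleep(0)
    return s.strip()


async def main():
    positives = [x async for x in afilter(lambda x: x > 0, [-1, 0, 2, 3])]
    valid = [x async for x in afilter(is_valid, ["a", "  ", "b"])]
    return positives, valid


print(asyncio.run(main()))  # ([2, 3], ['a', 'b'])
```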

.foreach(fn, *, workers=1, buffer_size=0, name=None)

Side-effect only — items pass through unchanged:

stream.foreach(print)
stream.foreach(log_item, workers=2)
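The following sketch shows pass-through behavior: the function is called for its side effect, and its return value is discarded. `foreach` here is a hypothetical standalone async generator, not the Stream method.

```python
import asyncio
import inspect


async def foreach(fn, source):
    """Hypothetical pass-through stage: call fn for its side effect, re-yield the item."""
    async for item in source:
        result = fn(item)
        if inspect.isawaitable(result):
            await result  # wait for an async side effect to finish
        yield item        # the original item, not fn's return value


async def numbers():
    for i in range(3):
        yield i


seen = []


async def main():
    return [x async for x in foreach(lambda x: seen.append(x * 100), numbers())]


print(asyncio.run(main()))  # [0, 1, 2]
print(seen)                 # [0, 100, 200]
```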

Terminal Operations

Terminal operations execute the pipeline. All of them except .open() are awaited and return a result; .open() returns an async context manager instead.

.collect() → list[T]

Collect all items into a list:

items = await stream.collect()

.count() → int

Consume all items, return the count:

n = await stream.count()

.reduce(fn, initial) → U

Fold all items into a single value:

total = await stream.reduce(lambda acc, x: acc + x, 0)
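`.reduce` is a left fold: the accumulator starts at `initial` and is replaced by `fn(acc, item)` for each item in order. The following is a hypothetical async counterpart of `functools.reduce` over an async iterator. `areduce` and `numbers` are illustrative names. In this sketch an empty stream simply returns `initial`.

```python
import asyncio


async def areduce(fn, source, initial):
    """Hypothetical async fold: acc = fn(acc, item) for each item, left to right."""
    acc = initial
    async for item in source:
        acc = fn(acc, item)
    return acc


async def numbers(n):
    for i in range(1, n + 1):
        yield i


print(asyncio.run(areduce(lambda acc, x: acc + x, numbers(4), 0)))     # 10
print(asyncio.run(areduce(lambda acc, x: acc + [x], numbers(0), [])))  # []
```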

.first() → T | None

Return the first item, or None if empty:

item = await stream.first()

.take(n) → list[T]

Collect at most n items:

top5 = await stream.take(5)
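`.first()` and `.take(n)` can stop early, which matters for large or infinite sources. The following sketch shows one way to do that, assuming the source is an async generator: stop pulling after `n` items, then close the source with `aclose()`. `take`, `first`, `numbers`, and `empty` are hypothetical. The `produced` list shows that the source never runs past the fifth item.

```python
import asyncio

produced = []  # every value the source actually generated


async def numbers():
    """Infinite async source."""
    i = 0
    while True:
        produced.append(i)
        yield i
        i += 1


async def empty():
    return
    yield  # unreachable; makes this function an async generator


async def take(source, n):
    """Hypothetical take: pull at most n items, then close the source."""
    out = []
    try:
        if n > 0:
            async for item in source:
                out.append(item)
                if len(out) == n:
                    break
    finally:
        await source.aclose()  # lets the source run its cleanup code now
    return out


async def first(source):
    """Hypothetical first: the first item, or None if the source is empty."""
    items = await take(source, 1)
    return items[0] if items else None


print(asyncio.run(take(numbers(), 5)))  # [0, 1, 2, 3, 4]
print(produced)                         # [0, 1, 2, 3, 4]
print(asyncio.run(first(empty())))      # None
```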

.open() → AsyncContextManager

Manual iteration with structured concurrency: the pipeline runs only while the async with block is open.

async with stream.open() as items:
    async for item in items:
        await process(item)
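Structured concurrency here means the background work is tied to the `async with` block. The following is a rough sketch of how such a context manager could be built from a producer task and a bounded `asyncio.Queue`. `open_stream` is hypothetical, handles a sync source only, and omits producer error propagation for brevity. In this sketch, leaving the block (even via `break`) cancels the producer, so no task outlives it.

```python
import asyncio
from contextlib import asynccontextmanager

_DONE = object()  # sentinel marking the end of the stream


@asynccontextmanager
async def open_stream(source, buffer_size=1):
    """Hypothetical .open()-style helper: a producer task feeds a queue and is
    cancelled when the `async with` block exits. Producer errors are not propagated."""
    # asyncio treats maxsize=0 as unbounded, so keep at least 1 slot here.
    queue = asyncio.Queue(maxsize=max(1, buffer_size))

    async def produce():
        for item in source:
            await queue.put(item)
        await queue.put(_DONE)

    async def items():
        while True:
            item = await queue.get()
            if item is _DONE:
                return
            yield item

    task = asyncio.create_task(produce())
    try:
        yield items()
    finally:
        task.cancel()  # no orphaned producer, even after an early break
        try:
            await task
        except asyncio.CancelledError:
            pass


async def main():
    seen = []
    async with open_stream(range(1_000_000), buffer_size=2) as items:
        async for item in items:
            seen.append(item)
            if item == 2:
                break  # leaving the block cancels the producer
    # Only the main task should still be running.
    return seen, len(asyncio.all_tasks())


print(asyncio.run(main()))  # ([0, 1, 2], 1)
```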

.collect_split() → tuple[list, list]

Partition Ok/Err results (as produced by .try_map) into (successes, failures):

oks, errs = await stream.try_map(risky_fn).collect_split()
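The following sketch shows the partition step. It uses hypothetical `Ok`/`Err` dataclasses and a standalone `try_map` generator in place of the library's own types. It assumes the successes list holds unwrapped values and the failures list holds the exceptions; the real `collect_split` may keep the wrappers instead.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Ok:
    """Hypothetical success wrapper."""
    value: Any


@dataclass
class Err:
    """Hypothetical failure wrapper."""
    error: Exception


async def try_map(fn, source):
    """Hypothetical: wrap each call's outcome instead of letting it raise."""
    for item in source:
        try:
            result = Ok(fn(item))
        except Exception as exc:
            result = Err(exc)
        yield result


async def collect_split(results):
    """Hypothetical partition into (unwrapped values, exceptions)."""
    oks, errs = [], []
    async for result in results:
        if isinstance(result, Ok):
            oks.append(result.value)
        else:
            errs.append(result.error)
    return oks, errs


oks, errs = asyncio.run(collect_split(try_map(int, ["1", "x", "3"])))
print(oks)                               # [1, 3]
print([type(e).__name__ for e in errs])  # ['ValueError']
```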

Stage Options

Every transform stage accepts these keyword-only arguments:

Option      | Default | Description
----------- | ------- | -------------------------------------------
workers     | 1       | Number of concurrent workers for this stage
buffer_size | 0       | Channel buffer size (0 = rendezvous)
name        | None    | Label for debugging

See Concurrency & Workers for tuning guidance.
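To make `buffer_size=0` concrete: in a rendezvous channel, a send completes only once a receiver has taken the item. Note that `asyncio.Queue(maxsize=0)` means unbounded, which is the opposite behavior. The following is a hypothetical single-sender sketch built on `Queue.join()` and `task_done()`. `RendezvousChannel` is illustrative, not the library's channel type.

```python
import asyncio


class RendezvousChannel:
    """Hypothetical buffer_size=0 channel for a single sender: send() returns
    only after a receiver has taken the item."""

    def __init__(self):
        self._queue = asyncio.Queue(maxsize=1)

    async def send(self, item):
        await self._queue.put(item)
        await self._queue.join()  # wait until the receiver calls task_done()

    async def receive(self):
        item = await self._queue.get()
        self._queue.task_done()  # releases the waiting sender
        return item


async def main():
    log = []
    chan = RendezvousChannel()

    async def producer():
        for i in range(2):
            log.append(f"send {i}")
            await chan.send(i)
            log.append(f"sent {i}")

    async def consumer():
        for _ in range(2):
            await asyncio.sleep(0.01)  # deliberately slow receiver
            item = await chan.receive()
            log.append(f"recv {item}")

    await asyncio.gather(producer(), consumer())
    return log


log = asyncio.run(main())
print(log.index("recv 0") < log.index("sent 0"))  # True: the send waited for the receiver
```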