Pipeline Basics
Stream Constructors
Stream.from_iterable(items)
Create a stream from any sync or async iterable:
```python
# Sync iterable
stream = Stream.from_iterable([1, 2, 3])
stream = Stream.from_iterable(range(100))

# Async generator
async def generate():
    for i in range(10):
        yield i

stream = Stream.from_iterable(generate())
```
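One plausible way to accept both kinds of input is to normalize everything to a single async iterator. The sketch below assumes that design. `to_async_iter` is a hypothetical helper, not part of the `Stream` API, and `generate` yields only three values to keep the output short:

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Iterable
from typing import TypeVar, Union

T = TypeVar("T")


async def to_async_iter(items: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterator[T]:
    """Hypothetical helper: expose a sync or async iterable as one async iterator."""
    if isinstance(items, AsyncIterable):
        async for item in items:  # async generators, async iterables
            yield item
    else:
        for item in items:  # lists, ranges, plain generators
            yield item


async def generate():
    for i in range(3):
        yield i


async def drain(items) -> list:
    return [x async for x in to_async_iter(items)]


print(asyncio.run(drain([1, 2, 3])))   # [1, 2, 3]
print(asyncio.run(drain(generate())))  # [0, 1, 2]
```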
Stream.from_callable(factory)
Lazy construction: the factory is called each time the pipeline executes, so every run starts from a fresh iterable:
```python
stream = Stream.from_callable(lambda: range(10))

# Useful for async generators (pass the function, not the result)
stream = Stream.from_callable(generate)  # not generate()
```
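You pass `generate` rather than `generate()` because an async generator object can be consumed only once. The plain-Python illustration below makes no library calls. A shared generator comes back empty on its second run, while a factory produces a fresh generator each time:

```python
import asyncio


async def generate():
    for i in range(3):
        yield i


async def drain(source) -> list:
    return [x async for x in source]


async def main():
    gen = generate()               # a single generator object
    first = await drain(gen)
    second = await drain(gen)      # already exhausted: nothing left to yield

    factory = generate             # the function itself, as passed to from_callable
    run1 = await drain(factory())  # each call creates a fresh generator
    run2 = await drain(factory())
    return first, second, run1, run2


print(asyncio.run(main()))  # ([0, 1, 2], [], [0, 1, 2], [0, 1, 2])
```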
Transform Stages
All stages are lazy: each returns a new Stream and executes nothing until a terminal operation runs the pipeline.
.map(fn, *, workers=1, buffer_size=0, name=None)
1:1 transform. Works with sync or async functions:
```python
# Sync
stream.map(lambda x: x * 2)

# Async
stream.map(async_fetch, workers=5)
```
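This section does not spell out what `workers=5` does for an async function. The sketch below assumes it caps the number of `fn` calls in flight at once. `bounded_map` and `Tracker` are hypothetical. Results come back in completion order, because the library's ordering guarantee is not stated here. The sketch also schedules every item up front, which a real streaming stage would avoid:

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def bounded_map(
    fn: Callable[[T], Awaitable[U]], items: Iterable[T], workers: int = 1
) -> AsyncIterator[U]:
    """Apply async fn to each item with at most `workers` calls running at once.

    Results are yielded in completion order, not input order.
    """
    sem = asyncio.Semaphore(workers)

    async def run(item: T) -> U:
        async with sem:  # waits while `workers` calls are already running
            return await fn(item)

    tasks = [asyncio.create_task(run(item)) for item in items]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


class Tracker:
    """Stand-in for async_fetch that records peak concurrency."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def fetch(self, x: int) -> int:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # simulated I/O
        self.in_flight -= 1
        return x * 2


async def main() -> tuple[list[int], int]:
    tracker = Tracker()
    results = [r async for r in bounded_map(tracker.fetch, range(20), workers=5)]
    return sorted(results), tracker.peak


results, peak = asyncio.run(main())
print(peak)  # 5
```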
.flat_map(fn, *, workers=1, buffer_size=0, name=None)
1:N transform. The function returns an iterable (sync or async):
```python
# Each URL → multiple links
stream.flat_map(extract_links, workers=5)

# Sync expansion
stream.flat_map(lambda x: [x, x + 1])
```
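`fn` may be sync or async, and it may return a sync or async iterable, so a stage has to handle each combination. Below is a minimal single-worker sketch under that reading. `flat_map` here is a hypothetical free function, not the `Stream` method:

```python
import asyncio
import inspect
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Any


async def flat_map(fn: Callable[[Any], Any], source: AsyncIterable[Any]) -> AsyncIterator[Any]:
    """Yield every element of fn(item), for each item in source, in order."""
    async for item in source:
        result = fn(item)
        if inspect.isawaitable(result):  # async def returning an iterable
            result = await result
        if isinstance(result, AsyncIterable):  # async generator function
            async for out in result:
                yield out
        else:  # list, tuple, generator, ...
            for out in result:
                yield out


async def numbers():
    for x in (1, 10):
        yield x


async def expand(x):  # async generator: one input, two outputs
    yield x
    yield x + 1


async def main():
    sync_out = [y async for y in flat_map(lambda x: [x, x + 1], numbers())]
    async_out = [y async for y in flat_map(expand, numbers())]
    return sync_out, async_out


print(asyncio.run(main()))  # ([1, 2, 10, 11], [1, 2, 10, 11])
```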
.filter(pred, *, workers=1, buffer_size=0, name=None)
Keep items where the predicate is truthy:
```python
stream.filter(lambda x: x > 0)
stream.filter(is_valid, workers=4)
```
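Because items are kept on any truthy result, not only on `True`, a predicate can return any value. The minimal sketch below applies that rule with a sync or async predicate. `filter_items` and `is_odd` are hypothetical names:

```python
import asyncio
import inspect
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Any


async def filter_items(pred: Callable[[Any], Any], source: AsyncIterable[Any]) -> AsyncIterator[Any]:
    """Yield the items for which pred(item) is truthy; pred may be sync or async."""
    async for item in source:
        verdict = pred(item)
        if inspect.isawaitable(verdict):
            verdict = await verdict
        if verdict:  # truthiness test, so 1, "yes", [0] all keep the item
            yield item


async def numbers():
    for x in (-3, 0, 4, 5):
        yield x


async def is_odd(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for an async check
    return x % 2  # 0 or 1; truthiness decides


async def main():
    positive = [x async for x in filter_items(lambda x: x > 0, numbers())]
    odd = [x async for x in filter_items(is_odd, numbers())]
    return positive, odd


print(asyncio.run(main()))  # ([4, 5], [-3, 5])
```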
.foreach(fn, *, workers=1, buffer_size=0, name=None)
Side effects only: fn is called on each item, and the item passes through unchanged:
```python
stream.foreach(print)
stream.foreach(log_item, workers=2)
```
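What separates this stage from `.map` is that `fn`'s return value is discarded. The sketch below follows that reading and handles a sync `fn` only. `foreach` is a hypothetical free function:

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Callable
from typing import Any


async def foreach(fn: Callable[[Any], Any], source: AsyncIterable[Any]) -> AsyncIterator[Any]:
    """Call fn(item) for its side effect, then yield the original item."""
    async for item in source:
        fn(item)  # return value ignored (sync fn only in this sketch)
        yield item


async def numbers():
    for x in (1, 2, 3):
        yield x


async def main():
    seen: list[int] = []
    # list.append returns None, yet the items still flow through unchanged
    passed = [x async for x in foreach(seen.append, numbers())]
    return seen, passed


print(asyncio.run(main()))  # ([1, 2, 3], [1, 2, 3])
```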
Terminal Operations
Terminal operations execute the pipeline and return a result. All of them except .open() are awaited; .open() is entered with async with.
.collect() → list[T]
Collect all items into a list:
```python
items = await stream.collect()
```
.count() → int
Consume all items, return the count:
```python
n = await stream.count()
```
.reduce(fn, initial) → U
Fold all items into a single value:
```python
total = await stream.reduce(lambda acc, x: acc + x, 0)
```
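The sketch below assumes `.reduce` is a left fold like Python's `functools.reduce` with an explicit initial value. `reduce_async` is a hypothetical stand-in. Under that assumption, `initial` is returned unchanged for an empty stream, and it may have a different type from the items:

```python
import asyncio
import functools
from collections.abc import AsyncIterable, Callable
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def reduce_async(fn: Callable[[U, T], U], source: AsyncIterable[T], initial: U) -> U:
    """Left fold: acc = fn(acc, item) for each item, starting from initial."""
    acc = initial
    async for item in source:
        acc = fn(acc, item)
    return acc


async def numbers():
    for x in range(1, 5):
        yield x


async def empty():
    return
    yield  # unreachable, but makes this an async generator


total = asyncio.run(reduce_async(lambda acc, x: acc + x, numbers(), 0))
expected = functools.reduce(lambda acc, x: acc + x, range(1, 5), 0)
print(total, expected)  # 10 10
print(asyncio.run(reduce_async(lambda acc, x: acc + x, empty(), 0)))  # 0
print(asyncio.run(reduce_async(lambda acc, x: acc + str(x), numbers(), "")))  # 1234
```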
.first() → T | None
Return the first item, or None if empty:
```python
item = await stream.first()
```
.take(n) → list[T]
Collect at most n items:
```python
top5 = await stream.take(5)
```
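Taking at most n items from a lazy pipeline should stop pulling from upstream once n items have arrived. That is what would make `.take` safe on an unbounded source. The minimal sketch below assumes that behavior. `take`, `naturals`, and `short` are hypothetical:

```python
import asyncio
import itertools
from collections.abc import AsyncGenerator
from typing import TypeVar

T = TypeVar("T")


async def take(source: AsyncGenerator[T, None], n: int) -> list[T]:
    """Collect at most n items, then close the source so it stops producing."""
    items: list[T] = []
    try:
        if n > 0:
            async for item in source:
                items.append(item)
                if len(items) == n:
                    break  # do not request item n + 1
    finally:
        await source.aclose()  # run the source's cleanup now, not at GC time
    return items


async def naturals(log: list[int]) -> AsyncGenerator[int, None]:
    for i in itertools.count():  # unbounded source
        log.append(i)  # records every value actually produced
        yield i


async def short() -> AsyncGenerator[int, None]:
    for i in range(3):
        yield i


async def main():
    produced: list[int] = []
    first5 = await take(naturals(produced), 5)
    fewer = await take(short(), 10)  # fewer than n available: returns them all
    return first5, produced, fewer


print(asyncio.run(main()))  # ([0, 1, 2, 3, 4], [0, 1, 2, 3, 4], [0, 1, 2])
```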
.open() → AsyncContextManager
Manual iteration with structured concurrency:
```python
async with stream.open() as items:
    async for item in items:
        await process(item)
```
.collect_split() → tuple[list, list]
Partition Ok/Err results into (successes, failures):
```python
oks, errs = await stream.try_map(risky_fn).collect_split()
```
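This section does not define the Ok/Err types behind `try_map`. The minimal synchronous sketch below makes three assumptions: `try_map` wraps each return value in `Ok`, it wraps any raised exception in `Err`, and `collect_split` unwraps both. The real method may return the wrappers instead. `Ok`, `Err`, `try_call`, `collect_split`, and `risky_fn` here are all hypothetical:

```python
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from typing import Generic, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")
E = TypeVar("E")


@dataclass(frozen=True)
class Ok(Generic[T]):
    value: T


@dataclass(frozen=True)
class Err(Generic[E]):
    error: E


Result = Union[Ok[T], Err[E]]


def try_call(fn: Callable[[T], U], item: T) -> Result[U, Exception]:
    """Run fn(item), capturing an exception as Err instead of raising it."""
    try:
        return Ok(fn(item))
    except Exception as exc:
        return Err(exc)


def collect_split(results: Iterable[Result[T, E]]) -> tuple[list[T], list[E]]:
    """Partition results into (success values, errors), preserving order."""
    oks: list[T] = []
    errs: list[E] = []
    for r in results:
        if isinstance(r, Ok):
            oks.append(r.value)
        else:
            errs.append(r.error)
    return oks, errs


def risky_fn(x: int) -> int:
    return 10 // x  # raises ZeroDivisionError when x == 0


oks, errs = collect_split(try_call(risky_fn, x) for x in [5, 0, 2])
print(oks)                               # [2, 5]
print([type(e).__name__ for e in errs])  # ['ZeroDivisionError']
```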
Stage Options
Every stage accepts these keyword arguments:
| Option | Default | Description |
|---|---|---|
| `workers` | `1` | Number of concurrent workers for this stage |
| `buffer_size` | `0` | Channel buffer size (0 = rendezvous) |
| `name` | `None` | Label for debugging |
See Concurrency & Workers for tuning guidance.
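The table describes `buffer_size=0` as a rendezvous. The minimal asyncio sketch below assumes the usual unbuffered-channel meaning: a sender waits until a receiver takes the item. Note that `asyncio.Queue(maxsize=0)` is unbounded rather than a rendezvous, so it cannot stand in directly. `RendezvousChannel` is a hypothetical class and does not handle cancellation of a pending send:

```python
import asyncio
from typing import Any


class RendezvousChannel:
    """Unbuffered channel: send() returns only after a receiver has taken the item."""

    def __init__(self) -> None:
        # Holds (item, ack) pairs; each sender parks on its own ack future,
        # so send() never completes without a matching receive().
        self._pending: asyncio.Queue = asyncio.Queue()

    async def send(self, item: Any) -> None:
        ack = asyncio.get_running_loop().create_future()
        await self._pending.put((item, ack))
        await ack  # block until receive() hands the item over

    async def receive(self) -> Any:
        item, ack = await self._pending.get()
        ack.set_result(None)  # release the waiting sender
        return item


async def main() -> list[str]:
    channel = RendezvousChannel()
    log: list[str] = []

    async def producer() -> None:
        await channel.send("a")
        log.append("send returned")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # the producer is now parked inside send()
    log.append("receiver ready")
    log.append(f"received {await channel.receive()}")
    await task
    return log


print(asyncio.run(main()))
# ['receiver ready', 'received a', 'send returned']
```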