Error Handling
anyiostream provides Rust-inspired Ok/Err types for railway-oriented error handling — errors flow as values through the pipeline instead of crashing silently.
The Problem
With plain .map(), exceptions in a stage skip the item and log a warning:
result = await Stream.from_iterable([1, 2, 3]).map(risky_fn).collect()
# If risky_fn(2) throws, item 2 is silently dropped
Result Types
Use try_* stages to wrap results as Ok(value) or Err(PipelineError(...)):
from anyiostream import Stream, Ok, Err, PipelineError
Ok[T]
Wraps a successful value:
ok = Ok(42)
ok.value # 42
ok.is_ok() # True
ok.unwrap() # 42
ok.unwrap_or(0) # 42
Err[E]
Wraps an error (typically PipelineError):
err = Err(PipelineError(exception=ValueError("boom"), item=2, stage="parse"))
err.error # PipelineError(...)
err.is_err() # True
err.unwrap_or(0) # 0
err.error.exception # ValueError("boom")
err.error.item # 2
err.error.stage # "parse"
Pattern Matching (Python 3.12+)
match result:
case Ok(value=v):
print(f"Success: {v}")
case Err(error=e):
print(f"Failed at {e.stage}: {e.exception}")
Result-Aware Stages
try_map(fn, *, err=None)
Map with automatic Ok/Err wrapping:
results = await (
Stream.from_iterable(urls)
| pipe.try_map(fetch, workers=5) # Ok(response) or Err(PipelineError)
| pipe.try_map(parse) # chains on Ok, passes Err through
| pipe.collect()
)
# results: [Ok(data1), Err(PipelineError(...)), Ok(data3), ...]
try_flat_map(fn, *, err=None)
Flat map with Ok/Err wrapping — each sub-item becomes Ok:
results = await (
Stream.from_iterable(pages)
| pipe.try_flat_map(extract_links, workers=3)
| pipe.collect()
)
try_filter(pred)
Filter Ok values; Err always passes through:
results = await (
Stream.from_iterable(items)
| pipe.try_map(validate)
| pipe.try_filter(lambda x: x.score > 0.5) # only filters Ok values
| pipe.collect()
)
try_foreach(fn, *, err=None)
Side-effect on Ok values; items pass through unchanged:
results = await (
Stream.from_iterable(items)
| pipe.try_map(process)
| pipe.try_foreach(log_success, err=log_failure)
| pipe.collect()
)
The err=handler Option
try_map, try_flat_map, and try_foreach accept an optional err parameter to transform Err items:
# Without err= : Err passes through unchanged
| pipe.try_map(parse)
# With err= : transform the error
| pipe.try_map(parse, err=lambda e: log_and_rewrap(e))
The handler receives the PipelineError and returns a new error value (wrapped back into Err).
Exit Ramps
recover(fn)
Convert Err → value, unwrap Ok. After this stage, items are plain values:
results = await (
Stream.from_iterable(urls)
| pipe.try_map(fetch, workers=5)
| pipe.recover(lambda err: f"FAILED: {err.item}")
| pipe.collect()
)
# results: ["<html>...", "FAILED: http://bad.url", "<html>...", ...]
ok_only()
Keep Ok values (unwrapped), drop all Err:
successes = await (
Stream.from_iterable(urls)
| pipe.try_map(fetch)
| pipe.ok_only()
| pipe.collect()
)
errors_only()
Keep Err values (unwrapped to PipelineError), drop all Ok:
failures = await (
Stream.from_iterable(urls)
| pipe.try_map(fetch)
| pipe.errors_only()
| pipe.collect()
)
for err in failures:
print(f"{err.stage}: {err.exception} (item={err.item})")
collect_split()
Partition into (successes, failures) in one pass:
oks, errs = await (
Stream.from_iterable(urls)
| pipe.try_map(fetch, workers=5)
| pipe.try_map(parse)
| pipe.collect_split()
)
print(f"{len(oks)} succeeded, {len(errs)} failed")
Full Example
from anyiostream import Stream, pipe
async def process_urls(urls: list[str]) -> None:
oks, errs = await (
Stream.from_iterable(urls)
| pipe.try_map(fetch, workers=10)
| pipe.try_map(parse, err=lambda e: e) # pass errors through
| pipe.try_filter(lambda doc: doc.is_relevant)
| pipe.try_foreach(save_to_db, workers=3)
| pipe.collect_split()
)
print(f"Processed {len(oks)} documents")
for err in errs:
print(f" Failed: {err.stage} - {err.exception}")