Source code for anyiostream.result

"""
Result type for pipeline error handling — Rust-inspired Ok/Err.

Provides a discriminated union for representing success (``Ok``) or
failure (``Err``) as values that flow through pipeline channels,
instead of silently dropping errors.

Works with Python 3.12+ ``match/case``::

	match item:
		case Ok(value=v):
			print(f"Success: {v}")
		case Err(error=e):
			print(f"Failed at {e.stage}: {e.exception}")
"""

from __future__ import annotations

import traceback as _tb
from collections.abc import AsyncIterable, Awaitable, Callable
from dataclasses import dataclass
from typing import Any, NoReturn, TypeVar

T = TypeVar("T")
E = TypeVar("E")


# ---------------------------------------------------------------------------
# Error context
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True, slots=True) class PipelineError: """Context captured when a pipeline stage raises an exception. Attributes: exception: The caught exception instance. item: The original input item that caused the error. stage: Human-readable label of the stage that failed. traceback: Formatted traceback string at the point of failure. """ exception: Exception item: Any = None stage: str | None = None traceback: str | None = None def __str__(self) -> str: prefix = f"[{self.stage}] " if self.stage else "" return f"{prefix}{self.exception}"
# --------------------------------------------------------------------------- # Result variants # ---------------------------------------------------------------------------
[docs] @dataclass(frozen=True, slots=True) class Ok[T]: """Successful result wrapping a value.""" value: T
[docs] def is_ok(self) -> bool: return True
[docs] def is_err(self) -> bool: return False
[docs] def unwrap(self) -> T: """Return the contained value.""" return self.value
[docs] def unwrap_or(self, default: T) -> T: # noqa: ARG002 """Return the contained value (ignores *default*).""" return self.value
[docs] def unwrap_err(self) -> NoReturn: """Raises — an ``Ok`` has no error.""" raise ValueError(f"Called unwrap_err on Ok({self.value!r})")
[docs] def map(self, func: Any) -> Ok[Any]: """Apply *func* to the contained value.""" return Ok(func(self.value))
[docs] def map_err(self, func: Any) -> Ok[T]: # noqa: ARG002 """No-op — ``Ok`` has no error to transform.""" return self
[docs] @dataclass(frozen=True, slots=True) class Err[E]: """Error result wrapping an error.""" error: E
[docs] def is_ok(self) -> bool: return False
[docs] def is_err(self) -> bool: return True
[docs] def unwrap(self) -> NoReturn: """Raises — an ``Err`` has no value.""" raise ValueError(f"Called unwrap on Err({self.error!r})")
[docs] def unwrap_or(self, default: Any) -> Any: """Return *default* since ``Err`` has no value.""" return default
[docs] def unwrap_err(self) -> E: """Return the contained error.""" return self.error
[docs] def map(self, func: Any) -> Err[E]: # noqa: ARG002 """No-op — ``Err`` has no value to transform.""" return self
[docs] def map_err(self, func: Any) -> Err[Any]: """Apply *func* to the contained error.""" return Err(func(self.error))
type Result[T, E] = Ok[T] | Err[E] # --------------------------------------------------------------------------- # Result wrapper factories (used by process.py mixin) # --------------------------------------------------------------------------- def _try_map_wrap( func: Callable[..., Any], label: str, err: Callable[..., Any] | None = None, ) -> Callable[..., Any]: """Universal MAP wrapper for Result-aware processing. - Raw value: apply *func*, wrap as ``Ok`` (exception → ``Err``). - ``Ok(value)``: unwrap, apply *func*, wrap as ``Ok``. - ``Err(error)``: if *err* provided, apply to error → ``Err(result)``. - Otherwise pass through unchanged. """ async def _wrapped(item: Any) -> Ok[Any] | Err[Any]: if isinstance(item, Err): if err is not None: result = err(item.error) if isinstance(result, Awaitable): result = await result return Err(result) return item value = item.value if isinstance(item, Ok) else item try: result = func(value) if isinstance(result, Awaitable): result = await result return Ok(result) except Exception as exc: return Err( PipelineError( exception=exc, item=value, stage=label, traceback=_tb.format_exc(), ) ) return _wrapped def _try_flat_map_wrap( func: Callable[..., Any], label: str, err: Callable[..., Any] | None = None, ) -> Callable[..., Any]: """Universal FLAT_MAP wrapper for Result-aware processing. Each sub-item → ``Ok``, exception → single ``Err``. ``Err`` items use the *err* handler or pass through as ``[Err(...)]``. """ async def _wrapped(item: Any) -> list[Ok[Any] | Err[Any]]: if isinstance(item, Err): if err is not None: result = err(item.error) if isinstance(result, Awaitable): result = await result return [Err(result)] return [item] value = item.value if isinstance(item, Ok) else item try: result = func(value) if isinstance(result, AsyncIterable): return [Ok(sub) async for sub in result] if isinstance(result, Awaitable): resolved = await result if isinstance(resolved, AsyncIterable): return [Ok(sub) async for sub in resolved] return [Ok(sub) for sub in resolved] return [Ok(sub) for sub in result] except Exception as exc: return [ Err( PipelineError( exception=exc, item=value, stage=label, traceback=_tb.format_exc(), ) ) ] return _wrapped