# Source code for axio.stream
"""AgentStream: async iterator wrapper over the agent event generator."""
from __future__ import annotations
from collections.abc import AsyncGenerator
from .events import Error, SessionEndEvent, StreamEvent, TextDelta
from .exceptions import StreamError
[docs]
class AgentStream:
    """Async iterator wrapper over an agent event generator.

    The stream forwards events from the wrapped async generator and tracks
    whether it has been exhausted or explicitly closed.  It can be consumed
    with ``async for``, drained through :meth:`get_final_text` or
    :meth:`get_session_end`, or used as an async context manager so the
    underlying generator is always closed::

        async with AgentStream(gen) as stream:
            async for event in stream:
                ...
    """

    def __init__(self, generator: AsyncGenerator[StreamEvent, None]) -> None:
        """Wrap *generator*; nothing is consumed until iteration starts."""
        self._generator = generator
        # True once the generator is exhausted or aclose() has been called.
        self._closed = False

    def __aiter__(self) -> AgentStream:
        """Return the stream itself; it is its own async iterator."""
        return self

    async def __aenter__(self) -> AgentStream:
        """Enter the async context and return the stream itself."""
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        """Close the stream on context exit without suppressing exceptions."""
        await self.aclose()

    async def __anext__(self) -> StreamEvent:
        """Return the next event from the wrapped generator.

        Raises:
            StopAsyncIteration: If the stream was closed or the generator
                is exhausted.
        """
        if self._closed:
            raise StopAsyncIteration
        try:
            return await self._generator.__anext__()
        except StopAsyncIteration:
            # Remember exhaustion so later calls short-circuit and aclose()
            # does not touch the finished generator again.
            self._closed = True
            raise

    async def aclose(self) -> None:
        """Close the wrapped generator; calling it again is a no-op."""
        if not self._closed:
            # Flip the flag before awaiting so the stream is marked closed
            # even if the generator's own aclose() raises.
            self._closed = True
            await self._generator.aclose()

    async def get_final_text(self) -> str:
        """Drain the stream and return the concatenated text deltas.

        The stream is always closed before this method returns or raises.

        Returns:
            The ``delta`` of every ``TextDelta`` event, joined in order
            (an empty string if there were none).

        Raises:
            StreamError: If an ``Error`` event is encountered; the event's
                exception is chained as the cause.
        """
        parts: list[str] = []
        try:
            async for event in self:
                if isinstance(event, Error):
                    raise StreamError(str(event.exception)) from event.exception
                if isinstance(event, TextDelta):
                    parts.append(event.delta)
        finally:
            await self.aclose()
        return "".join(parts)

    async def get_session_end(self) -> SessionEndEvent:
        """Drain the stream and return the last ``SessionEndEvent`` seen.

        The stream is always closed before this method returns or raises.

        Raises:
            StreamError: If an ``Error`` event is encountered (the event's
                exception is chained as the cause), or if the stream ends
                without any ``SessionEndEvent``.
        """
        result: SessionEndEvent | None = None
        try:
            async for event in self:
                if isinstance(event, Error):
                    raise StreamError(str(event.exception)) from event.exception
                if isinstance(event, SessionEndEvent):
                    result = event
        finally:
            await self.aclose()
        if result is None:
            raise StreamError("Stream ended without SessionEndEvent")
        return result