Stream Events¶
All agent I/O flows through typed stream events. The transport produces events, the agent processes them, and consumers (like the TUI) render them.
Event pipeline¶
flowchart LR
T[Transport] -->|StreamEvent| A[Agent]
A -->|StreamEvent| S[AgentStream]
S -->|StreamEvent| C[Consumer]
The transport yields events as they arrive from the LLM. The agent enriches
the stream with ToolResult events after dispatching tool calls, then
forwards everything through AgentStream to the consumer.
Event types¶
All events are frozen dataclasses with slots=True:
TextDeltaA chunk of text output from the model.
@dataclass(frozen=True, slots=True) class TextDelta: index: int delta: str
ReasoningDeltaA chunk of reasoning/thinking output (for models that support it). Same shape as
TextDelta.ToolUseStartSignals the beginning of a tool call.
@dataclass(frozen=True, slots=True) class ToolUseStart: index: int tool_use_id: ToolCallID name: ToolName
ToolInputDeltaA partial JSON fragment of tool input, streamed incrementally.
@dataclass(frozen=True, slots=True) class ToolInputDelta: index: int tool_use_id: ToolCallID partial_json: str
ToolResultThe result of executing a tool, added by the agent after dispatch.
@dataclass(frozen=True, slots=True) class ToolResult: tool_use_id: ToolCallID name: ToolName is_error: bool content: str = "" input: dict[str, Any] = field(default_factory=dict)
IterationEndMarks the end of one transport call. Carries the stop reason and token usage.
@dataclass(frozen=True, slots=True) class IterationEnd: iteration: int stop_reason: StopReason usage: Usage
ErrorWraps an exception that occurred during streaming.
SessionEndEventFinal event of the session. Carries the stop reason and cumulative token usage.
@dataclass(frozen=True, slots=True) class SessionEndEvent: stop_reason: StopReason total_usage: Usage
StreamEvent union¶
All events are combined into a single type alias:
type StreamEvent = (
ReasoningDelta | TextDelta | ToolUseStart | ToolInputDelta
| ToolResult | IterationEnd | Error | SessionEndEvent
)
Use match or isinstance to dispatch on event types:
async for event in agent.run_stream("Hello", context):
match event:
case TextDelta(delta=text):
print(text, end="", flush=True)
case ToolResult(name=name, content=content):
print(f"\n[Tool: {name}] {content}")
case SessionEndEvent():
print("\n--- Done ---")
AgentStream¶
AgentStream is a thin async-iterator wrapper around the event generator:
class AgentStream:
def __aiter__(self) -> AgentStream: ...
async def __anext__(self) -> StreamEvent: ...
async def aclose(self) -> None: ...
It also provides convenience methods:
get_final_text() -> strConsume the stream and return only the concatenated text deltas. Raises
StreamErroronErrorevents.get_session_end() -> SessionEndEventConsume the stream and return the final
SessionEndEvent.