# Agent & the Agentic Loop
The Agent is the central orchestrator. It connects a transport, a set of
tools, and a context store into a single loop that streams LLM responses and
dispatches tool calls until the model signals it is done.
## The Agent dataclass
```python
@dataclass(slots=True)
class Agent:
    system: str
    tools: list[Tool]
    transport: CompletionTransport
    max_iterations: int = 50
```
- `system`: The system prompt sent with every request.
- `tools`: Available tools. The agent searches this list by name when the model issues a tool call.
- `transport`: Any object satisfying the `CompletionTransport` protocol.
- `max_iterations`: Safety limit preventing runaway loops. The agent emits a `SessionEndEvent` with an error if this limit is reached.
## How the loop works
```mermaid
flowchart TD
    A[User message] --> B[Append to context]
    B --> C[Get history from context]
    C --> D[Stream from transport]
    D --> E{Tool calls?}
    E -- Yes --> F[Dispatch tools concurrently]
    F --> G[Append results to context]
    G --> C
    E -- No --> H{Stop reason?}
    H -- end_turn --> I[SessionEndEvent]
    H -- max_tokens / error --> J[Error event]
```
1. The user message is appended to the context store.
2. The agent retrieves the full conversation history and streams it to the transport along with the tool definitions and system prompt.
3. As `StreamEvent` values arrive, the agent accumulates text deltas and buffers pending tool calls.
4. When the transport yields an `IterationEnd` event:
    - If tool-use blocks were collected, the agent dispatches all tool calls concurrently via `asyncio.gather`, appends the assistant message and tool results to context, and loops back to step 2.
    - If only text was produced and the stop reason is `end_turn`, the agent emits a `SessionEndEvent` and returns.
5. If `max_iterations` is exceeded, the loop terminates with an error.
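The steps above can be condensed into a synchronous sketch. The real agent is async and streaming; `FakeTransport` and the dict-based message shapes here are illustrative assumptions, not the library's API:

```python
# Synchronous sketch of the agentic loop described above.
def run_loop(transport, context, user_message, max_iterations=50):
    context.append({"role": "user", "content": user_message})  # step 1
    for _ in range(max_iterations):
        reply = transport.complete(context)                    # step 2
        context.append({"role": "assistant", "content": reply["text"]})
        if reply["tool_calls"]:
            # Tool calls requested: run them, append results, loop again.
            results = [fn(*args) for fn, args in reply["tool_calls"]]
            context.append({"role": "tool", "content": results})
            continue
        if reply["stop_reason"] == "end_turn":
            return reply["text"]
    raise RuntimeError("max_iterations exceeded")

class FakeTransport:
    """Replies with one tool call, then with final text."""
    def __init__(self):
        self.calls = 0
    def complete(self, history):
        self.calls += 1
        if self.calls == 1:
            return {"text": "", "stop_reason": "tool_use",
                    "tool_calls": [(lambda x: x * 2, (21,))]}
        return {"text": "done", "stop_reason": "end_turn", "tool_calls": []}

context = []
final = run_loop(FakeTransport(), context, "hi")
```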
## Streaming API
Agent exposes two methods:
- `run_stream(user_message, context) -> AgentStream`: Returns an `AgentStream`, an async iterator over `StreamEvent` values. Use this when you need per-token streaming or want to observe tool calls as they happen.
- `run(user_message, context) -> str`: Convenience wrapper that consumes the stream and returns the final text.
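The relationship between the two methods can be sketched with a stand-in stream. `fake_run_stream` below is a minimal async generator of text deltas, not the real `AgentStream` (which also yields tool-call and lifecycle events):

```python
import asyncio

# Stand-in for an AgentStream: a minimal async iterator of text deltas.
async def fake_run_stream(user_message):
    for delta in ["Hello", ", ", "world"]:
        yield delta

async def run(user_message):
    # run() is just a convenience wrapper: consume the stream, keep the text.
    parts = []
    async for delta in fake_run_stream(user_message):
        parts.append(delta)
    return "".join(parts)

text = asyncio.run(run("hi"))
```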
## Concurrent tool dispatch
When the model requests multiple tool calls in a single response, the agent runs them all concurrently:
```python
async def dispatch_tools(
    self,
    blocks: list[ToolUseBlock],
    iteration: int,
) -> list[ToolResultBlock]:
    tasks = [self._call_tool(block) for block in blocks]
    return list(await asyncio.gather(*tasks))
```
Each tool call goes through the full guard chain before execution. If a tool raises an exception, the agent catches it and wraps it in a `ToolResultBlock` with `is_error=True`; the model sees the error and can react accordingly.
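The error-wrapping behavior can be sketched as below. The `ToolResultBlock` fields and the `call_tool` signature are assumptions for illustration, not the library's exact API:

```python
import asyncio
from dataclasses import dataclass

# Illustrative ToolResultBlock; field names are assumptions.
@dataclass
class ToolResultBlock:
    tool_use_id: str
    content: str
    is_error: bool = False

async def call_tool(tool_use_id: str, handler, **kwargs) -> ToolResultBlock:
    try:
        result = await handler(**kwargs)
        return ToolResultBlock(tool_use_id, str(result))
    except Exception as exc:
        # Surface the failure to the model instead of crashing the loop.
        return ToolResultBlock(tool_use_id, f"{type(exc).__name__}: {exc}",
                               is_error=True)

async def failing_tool():
    raise ValueError("bad input")

result = asyncio.run(call_tool("t1", failing_tool))
```

Because the wrapped error travels back as an ordinary tool result, the loop never aborts mid-conversation; the model can retry with different arguments or explain the failure to the user.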