API Reference

Auto-generated documentation from source docstrings.

Core

axio - public API.

class axio.Agent(system: 'str', transport: 'CompletionTransport', tools: 'list[Tool[Any]]' = <factory>, selector: 'ToolSelector | None' = None, max_iterations: 'int' = 50, last_iteration_message: 'Message | None' = None)[source]

Bases: object

copy(**overrides: Any) Self[source]

Return a new Agent with overrides applied.

async dispatch_tools(blocks: list[ToolUseBlock], iteration: int) list[ToolResultBlock][source]
last_iteration_message: Message | None
max_iterations: int
async run(user_message: str, context: ContextStore) str[source]
run_stream(user_message: str, context: ContextStore) AgentStream[source]
selector: ToolSelector | None
system: str
tools: list[Tool[Any]]
transport: CompletionTransport
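The members above implement a standard agentic loop: the transport streams a turn, a tool_use stop reason triggers dispatch_tools(), and the loop repeats until end_turn or max_iterations. A self-contained sketch of that control flow (simplified stand-in types, not axio's Agent):

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class FakeTurn:
    """One assistant turn: text plus any tool calls it requested."""
    text: str
    tool_calls: list[str] = field(default_factory=list)

async def run_loop(turns: list[FakeTurn], max_iterations: int = 50) -> str:
    """Simplified agentic loop: consume turns until one requests no tools."""
    history: list[str] = []
    for iteration, turn in enumerate(turns):
        if iteration >= max_iterations:
            break
        history.append(turn.text)
        if not turn.tool_calls:           # stop_reason == end_turn
            return turn.text
        # stop_reason == tool_use: dispatch tools, then loop again
        for name in turn.tool_calls:
            history.append(f"result:{name}")
    return history[-1]

turns = [FakeTurn("thinking", ["search"]), FakeTurn("final answer")]
print(asyncio.run(run_loop(turns)))  # final answer
```

In the real Agent the "turns" come from CompletionTransport.stream() and tool dispatch appends ToolResultBlocks to the ContextStore; the loop shape is the same.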
class axio.AgentStream(generator: AsyncGenerator[StreamEvent, None])[source]

Bases: object

async aclose() None[source]
async get_final_text() str[source]
async get_session_end() SessionEndEvent[source]
class axio.AudioOutputDelta(data: bytes, media_type: str = 'audio/pcm;rate=24000')[source]

Bases: object

Streaming audio chunk from the assistant in a realtime session.

data: bytes
media_type: str
class axio.CompletionTransport(*args, **kwargs)[source]

Bases: Protocol

stream(messages: list[Message], tools: list[Tool[Any]], system: str) AsyncIterator[StreamEvent][source]
class axio.ConcurrentGuard[source]

Bases: PermissionGuard, ABC

Guard with concurrency control.

Subclass and override check(). __call__ acquires the semaphore, then delegates to check(). Set concurrency to control parallelism (default 1: one check at a time).

concurrency: int = 1
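The __call__/check split can be modelled with an asyncio.Semaphore. A self-contained sketch of the pattern (a standalone class mirroring ConcurrentGuard's behavior, not a real subclass):

```python
import asyncio

class SerializedCheck:
    """Run check() under a semaphore, mirroring ConcurrentGuard."""
    concurrency = 1  # one check at a time by default

    def __init__(self) -> None:
        self._sem = asyncio.Semaphore(self.concurrency)
        self.active = 0
        self.max_active = 0

    async def __call__(self, **kwargs):
        async with self._sem:        # acquire, then delegate to check()
            return await self.check(**kwargs)

    async def check(self, **kwargs):
        # Track overlap so serialization is observable.
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        await asyncio.sleep(0)       # yield so overlap would be visible
        self.active -= 1
        return kwargs

async def main() -> int:
    guard = SerializedCheck()
    await asyncio.gather(*(guard(i=i) for i in range(5)))
    return guard.max_active

print(asyncio.run(main()))  # 1 — checks never overlap
```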
class axio.ContextStore[source]

Bases: ABC

async add_context_tokens(input_tokens: int, output_tokens: int) None[source]
abstractmethod async append(message: Message) None[source]
async clear() None[source]
async close() None[source]

No-op by default.

async fork() ContextStore[source]

Returns a MemoryContextStore deep copy by default.

async classmethod from_context(context: ContextStore) Self[source]
async classmethod from_history(history: list[Message]) Self[source]

Create a new ContextStore pre-populated with history.

async get_context_tokens() tuple[int, int][source]

Returns (0, 0) by default.

abstractmethod async get_history() list[Message][source]
async list_sessions() list[SessionInfo][source]

List available sessions. Default: returns a single entry for the current session.

property session_id: str

Lazy-init UUID hex; works without calling super().__init__().

async set_context_tokens(input_tokens: int, output_tokens: int) None[source]

No-op by default; tokens are silently dropped.

axio.Field(description: str = '', default: Any = MISSING, ge: int | float | None = None, le: int | float | None = None) FieldInfo[source]

Annotate a handler parameter with metadata (description, default, constraints).

Usage:

from typing import Annotated

from axio import Field

async def search(
    query: Annotated[str, Field(description="Search query")],
    limit: Annotated[int, Field(default=10, ge=1, le=100)],
) -> str: ...
class axio.FieldInfo(description: str = '', default: Any = MISSING, ge: int | float | None = None, le: int | float | None = None, strict: bool = False)[source]

Bases: object

Metadata attached to a handler parameter via Annotated[T, FieldInfo(...)].

default: Any = MISSING
description: str = ''
ge: int | float | None = None
le: int | float | None = None
strict: bool = False
validate(value: Any, name: str, hint: Any) None[source]

Validate value against this FieldInfo’s constraints, raising if invalid.

exception axio.GuardError[source]

Bases: ToolError

Guard denied or crashed during permission check.

exception axio.HandlerError[source]

Bases: ToolError

Handler raised during execution.

class axio.IterationEnd(iteration: 'int', stop_reason: 'StopReason', usage: 'Usage')[source]

Bases: object

iteration: int
stop_reason: StopReason
usage: Usage
class axio.MemoryContextStore(history: list[Message] | None = None)[source]

Bases: ContextStore

Simple in-memory context store. fork() returns a deep copy.

async append(message: Message) None[source]
async clear() None[source]
async close() None[source]

No-op by default.

async fork() MemoryContextStore[source]

Returns a MemoryContextStore deep copy by default.

async get_context_tokens() tuple[int, int][source]

Returns (0, 0) by default.

async get_history() list[Message][source]
property session_id: str

Lazy-init UUID hex; works without calling super().__init__().

async set_context_tokens(input_tokens: int, output_tokens: int) None[source]

No-op by default; tokens are silently dropped.

class axio.Message(role: "Literal['user', 'assistant', 'system']", content: 'list[ContentBlock]' = <factory>)[source]

Bases: object

content: list[ContentBlock]
classmethod from_dict(data: dict[str, Any]) Message[source]
role: Literal['user', 'assistant', 'system']
to_dict() dict[str, Any][source]
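to_dict()/from_dict() give a JSON-safe round trip for persistence. A self-contained sketch of the shape (field names taken from the signature above; the content-block encoding is simplified to plain dicts):

```python
from dataclasses import asdict, dataclass, field

@dataclass
class MiniMessage:
    """Toy stand-in for Message with a lossless dict round trip."""
    role: str
    content: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "MiniMessage":
        return cls(role=data["role"], content=list(data.get("content", [])))

m = MiniMessage(role="user", content=[{"type": "text", "text": "hi"}])
assert MiniMessage.from_dict(m.to_dict()) == m  # lossless round trip
print(m.to_dict()["role"])  # user
```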
class axio.PermissionGuard[source]

Bases: ABC

Gate for tool calls. Return modified kwargs to allow, raise to deny.

Tool invokes guards via await guard(tool, **kwargs).

Guards receive the Tool object and the raw keyword arguments before execution. Return the (possibly modified) dict to allow; raise GuardError to deny. Because guards see all inputs up front, they are also the right place for logging and auditing:

import logging
from typing import Any

from axio import PermissionGuard, Tool

logger = logging.getLogger(__name__)

class AuditGuard(PermissionGuard):
    async def check(self, tool: Tool, **kwargs: Any) -> dict[str, Any]:
        logger.info("tool=%s args=%s", tool.name, kwargs)
        return kwargs  # always allow

See examples/agent_swarm/agent_swarm/__main__.py (RoleGuard) for a production example.

abstractmethod async check(tool: Tool[Any], **kwargs: Any) dict[str, Any][source]
class axio.RealtimeAgent(system: str, transport: RealtimeTransport, tools: list[Tool[Any]] = <factory>, voice: str | None = None, input_audio_format: str = 'audio/pcm;rate=16000', output_audio_format: str = 'audio/pcm;rate=24000', raise_on_error: bool = True)[source]

Bases: object

Drives a duplex realtime session against a RealtimeTransport.

The agent intercepts ToolUseStart/ToolInputDelta/TurnComplete events to assemble tool calls and dispatches them as background tasks so that streaming audio output from the provider is not blocked. Each tool result is sent back to the session via RealtimeSession.send_tool_result() as soon as the handler returns; interrupt() cancels in-flight tasks.

Lifecycle:

async with RealtimeAgent(system="...", transport=t, tools=[...]) as agent:
    await agent.send(AudioBlock(data=mic_chunk, media_type="audio/pcm"))
    async for event in agent.events():
        match event:
            case AudioOutputDelta(data=pcm):
                speaker.feed(pcm)
            case _:
                ...
async close() None[source]
async commit() None[source]
async connect() None[source]
async events() AsyncIterator[StreamEvent][source]

Yield events from the session, dispatching tool calls as side-effects.

Tool dispatch runs as background tasks — slow handlers do not block the stream of AudioOutputDelta from the provider.

input_audio_format: str
async interrupt() None[source]
output_audio_format: str
raise_on_error: bool

Re-raise the exception wrapped in any Error event yielded by the session. Set to False to receive Error events as data and decide what to do per error (transient retry, log-and-continue, etc.).

async send(content: ContentBlock | list[ContentBlock]) None[source]
property session: RealtimeSession
system: str
tools: list[Tool[Any]]
transport: RealtimeTransport
voice: str | None
class axio.RealtimeSession(*args, **kwargs)[source]

Bases: Protocol

Active duplex realtime session — bidirectional audio / text / tools.

Returned by RealtimeTransport.connect(). Events from the provider arrive on events(); user input is pushed via send().

async close() None[source]

Tear down the session and release resources.

async commit() None[source]

Signal end-of-utterance for manual VAD; no-op with server VAD.

events() AsyncIterator[StreamEvent][source]

Async iterator over server events for the lifetime of this session.

async interrupt() None[source]

Abort in-flight assistant generation (e.g. user interrupted).

async send(content: ContentBlock | list[ContentBlock]) None[source]

Append user content (audio chunk, text, image) to the input buffer.

async send_tool_result(tool_use_id: ToolCallID, name: ToolName, content: str | list[ContentBlock]) None[source]

Deliver a tool’s result to the provider so generation can resume.

name is included because some providers (e.g. Gemini Live) require the tool name alongside the call id; OpenAI realtime can ignore it.

class axio.RealtimeTransport(*args, **kwargs)[source]

Bases: Protocol

Provider that supports duplex realtime sessions (e.g. OpenAI Realtime, Gemini Live). Distinct from CompletionTransport because the interaction is bidirectional, not request/response.

async connect(*, system: str, tools: list[Tool[Any]], voice: str | None = None, input_audio_format: str = 'audio/pcm;rate=16000', output_audio_format: str = 'audio/pcm;rate=24000') RealtimeSession[source]
class axio.SpeechStarted[source]

Bases: object

Server VAD detected the user started speaking (realtime).

class axio.SpeechStopped[source]

Bases: object

Server VAD detected the user stopped speaking (realtime).

class axio.StopReason(*values)[source]

Bases: StrEnum

end_turn = 'end_turn'
error = 'error'
max_tokens = 'max_tokens'
tool_use = 'tool_use'
type axio.StreamEvent = ReasoningDelta | TextDelta | ImageOutput | AudioOutput | VideoOutput | ToolUseStart | ToolInputDelta | ToolFieldStart | ToolFieldDelta | ToolFieldEnd | ToolOutputDelta | ToolResult | IterationEnd | Error | SessionEndEvent | AudioOutputDelta | TranscriptDelta | SpeechStarted | SpeechStopped | TurnComplete
class axio.TextBlock(text: 'str')[source]

Bases: ContentBlock

text: str
class axio.TextDelta(index: 'int', delta: 'str')[source]

Bases: object

delta: str
index: int
class axio.Tool(name: 'ToolName', handler: 'Callable[..., Awaitable[Any]]', description: 'str' = '', guards: 'tuple[PermissionGuard, ...]' = (), concurrency: 'int | None' = None, context: 'T' = mappingproxy({}), schema: 'MappingProxyType[str, Any]' = mappingproxy({}))[source]

Bases: Generic

async call_streaming(**kwargs: Any) AsyncGenerator[tuple[str, str], None][source]

Execute handler, yielding (key, text) chunks for streaming output.

Uses handler.stream(**kwargs) if the handler exposes one. Otherwise falls back to __call__() and yields the full result as a single ("output", ...) chunk. The tool's concurrency semaphore is held for the entire iteration.

concurrency: int | None
context: T
description: str
format_stream_result(chunks: list[tuple[float, str, str]]) str[source]

Aggregate streamed chunks into the final tool result string.

Handlers may attach a format_stream_result callable for structured output (e.g. shell log records). Defaults to text concatenation.

guards: tuple[PermissionGuard, ...]
handler: Callable[[...], Awaitable[Any]]
property input_schema: JSONSchema
name: ToolName
schema: MappingProxyType
property supports_streaming: bool

Handler supports streaming if it exposes a .stream async-generator attribute.
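The streaming detection and fallback described for call_streaming() can be sketched in a few lines. A self-contained model of the dispatch (getattr-based detection is an assumption about the mechanism; the observable behavior matches the docstring above):

```python
import asyncio

async def plain_handler(x: int) -> str:
    return f"got {x}"

async def streaming_handler(x: int) -> str:
    return f"got {x}"

async def _stream(x: int):
    # Attached .stream async generator -> chunked (key, text) output.
    yield ("output", "got ")
    yield ("output", str(x))

streaming_handler.stream = _stream

async def call_streaming(handler, **kwargs):
    """Mirror Tool.call_streaming's fallback: prefer .stream, else one chunk."""
    stream = getattr(handler, "stream", None)
    if stream is not None:
        async for chunk in stream(**kwargs):
            yield chunk
    else:
        yield ("output", await handler(**kwargs))

async def collect(handler):
    return [c async for c in call_streaming(handler, x=7)]

print(asyncio.run(collect(plain_handler)))      # [('output', 'got 7')]
print(asyncio.run(collect(streaming_handler)))  # [('output', 'got '), ('output', '7')]
```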

class axio.ToolInputDelta(index: 'int', tool_use_id: 'ToolCallID', partial_json: 'str')[source]

Bases: object

index: int
partial_json: str
tool_use_id: ToolCallID
class axio.ToolResult(tool_use_id: 'ToolCallID', name: 'ToolName', is_error: 'bool', content: 'str' = '', input: 'dict[str, Any]' = <factory>)[source]

Bases: object

content: str
input: dict[str, Any]
is_error: bool
name: ToolName
tool_use_id: ToolCallID
class axio.ToolResultBlock(tool_use_id: 'ToolCallID', content: 'str | list[TextBlock | ImageBlock | AudioBlock | VideoBlock]', is_error: 'bool' = False)[source]

Bases: ContentBlock

content: str | list[TextBlock | ImageBlock | AudioBlock | VideoBlock]
is_error: bool
tool_use_id: ToolCallID
class axio.ToolSelector(*args, **kwargs)[source]

Bases: Protocol

async select(messages: Iterable[Message], tools: Iterable[Tool[Any]]) Iterable[Tool[Any]][source]
class axio.ToolUseBlock(id: 'ToolCallID', name: 'ToolName', input: 'dict[str, Any]')[source]

Bases: ContentBlock

id: ToolCallID
input: dict[str, Any]
name: ToolName
class axio.ToolUseStart(index: 'int', tool_use_id: 'ToolCallID', name: 'ToolName')[source]

Bases: object

index: int
name: ToolName
tool_use_id: ToolCallID
class axio.TranscriptDelta(role: Literal['user', 'assistant'], delta: str)[source]

Bases: object

Live transcript delta — server-side STT of user mic, or assistant speech transcription, depending on role.

delta: str
role: Literal['user', 'assistant']
class axio.TurnComplete(stop_reason: StopReason, usage: Usage | None = None)[source]

Bases: object

Assistant turn finished in a realtime session. stop_reason may be StopReason.tool_use to signal that pending tool calls should run before the next turn starts.

stop_reason: StopReason
usage: Usage | None
class axio.Usage(input_tokens: 'int', output_tokens: 'int')[source]

Bases: object

input_tokens: int
output_tokens: int

Agent: the core agentic loop orchestrating transport, tools, and context.

class axio.agent.Agent(system: 'str', transport: 'CompletionTransport', tools: 'list[Tool[Any]]' = <factory>, selector: 'ToolSelector | None' = None, max_iterations: 'int' = 50, last_iteration_message: 'Message | None' = None)[source]

Bases: object

copy(**overrides: Any) Self[source]

Return a new Agent with overrides applied.

async dispatch_tools(blocks: list[ToolUseBlock], iteration: int) list[ToolResultBlock][source]
last_iteration_message: Message | None
max_iterations: int
async run(user_message: str, context: ContextStore) str[source]
run_stream(user_message: str, context: ContextStore) AgentStream[source]
selector: ToolSelector | None
system: str
tools: list[Tool[Any]]
transport: CompletionTransport

Tool: frozen dataclass binding a handler callable to a name, guards, and concurrency.

type axio.tool.JSONSchema = dict[str, Any]
class axio.tool.Tool(name: 'ToolName', handler: 'Callable[..., Awaitable[Any]]', description: 'str' = '', guards: 'tuple[PermissionGuard, ...]' = (), concurrency: 'int | None' = None, context: 'T' = mappingproxy({}), schema: 'MappingProxyType[str, Any]' = mappingproxy({}))[source]

Bases: Generic

async call_streaming(**kwargs: Any) AsyncGenerator[tuple[str, str], None][source]

Execute handler, yielding (key, text) chunks for streaming output.

Uses handler.stream(**kwargs) if the handler exposes one. Otherwise falls back to __call__() and yields the full result as a single ("output", ...) chunk. The tool's concurrency semaphore is held for the entire iteration.

concurrency: int | None
context: T
description: str
format_stream_result(chunks: list[tuple[float, str, str]]) str[source]

Aggregate streamed chunks into the final tool result string.

Handlers may attach a format_stream_result callable for structured output (e.g. shell log records). Defaults to text concatenation.

guards: tuple[PermissionGuard, ...]
handler: Callable[[...], Awaitable[Any]]
property input_schema: JSONSchema
name: ToolName
schema: MappingProxyType
property supports_streaming: bool

Handler supports streaming if it exposes a .stream async-generator attribute.

axio.tool.hint_from_json_schema(prop_schema: dict[str, Any]) Any[source]

Return the Python type hint for a single JSON Schema property definition.

Context & Events

ContextStore: protocol for conversation history storage.

class axio.context.ContextStore[source]

Bases: ABC

async add_context_tokens(input_tokens: int, output_tokens: int) None[source]
abstractmethod async append(message: Message) None[source]
async clear() None[source]
async close() None[source]

No-op by default.

async fork() ContextStore[source]

Returns a MemoryContextStore deep copy by default.

async classmethod from_context(context: ContextStore) Self[source]
async classmethod from_history(history: list[Message]) Self[source]

Create a new ContextStore pre-populated with history.

async get_context_tokens() tuple[int, int][source]

Returns (0, 0) by default.

abstractmethod async get_history() list[Message][source]
async list_sessions() list[SessionInfo][source]

List available sessions. Default: returns a single entry for the current session.

property session_id: str

Lazy-init UUID hex; works without calling super().__init__().

async set_context_tokens(input_tokens: int, output_tokens: int) None[source]

No-op by default; tokens are silently dropped.

class axio.context.MemoryContextStore(history: list[Message] | None = None)[source]

Bases: ContextStore

Simple in-memory context store. fork() returns a deep copy.

async append(message: Message) None[source]
async clear() None[source]
async close() None[source]

No-op by default.

async fork() MemoryContextStore[source]

Returns a MemoryContextStore deep copy by default.

async get_context_tokens() tuple[int, int][source]

Returns (0, 0) by default.

async get_history() list[Message][source]
property session_id: str

Lazy-init UUID hex; works without calling super().__init__().

async set_context_tokens(input_tokens: int, output_tokens: int) None[source]

No-op by default; tokens are silently dropped.

class axio.context.SessionInfo(session_id: 'str', message_count: 'int', preview: 'str', created_at: 'str', input_tokens: 'int' = 0, output_tokens: 'int' = 0)[source]

Bases: object

created_at: str
input_tokens: int
message_count: int
output_tokens: int
preview: str
session_id: str

Stream events: all variants emitted by AgentStream.

class axio.events.AudioOutput(index: int, data: bytes, media_type: AudioMediaType)[source]

Bases: object

Audio content from a tool result (e.g. read_file on an audio file).

data: bytes
index: int
media_type: AudioMediaType
class axio.events.AudioOutputDelta(data: bytes, media_type: str = 'audio/pcm;rate=24000')[source]

Bases: object

Streaming audio chunk from the assistant in a realtime session.

data: bytes
media_type: str
class axio.events.Error(exception: 'BaseException')[source]

Bases: object

exception: BaseException
class axio.events.ImageOutput(index: int, data: bytes, media_type: ImageMediaType)[source]

Bases: object

Model generated an image inline (e.g. Nano Banana / Gemini Image).

data: bytes
index: int
media_type: ImageMediaType
class axio.events.IterationEnd(iteration: 'int', stop_reason: 'StopReason', usage: 'Usage')[source]

Bases: object

iteration: int
stop_reason: StopReason
usage: Usage
class axio.events.ReasoningDelta(index: 'int', delta: 'str')[source]

Bases: object

delta: str
index: int
class axio.events.SessionEndEvent(stop_reason: 'StopReason', total_usage: 'Usage')[source]

Bases: object

stop_reason: StopReason
total_usage: Usage
class axio.events.SpeechStarted[source]

Bases: object

Server VAD detected the user started speaking (realtime).

class axio.events.SpeechStopped[source]

Bases: object

Server VAD detected the user stopped speaking (realtime).

type axio.events.StreamEvent = ReasoningDelta | TextDelta | ImageOutput | AudioOutput | VideoOutput | ToolUseStart | ToolInputDelta | ToolFieldStart | ToolFieldDelta | ToolFieldEnd | ToolOutputDelta | ToolResult | IterationEnd | Error | SessionEndEvent | AudioOutputDelta | TranscriptDelta | SpeechStarted | SpeechStopped | TurnComplete
class axio.events.TextDelta(index: 'int', delta: 'str')[source]

Bases: object

delta: str
index: int
class axio.events.ToolFieldDelta(index: 'int', tool_use_id: 'ToolCallID', key: 'str', text: 'str')[source]

Bases: object

index: int
key: str
text: str
tool_use_id: ToolCallID
class axio.events.ToolFieldEnd(index: 'int', tool_use_id: 'ToolCallID', key: 'str')[source]

Bases: object

index: int
key: str
tool_use_id: ToolCallID
class axio.events.ToolFieldStart(index: 'int', tool_use_id: 'ToolCallID', key: 'str')[source]

Bases: object

index: int
key: str
tool_use_id: ToolCallID
class axio.events.ToolInputDelta(index: 'int', tool_use_id: 'ToolCallID', partial_json: 'str')[source]

Bases: object

index: int
partial_json: str
tool_use_id: ToolCallID
class axio.events.ToolOutputDelta(tool_use_id: 'ToolCallID', name: 'ToolName', key: 'str', delta: 'str')[source]

Bases: object

delta: str
key: str
name: ToolName
tool_use_id: ToolCallID
class axio.events.ToolResult(tool_use_id: 'ToolCallID', name: 'ToolName', is_error: 'bool', content: 'str' = '', input: 'dict[str, Any]' = <factory>)[source]

Bases: object

content: str
input: dict[str, Any]
is_error: bool
name: ToolName
tool_use_id: ToolCallID
class axio.events.ToolUseStart(index: 'int', tool_use_id: 'ToolCallID', name: 'ToolName')[source]

Bases: object

index: int
name: ToolName
tool_use_id: ToolCallID
class axio.events.TranscriptDelta(role: Literal['user', 'assistant'], delta: str)[source]

Bases: object

Live transcript delta — server-side STT of user mic, or assistant speech transcription, depending on role.

delta: str
role: Literal['user', 'assistant']
class axio.events.TurnComplete(stop_reason: StopReason, usage: Usage | None = None)[source]

Bases: object

Assistant turn finished in a realtime session. stop_reason may be StopReason.tool_use to signal that pending tool calls should run before the next turn starts.

stop_reason: StopReason
usage: Usage | None
class axio.events.VideoOutput(index: int, data: bytes, media_type: VideoMediaType)[source]

Bases: object

Model generated a video inline.

data: bytes
index: int
media_type: VideoMediaType

Protocols

Transport protocols: completion, image gen, TTS, STT.

Transports should be stateless - all request state lives in the arguments passed to each method call. This allows multiple agents to share a single transport instance and call it concurrently without interference.

The one allowed exception is a reusable connection pool (e.g. an aiohttp.ClientSession), which is safe to share across concurrent calls.
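Statelessness here means a single transport instance can serve many concurrent callers, since every call works only on its own arguments. A self-contained sketch of the shape (a toy echo transport, not a real provider binding):

```python
import asyncio
from typing import AsyncIterator

class EchoTransport:
    """Stateless: no per-request fields, so one instance can be shared by
    many agents calling stream() concurrently without interference."""

    async def stream(self, messages: list[str]) -> AsyncIterator[str]:
        for m in messages:
            await asyncio.sleep(0)   # interleave with other callers
            yield m.upper()

async def consume(t: EchoTransport, msgs: list[str]) -> list[str]:
    return [e async for e in t.stream(msgs)]

async def main():
    t = EchoTransport()
    # Two concurrent consumers share one instance; results stay separate.
    return await asyncio.gather(consume(t, ["a", "b"]), consume(t, ["c"]))

print(asyncio.run(main()))  # [['A', 'B'], ['C']]
```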

class axio.transport.AudioGenTransport(*args, **kwargs)[source]

Bases: Protocol

Generate n non-speech audio samples for a text prompt — music, sound effects, ambient. Returns raw audio bytes (MP3 / WAV / OGG — provider-defined). Distinct from TTSTransport, which is text-to-speech.

async generate_audios(prompt: str, *, model: str | None = None, n: int = 1) list[bytes][source]
class axio.transport.CompletionTransport(*args, **kwargs)[source]

Bases: Protocol

stream(messages: list[Message], tools: list[Tool[Any]], system: str) AsyncIterator[StreamEvent][source]
class axio.transport.DummyAudioGenTransport(*args, **kwargs)[source]

Bases: DummyTransport, AudioGenTransport

async generate_audios(prompt: str, *, model: str | None = None, n: int = 1) list[bytes][source]
class axio.transport.DummyCompletionTransport(*args, **kwargs)[source]

Bases: DummyTransport, CompletionTransport

stream(messages: list[Message], tools: list[Tool[Any]], system: str) AsyncIterator[StreamEvent][source]
class axio.transport.DummyEmbeddingTransport(*args, **kwargs)[source]

Bases: DummyTransport, EmbeddingTransport

embed(texts: list[str]) Any[source]
class axio.transport.DummyImageGenTransport(*args, **kwargs)[source]

Bases: DummyTransport, ImageGenTransport

async generate_images(prompt: str, *, model: str | None = None, n: int = 1) list[bytes][source]
class axio.transport.DummySTTTransport(*args, **kwargs)[source]

Bases: DummyTransport, STTTransport

async transcribe(audio: bytes, media_type: str = 'audio/wav') str[source]
class axio.transport.DummyTTSTransport(*args, **kwargs)[source]

Bases: DummyTransport, TTSTransport

synthesize(text: str, *, voice: str | None = None) AsyncIterator[bytes][source]
class axio.transport.DummyTransport[source]

Bases: object

Placeholder transport that fails loudly if actually used.

Assign this as the default transport when constructing agent prototypes that will be configured later via agent.copy(transport=real_transport).

Example:

from axio.agent import Agent
from axio.transport import DummyCompletionTransport

researcher = Agent(
    system="You are a research assistant...",
    transport=DummyCompletionTransport(),
)

# At runtime, swap in the real transport:
active = researcher.copy(transport=OpenAITransport())
result = await active.run(task, context)
class axio.transport.DummyVideoGenTransport(*args, **kwargs)[source]

Bases: DummyTransport, VideoGenTransport

async generate_videos(prompt: str, *, model: str | None = None, n: int = 1, image: bytes | None = None, duration_seconds: int | None = None, aspect_ratio: str | None = None) list[bytes][source]
class axio.transport.EmbeddingTransport(*args, **kwargs)[source]

Bases: Protocol

async embed(texts: list[str]) list[list[float]][source]
class axio.transport.ImageGenTransport(*args, **kwargs)[source]

Bases: Protocol

Generate n image samples for a text prompt. Returns raw image bytes (PNG / JPEG / WebP — provider-defined).

async generate_images(prompt: str, *, model: str | None = None, n: int = 1) list[bytes][source]
class axio.transport.RealtimeSession(*args, **kwargs)[source]

Bases: Protocol

Active duplex realtime session — bidirectional audio / text / tools.

Returned by RealtimeTransport.connect(). Events from the provider arrive on events(); user input is pushed via send().

async close() None[source]

Tear down the session and release resources.

async commit() None[source]

Signal end-of-utterance for manual VAD; no-op with server VAD.

events() AsyncIterator[StreamEvent][source]

Async iterator over server events for the lifetime of this session.

async interrupt() None[source]

Abort in-flight assistant generation (e.g. user interrupted).

async send(content: ContentBlock | list[ContentBlock]) None[source]

Append user content (audio chunk, text, image) to the input buffer.

async send_tool_result(tool_use_id: ToolCallID, name: ToolName, content: str | list[ContentBlock]) None[source]

Deliver a tool’s result to the provider so generation can resume.

name is included because some providers (e.g. Gemini Live) require the tool name alongside the call id; OpenAI realtime can ignore it.

class axio.transport.RealtimeTransport(*args, **kwargs)[source]

Bases: Protocol

Provider that supports duplex realtime sessions (e.g. OpenAI Realtime, Gemini Live). Distinct from CompletionTransport because the interaction is bidirectional, not request/response.

async connect(*, system: str, tools: list[Tool[Any]], voice: str | None = None, input_audio_format: str = 'audio/pcm;rate=16000', output_audio_format: str = 'audio/pcm;rate=24000') RealtimeSession[source]
class axio.transport.STTTransport(*args, **kwargs)[source]

Bases: Protocol

async transcribe(audio: bytes, media_type: str = 'audio/wav') str[source]
class axio.transport.TTSTransport(*args, **kwargs)[source]

Bases: Protocol

synthesize(text: str, *, voice: str | None = None) AsyncIterator[bytes][source]
class axio.transport.VideoGenTransport(*args, **kwargs)[source]

Bases: Protocol

Generate n video samples for a text prompt. Returns raw video bytes (MP4 / WebM — provider-defined). Provider-specific knobs (duration, aspect ratio, seed image, etc.) live as extra kwargs on the implementation.

async generate_videos(prompt: str, *, model: str | None = None, n: int = 1, image: bytes | None = None, duration_seconds: int | None = None, aspect_ratio: str | None = None) list[bytes][source]

Permission system: guards that gate tool execution.

class axio.permission.AllowAllGuard[source]

Bases: PermissionGuard

async check(tool: Tool[Any], **kwargs: Any) dict[str, Any][source]
class axio.permission.ConcurrentGuard[source]

Bases: PermissionGuard, ABC

Guard with concurrency control.

Subclass and override check(). __call__ acquires the semaphore, then delegates to check(). Set concurrency to control parallelism (default 1: one check at a time).

concurrency: int = 1
class axio.permission.DenyAllGuard[source]

Bases: PermissionGuard

async check(tool: Tool[Any], **kwargs: Any) dict[str, Any][source]
class axio.permission.PermissionGuard[source]

Bases: ABC

Gate for tool calls. Return modified kwargs to allow, raise to deny.

Tool invokes guards via await guard(tool, **kwargs).

Guards receive the Tool object and the raw keyword arguments before execution. Return the (possibly modified) dict to allow; raise GuardError to deny. Because guards see all inputs up front, they are also the right place for logging and auditing:

import logging
from typing import Any

from axio.permission import PermissionGuard
from axio.tool import Tool

logger = logging.getLogger(__name__)

class AuditGuard(PermissionGuard):
    async def check(self, tool: Tool, **kwargs: Any) -> dict[str, Any]:
        logger.info("tool=%s args=%s", tool.name, kwargs)
        return kwargs  # always allow

See examples/agent_swarm/agent_swarm/__main__.py (RoleGuard) for a production example.

abstractmethod async check(tool: Tool[Any], **kwargs: Any) dict[str, Any][source]
class axio.selector.ToolSelector(*args, **kwargs)[source]

Bases: Protocol

async select(messages: Iterable[Message], tools: Iterable[Tool[Any]]) Iterable[Tool[Any]][source]

Data Types

Message: the fundamental unit of conversation history.

class axio.messages.Message(role: "Literal['user', 'assistant', 'system']", content: 'list[ContentBlock]' = <factory>)[source]

Bases: object

content: list[ContentBlock]
classmethod from_dict(data: dict[str, Any]) Message[source]
role: Literal['user', 'assistant', 'system']
to_dict() dict[str, Any][source]

Content blocks: TextBlock, ImageBlock, AudioBlock, ToolUseBlock, ToolResultBlock.

class axio.blocks.AudioBlock(media_type: 'AudioMediaType', data: 'bytes')[source]

Bases: ContentBlock

data: bytes
media_type: AudioMediaType
type axio.blocks.AudioMediaType = Literal['audio/x-aac', 'audio/flac', 'audio/mp3', 'audio/m4a', 'audio/mpeg', 'audio/mpga', 'audio/mp4', 'audio/ogg', 'audio/pcm', 'audio/wav', 'audio/webm']
class axio.blocks.ContentBlock[source]

Bases: object

Base class for all content blocks.

class axio.blocks.ImageBlock(media_type: 'ImageMediaType', data: 'bytes')[source]

Bases: ContentBlock

data: bytes
media_type: ImageMediaType
type axio.blocks.ImageMediaType = Literal['image/jpeg', 'image/png', 'image/gif', 'image/webp']
class axio.blocks.TextBlock(text: 'str')[source]

Bases: ContentBlock

text: str
class axio.blocks.ToolResultBlock(tool_use_id: 'ToolCallID', content: 'str | list[TextBlock | ImageBlock | AudioBlock | VideoBlock]', is_error: 'bool' = False)[source]

Bases: ContentBlock

content: str | list[TextBlock | ImageBlock | AudioBlock | VideoBlock]
is_error: bool
tool_use_id: ToolCallID
class axio.blocks.ToolUseBlock(id: 'ToolCallID', name: 'ToolName', input: 'dict[str, Any]')[source]

Bases: ContentBlock

id: ToolCallID
input: dict[str, Any]
name: ToolName
class axio.blocks.VideoBlock(media_type: 'VideoMediaType', data: 'bytes')[source]

Bases: ContentBlock

data: bytes
media_type: VideoMediaType
type axio.blocks.VideoMediaType = Literal['video/mp4', 'video/mpeg', 'video/mov', 'video/avi', 'video/x-flv', 'video/mpg', 'video/webm', 'video/wmv', 'video/3gpp']
axio.blocks.from_dict(data: dict[str, Any]) ContentBlock[source]

Deserialize a plain dict to a ContentBlock.

axio.blocks.to_dict(block: ContentBlock) dict[str, Any][source]
axio.blocks.to_dict(block: TextBlock) dict[str, Any]
axio.blocks.to_dict(block: ImageBlock) dict[str, Any]
axio.blocks.to_dict(block: AudioBlock) dict[str, Any]
axio.blocks.to_dict(block: VideoBlock) dict[str, Any]
axio.blocks.to_dict(block: ToolUseBlock) dict[str, Any]
axio.blocks.to_dict(block: ToolResultBlock) dict[str, Any]

Serialize a ContentBlock to a plain dict.
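The overload listing above shows one to_dict() entry point dispatching on the concrete block type. A self-contained sketch of that dispatch using functools.singledispatch (the dict layout here is illustrative, not axio's wire format):

```python
from dataclasses import dataclass
from functools import singledispatch

@dataclass
class TextBlock:
    text: str

@dataclass
class ImageBlock:
    media_type: str
    data: bytes

@singledispatch
def to_dict(block) -> dict:
    # Unknown block types fail loudly rather than serializing partially.
    raise TypeError(f"unsupported block: {type(block).__name__}")

@to_dict.register
def _(block: TextBlock) -> dict:
    return {"type": "text", "text": block.text}

@to_dict.register
def _(block: ImageBlock) -> dict:
    # Binary payloads would normally be base64-encoded for JSON transport.
    return {"type": "image", "media_type": block.media_type, "size": len(block.data)}

print(to_dict(TextBlock("hi")))  # {'type': 'text', 'text': 'hi'}
```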

Primitive types: ToolName, ToolCallID, StopReason, Usage.

class axio.types.StopReason(*values)[source]

Bases: StrEnum

end_turn = 'end_turn'
error = 'error'
max_tokens = 'max_tokens'
tool_use = 'tool_use'
type axio.types.ToolCallID = str
type axio.types.ToolName = str
class axio.types.Usage(input_tokens: 'int', output_tokens: 'int')[source]

Bases: object

input_tokens: int
output_tokens: int

Transport-agnostic model types: Capability, ModelSpec, ModelRegistry.

class axio.models.Capability(*values)[source]

Bases: StrEnum

audio = 'audio'
embedding = 'embedding'
image_generation = 'image_generation'
json_mode = 'json_mode'
reasoning = 'reasoning'
structured_outputs = 'structured_outputs'
text = 'text'
tool_use = 'tool_use'
video = 'video'
video_generation = 'video_generation'
vision = 'vision'
class axio.models.ModelRegistry(models: Iterable[ModelSpec] | None = None)[source]

Bases: MutableMapping[str, ModelSpec]

by_capability(*caps: Capability) ModelRegistry[source]
by_cost(*, output: bool = False, desc: bool = False) ModelRegistry[source]

Return registry ordered by cost (input by default, output if output=True).

by_prefix(prefix: str) ModelRegistry[source]
clear() None.  Remove all items from D.[source]
first() ModelSpec[source]
ids() list[str][source]
items() a set-like object providing a view on D's items[source]
keys() a set-like object providing a view on D's keys[source]
last() ModelSpec[source]
search(*q: str) ModelRegistry[source]

Search by parts of the model id.

values() an object providing a view on D's values[source]
class axio.models.ModelSpec(id: 'str', capabilities: 'frozenset[Capability]' = frozenset(), max_output_tokens: 'int' = 8192, context_window: 'int' = 128000, input_cost: 'float' = 0.0, output_cost: 'float' = 0.0)[source]

Bases: object

capabilities: frozenset[Capability]
context_window: int
id: str
input_cost: float
max_output_tokens: int
output_cost: float
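by_capability, by_cost, and search each return a new ModelRegistry, so filters compose by chaining. A minimal self-contained stand-in (simplified signatures, not axio's implementation) showing how such chainable filtering composes over ModelSpec-like records:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Spec:  # simplified stand-in for ModelSpec
    id: str
    capabilities: frozenset[str] = frozenset()
    input_cost: float = 0.0

class Registry:  # simplified stand-in for ModelRegistry
    def __init__(self, models):
        self._models = {m.id: m for m in models}

    def by_capability(self, *caps):
        # Keep only models providing every requested capability.
        return Registry(m for m in self._models.values()
                        if set(caps) <= m.capabilities)

    def by_cost(self, desc=False):
        return Registry(sorted(self._models.values(),
                               key=lambda m: m.input_cost, reverse=desc))

    def first(self):
        return next(iter(self._models.values()))

    def ids(self):
        return list(self._models)

specs = [
    Spec("gpt-x", frozenset({"text", "vision"}), input_cost=3.0),
    Spec("mini", frozenset({"text"}), input_cost=0.5),
]
# Chain filters: capable models first, then cheapest.
cheapest = Registry(specs).by_capability("text").by_cost().first()
assert cheapest.id == "mini"
```

Each filter returning a fresh registry keeps the original untouched, so the same base registry can be sliced several ways.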

Field metadata for tool handler functions - lightweight replacement for pydantic.Field.

axio.field.Field(description: str = '', default: Any = MISSING, ge: int | float | None = None, le: int | float | None = None) FieldInfo[source]

Annotate a handler parameter with metadata (description, default, constraints).

Usage:

from typing import Annotated
from axio.field import Field

async def search(
    query: Annotated[str, Field(description="Search query")],
    limit: Annotated[int, Field(default=10, ge=1, le=100)],
) -> str: ...
class axio.field.FieldInfo(description: str = '', default: Any = MISSING, ge: int | float | None = None, le: int | float | None = None, strict: bool = False)[source]

Bases: object

Metadata attached to a handler parameter via Annotated[T, FieldInfo(...)].

default: Any = MISSING
description: str = ''
ge: int | float | None = None
le: int | float | None = None
strict: bool = False
validate(value: Any, name: str, hint: Any) None[source]

Validate value against this FieldInfo’s constraints, raising if invalid.

axio.field.MISSING: Final[MissingSentinel] = MISSING

Sentinel value meaning “this field has no default and is required”.

class axio.field.MissingSentinel[source]

Bases: object

Singleton sentinel - distinguishes ‘no default’ from None.

axio.field.StrictStr

Drop-in replacement for pydantic's StrictStr (from pydantic import StrictStr). Rejects non-string values (e.g. integers) without coercion.

alias of Annotated[str, FieldInfo(description='', default=MISSING, ge=None, le=None, strict=True)]

axio.field.bare_type(hint: Any) type[source]

Return the base Python type, stripping Annotated, Optional, and generic wrappers.

axio.field.check_list_items(value: list[Any], name: str, inner: Any) None[source]

Raise TypeError when any list element violates the generic item type.

axio.field.check_scalar(value: Any, name: str, b: type, strict: bool) None[source]

Raise TypeError when value does not satisfy the scalar type b.

axio.field.check_type(value: Any, name: str, inner: Any, *, strict: bool) None[source]

Dispatch type validation for value against inner (already unwrapped).

axio.field.get_field_info(annotation: Any) FieldInfo | None[source]

Extract a FieldInfo from an Annotated annotation, or return None.

axio.field.is_classvar(annotation: Any) bool[source]

Return True if annotation is ClassVar or ClassVar[X].

axio.field.unwrap_hint(hint: Any) tuple[Any, bool][source]

Strip Annotated and Optional wrappers.

Returns (inner_type, is_optional) where inner_type has no Annotated or Union-with-None wrappers and is_optional is True when None was one of the Union members.
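The documented contract can be sketched with stdlib typing introspection. This mirrors the behavior described above rather than axio's exact code:

```python
from typing import Annotated, Optional, Union, get_args, get_origin

def unwrap_hint(hint):
    """Strip Annotated and Optional wrappers, reporting optionality."""
    is_optional = False
    changed = True
    while changed:
        changed = False
        if get_origin(hint) is Annotated:
            hint = get_args(hint)[0]          # drop the metadata
            changed = True
        elif get_origin(hint) is Union:
            args = [a for a in get_args(hint) if a is not type(None)]
            if len(args) + 1 == len(get_args(hint)):  # None was a member
                is_optional = True
                hint = args[0] if len(args) == 1 else Union[tuple(args)]
                changed = True
    return hint, is_optional

assert unwrap_hint(Optional[Annotated[int, "meta"]]) == (int, True)
assert unwrap_hint(str) == (str, False)
```

The loop handles nesting in either order (Optional around Annotated or vice versa), which is why unwrapping repeats until a fixed point.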

JSON-schema builder for plain async handler functions.

Produces clean schemas with no "title" keys - no post-processing needed in transports.

axio.schema.build_tool_schema(fn: Any, hints: dict[str, Any] | None = None) dict[str, Any][source]

Return a JSON schema object for fn (a callable or class).

The schema has the form:

{
    "type": "object",
    "properties": {"field": {"type": "string"}, ...},
    "required": ["field", ...]   # only when non-empty
}

No "title" keys are emitted anywhere in the schema.

Parameters

fn:

A plain async function or a class whose annotations define the fields.

hints:

Pre-computed get_type_hints(fn, include_extras=True) result. When supplied, the call to get_type_hints is skipped.

axio.schema.property_schema(annotation: Any) dict[str, Any][source]

Recursively convert a Python type annotation to a JSON schema fragment.
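As an illustration of the annotation-to-schema mapping, here is a simplified sketch (not axio's implementation; the real property_schema also handles Annotated metadata, optionals, and other cases):

```python
from typing import get_args, get_origin

def property_schema(annotation) -> dict:
    # Scalar mappings per the JSON-schema type vocabulary.
    scalars = {str: "string", int: "integer", float: "number", bool: "boolean"}
    if annotation in scalars:
        return {"type": scalars[annotation]}
    if get_origin(annotation) is list:
        (item,) = get_args(annotation)
        # Recurse for the element type of list[T].
        return {"type": "array", "items": property_schema(item)}
    if get_origin(annotation) is dict:
        return {"type": "object"}
    raise TypeError(f"unsupported annotation: {annotation!r}")

assert property_schema(list[int]) == {"type": "array",
                                      "items": {"type": "integer"}}
```

Note that no "title" key is ever emitted, matching the guarantee stated above.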

Utilities

AgentStream: async iterator wrapper over the agent event generator.

class axio.stream.AgentStream(generator: AsyncGenerator[StreamEvent, None])[source]

Bases: object

async aclose() None[source]
async get_final_text() str[source]
async get_session_end() SessionEndEvent[source]

Load Agent prototypes from declarative config files (TOML, JSON, INI).

Each file describes one agent. The loader scans a directory, parses every recognised file, resolves tool names against a caller-supplied toolbox, and returns a dict[str, tuple[str, Agent]] that matches the shape used by agent_swarm and similar orchestrators.

Supported formats

TOML:

name = "architect"
description = "System design and interface specs"
max_iterations = 100
tools = ["read_file", "write_file"]

[system]
text = """
You are an expert software architect...
"""

JSON:

{
  "name": "architect",
  "description": "System design and interface specs",
  "max_iterations": 100,
  "tools": ["read_file", "write_file"],
  "system": "You are an expert software architect..."
}

INI:

[agent]
name = architect
description = System design and interface specs
max_iterations = 100
tools = read_file, write_file

[system]
text = You are an expert software architect...

In all formats name falls back to the file stem when omitted. system may be a plain string or a {"text": "..."} mapping (TOML/JSON). INI tools are comma-separated.

Custom sources

Subclass AgentLoader and implement load() - the base load_file() will handle reading the file and calling your implementation automatically:

import json

from axio.agent_loader import AgentLoader, AgentSpec

class DbAgentLoader(AgentLoader):
    def load(self, content: str) -> AgentSpec:
        row = json.loads(content)
        return AgentSpec(name=row["name"], ...)
class axio.agent_loader.AgentContext[source]

Bases: TypedDict

agent_name: str
context_factory: Callable[[], ContextStore]
on_event: Callable[[str, StreamEvent], None] | None
proto: Agent
transport: CompletionTransport
class axio.agent_loader.AgentLoader[source]

Bases: object

Base class for format-specific agent loaders.

Subclasses implement load() to parse a raw string. The source of that string is entirely up to the caller - files, databases, HTTP, etc. load_file() is provided on the base class and calls load() automatically.

extensions: tuple[str, ...] = ()
load(content: str) AgentSpec[source]

Parse content and return an AgentSpec.

name defaults to "" when not present; load_file() patches it from the file stem after calling this method.

load_file(path: Path) AgentSpec[source]

Read path and delegate to load(), patching name from the stem.

Any ValueError raised by load() is re-raised with path prepended so errors are easy to locate.

scan(directory: Path, toolbox: Mapping[str, Tool[Any]] = mappingproxy({})) dict[str, tuple[str, Agent]][source]

Scan directory for agent files and return a name→(description, agent) dict.

class axio.agent_loader.AgentSpec(name: str, description: str, system: str, max_iterations: int = 50, tools: tuple[str, ...] = (), model: str | None = None)[source]

Bases: object

Parsed agent definition before transport/tools are injected.

description: str
max_iterations: int = 50
model: str | None = None
name: str
system: str
to_agent(toolbox: Mapping[str, Tool[Any]] = mappingproxy({})) Agent[source]

Return a prototype Agent with toolbox tools attached.

The agent uses DummyCompletionTransport - call agent.copy(transport=real_transport) before running it.

Raises KeyError if any name in self.tools is absent from toolbox.

tools: tuple[str, ...] = ()
class axio.agent_loader.IniAgentLoader[source]

Bases: AgentLoader

Load agent definitions from .ini files.

Expected sections: [agent] for metadata, [system] for the prompt. The tools key is comma-separated. Multiline system prompts work via standard ConfigParser continuation (indent subsequent lines).

extensions: tuple[str, ...] = ('ini',)
load(content: str) AgentSpec[source]

Parse content and return an AgentSpec.

name defaults to "" when not present; load_file() patches it from the file stem after calling this method.

class axio.agent_loader.JsonAgentLoader[source]

Bases: AgentLoader

Load agent definitions from .json files.

extensions: tuple[str, ...] = ('json',)
load(content: str) AgentSpec[source]

Parse content and return an AgentSpec.

name defaults to "" when not present; load_file() patches it from the file stem after calling this method.

class axio.agent_loader.MultiFormatLoader(loaders: list[AgentLoader] = <factory>)[source]

Bases: object

Aggregate loader that handles TOML, JSON, and INI files.

loaders: list[AgentLoader]
scan(directory: Path, toolbox: Mapping[str, Tool[Any]] = mappingproxy({})) dict[str, tuple[str, Agent]][source]

Scan directory with all registered loaders; later loaders win on name collisions.

class axio.agent_loader.TomlAgentLoader[source]

Bases: AgentLoader

Load agent definitions from .toml files.

extensions: tuple[str, ...] = ('toml',)
load(content: str) AgentSpec[source]

Parse content and return an AgentSpec.

name defaults to "" when not present; load_file() patches it from the file stem after calling this method.

async axio.agent_loader.agent_tool(task: Annotated[str, FieldInfo(description='Full task instructions.', default=MISSING, ge=None, le=None, strict=False)]) str[source]

Delegate a task to a sub-agent.

axio.agent_loader.load_agents(directory: Path, toolbox: Mapping[str, Tool[Any]] = mappingproxy({})) dict[str, tuple[str, Agent]][source]

Scan directory for .toml, .json, and .ini agent files.

Returns dict[name, (description, agent)] - same shape as the AGENTS registry used in agent_swarm and similar examples.

Example:

from pathlib import Path
from axio.agent_loader import load_agents

AGENTS = load_agents(
    Path(__file__).parent / "roles",
    toolbox={"read_file": read_file_tool, "write_file": write_file_tool},
)
axio.agent_loader.load_agents_from_dir(directory: Path, transport: CompletionTransport, context_factory: Callable[[], ContextStore] = MemoryContextStore, on_event: Callable[[str, StreamEvent], None] | None = None) list[Tool[AgentContext]][source]
axio.agent_loader.make_agent_tools(agents: dict[str, tuple[str, Agent]], transport: CompletionTransport, context_factory: Callable[[], ContextStore] = MemoryContextStore, on_event: Callable[[str, StreamEvent], None] | None = None, agent_name_prefix: str = '') list[Tool[AgentContext]][source]

Convert each agent into its own Tool.

Each tool is named after the agent and accepts a single task field. Runtime dependencies (transport, context factory, event callback) are stored in context and injected on each call.

Parameters

agents:

dict[name, (description, prototype_agent)] - e.g. the value returned by load_agents().

transport:

Transport assigned to the selected agent via agent.copy().

context_factory:

Called once per invocation to produce a fresh ContextStore. Defaults to MemoryContextStore.

on_event:

Optional callback receiving (agent_name, event) for every StreamEvent the agent emits.

agent_name_prefix:

Prefix to prepend to each agent name.

Example:

from pathlib import Path

from axio import Agent
from axio.agent_loader import load_agents, make_agent_tools

agents = load_agents(Path("roles"), toolbox={"read_file": read_file_tool})
tools = make_agent_tools(agents, transport=my_transport)
orchestrator = Agent(system="...", transport=my_transport, tools=tools)

Incremental streaming parser for tool call JSON arguments.

Feeds partial JSON chunks (from ToolInputDelta.partial_json) and emits structured ToolField* events as top-level object fields are discovered.

Top-level string values are decoded (escape sequences resolved, quotes stripped). All other top-level values are emitted as raw JSON fragments.

class axio.tool_args.State(*values)[source]

Bases: IntEnum

AFTER = 7
COLON = 3
ESC = 8
INIT = 0
KEY = 2
OBJ = 1
RAW = 6
STR = 5
UESC = 9
VAL = 4
class axio.tool_args.ToolArgStream(tool_use_id: str, index: int = 0)[source]

Bases: object

O(1)-per-character streaming parser for tool argument JSON.

Usage:

from axio.tool_args import ToolArgStream

stream = ToolArgStream("call_1")
events = stream.feed('{"path":"/tmp/f')
# [ToolFieldStart(0, "call_1", "path"), ToolFieldDelta(0, "call_1", "path", "/tmp/f")]
events = stream.feed('oo.py"}')
# [ToolFieldDelta(0, "call_1", "path", "oo.py"), ToolFieldEnd(0, "call_1", "path")]
property current_key: str

The field currently being streamed, or "".

property done: bool

Whether the top-level JSON object has been fully parsed.

feed(chunk: str) list[ToolFieldEvent][source]

Process a partial JSON chunk and return any field events produced.

type axio.tool_args.ToolFieldEvent = ToolFieldStart | ToolFieldDelta | ToolFieldEnd

Context compaction: summarise old history to stay within token limits.

class axio.compaction.AutoCompactStore(store: ContextStore, transport: CompletionTransport, *, keep_recent: int = 6, max_tokens: int | None = None, threshold: float = 0.75)[source]

Bases: ContextStore

Delegating ContextStore wrapper that auto-compacts the inner store when token usage exceeds a threshold. Works with any ContextStore backend - MemoryContextStore, SQLiteContextStore, etc.

The threshold defaults to 75% of transport.model.context_window (read via getattr; falls back to 128,000 if the transport has no model attribute). Pass max_tokens explicitly to override.

Compaction fires from add_context_tokens(), which the agent loop calls immediately after IterationEnd - input_tokens there equals the real context size sent to the model in that iteration.

Internally, _do_compact() forks the inner store before calling compact_context. The fork acts as a stable snapshot: the summarisation agent reads from it while the live store remains writable. The live store is only cleared and repopulated after the (async) summary call returns.

Example:

from axio.compaction import AutoCompactStore
from axio.context import MemoryContextStore

store = AutoCompactStore(MemoryContextStore(), transport, keep_recent=6)
result = await agent.run(task, store)
async add_context_tokens(input_tokens: int, output_tokens: int) None[source]
async append(message: Message) None[source]
async clear() None[source]
async close() None[source]

No-op by default.

async fork() AutoCompactStore[source]

Return an AutoCompactStore wrapping a fork of the inner store.

async get_context_tokens() tuple[int, int][source]

Returns (0, 0) by default.

async get_history() list[Message][source]
async list_sessions() list[SessionInfo][source]

List available sessions. Default: returns a single entry for the current session.

property session_id: str

Lazy-init UUID hex; works without calling super().__init__().

async set_context_tokens(input_tokens: int, output_tokens: int) None[source]

No-op by default; tokens are silently dropped.

async axio.compaction.compact_context(context: ContextStore, transport: CompletionTransport, *, keep_recent: int = 6, system_prompt: str = 'You are a conversation summarizer. You will see a conversation between a user and an AI assistant, including tool calls and their results. Produce a concise summary preserving: user goals, decisions made, key facts, tool outcomes, and state changes. Write as narrative prose, not as a transcript.') list[Message] | None[source]

Summarize old messages from context, keeping recent ones verbatim.

Returns a compacted message list ready to populate a fresh store, or None if the history is too short to compact (split <= 0).

The caller is responsible for deciding when to compact (e.g. by tracking usage.input_tokens from IterationEnd events).

Exception hierarchy for axio.

exception axio.exceptions.AxioError[source]

Bases: Exception

Base exception for all axio errors.

exception axio.exceptions.GuardError[source]

Bases: ToolError

Guard denied or crashed during permission check.

exception axio.exceptions.HandlerError[source]

Bases: ToolError

Handler raised during execution.

exception axio.exceptions.StreamError[source]

Bases: AxioError

Error during stream collection.

exception axio.exceptions.ToolError[source]

Bases: AxioError

Base for tool-related errors.

Shared test helpers: StubTransport, fixtures, response builders.

class axio.testing.StubTransport(responses: list[list[StreamEvent]] | None = None)[source]

Bases: object

A CompletionTransport that yields pre-configured event sequences.

Each call to stream() pops the next sequence from the list.

stream(messages: list[Message], tools: list[Tool[Any]], system: str) AsyncIterator[StreamEvent][source]
axio.testing.make_echo_tool() Tool[Any][source]
axio.testing.make_ephemeral_context() MemoryContextStore[source]
axio.testing.make_stub_transport() StubTransport[source]
axio.testing.make_text_response(text: str = 'Done', iteration: int = 2, usage: Usage | None = None) list[StreamEvent][source]

Build a standard end_turn text response event sequence.

axio.testing.make_tool_use_response(tool_name: str = 'echo', tool_id: str = 'call_1', tool_input: dict[str, Any] | None = None, iteration: int = 1, usage: Usage | None = None) list[StreamEvent][source]

Build a standard tool_use response event sequence.
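The pop-per-call behaviour of StubTransport can be illustrated with a self-contained stand-in (the real class yields axio StreamEvent objects and satisfies the CompletionTransport protocol):

```python
import asyncio

class QueueTransport:
    """Yields pre-configured event sequences; each stream() call pops the next."""

    def __init__(self, responses):
        self._responses = list(responses)

    async def stream(self, messages, tools, system):
        events = self._responses.pop(0)   # next scripted response
        for event in events:
            yield event

async def collect():
    t = QueueTransport([["hello", "end_turn"], ["second"]])
    first = [e async for e in t.stream([], [], "")]
    second = [e async for e in t.stream([], [], "")]
    return first, second

first, second = asyncio.run(collect())
assert first == ["hello", "end_turn"]
assert second == ["second"]
```

Scripting one event list per expected model turn makes multi-iteration agent loops deterministic in tests.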