Writing Context Stores¶
A context store holds the conversation history for an agent session. Axio
ships two built-in implementations - MemoryContextStore (in-memory,
ephemeral) and SQLiteContextStore (persistent, file-backed) - that cover
most single-process use cases. This guide explains when you need something
different and how to implement it.
For a full description of the API and the built-in stores see Context & Messages.
When to write a custom store¶
Consider a custom context store when:
Shared state across processes. Multiple agent workers need to read and write the same session (e.g. Redis, Memcached, or a database accessed over the network).
Durable, queryable history. You want to keep conversation history in PostgreSQL alongside other application data, or you need full-text search over past sessions.
Existing infrastructure. Your application already has a message store (a chat service, a ticket system) and you want the agent to read and write it directly.
Custom retention or compaction policies. You need to cap history at N tokens, archive old messages to cold storage, or apply per-tenant data residency rules at the storage layer.
If none of the above applies, MemoryContextStore or SQLiteContextStore
are the right choices.
The contract¶
ContextStore is an abstract base class defined in axio.context. Only two
methods are abstract and must be provided:
Method |
Guarantee |
|---|---|
|
Append one |
|
Return a list of all messages in insertion order. Must not mutate the store’s internal state. |
Every other method has a default implementation on ContextStore that is
correct for simple cases, but may need overriding for production backends:
Method |
Default behaviour |
|---|---|
|
Lazy UUID hex property; generated once per instance. |
|
Raises |
|
Deep-copies the history into a fresh |
|
No-op. |
|
No-op; tokens are silently dropped. |
|
Returns |
|
Calls |
|
Returns a single |
Minimal implementation¶
The simplest possible custom store wraps an in-memory list. This is
functionally identical to MemoryContextStore, but it is a useful starting
point to demonstrate the required interface before moving to a remote backend.
import asyncio
from axio import ContextStore
from axio.messages import Message
from axio.blocks import TextBlock
class MinimalStore(ContextStore):
"""Minimal in-process context store."""
def __init__(self) -> None:
self._messages: list[Message] = []
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
# Return a copy so callers can't mutate internal state.
return list(self._messages)
async def main():
store = MinimalStore()
assert store.session_id # lazy UUID hex, no __init__ call needed
await store.append(Message(role="user", content=[TextBlock(text="Hello")]))
await store.append(Message(role="assistant", content=[TextBlock(text="Hi!")]))
history = await store.get_history()
assert len(history) == 2
assert history[0].role == "user"
assert history[1].role == "assistant"
asyncio.run(main())
Key rules:
Return a copy of the internal list from
get_history(), not a reference. Callers (including the agent loop) may iterate and modify the list independently.The
session_idproperty is provided by the base class - you do not need to set it if you do not callsuper().__init__(). It is lazily initialised the first time it is accessed.
Overriding lifecycle methods¶
clear()¶
The base class raises NotImplementedError. Override it if your backend
supports clearing a session:
import asyncio
from axio import ContextStore
from axio.messages import Message
from axio.blocks import TextBlock
class ClearableStore(ContextStore):
def __init__(self) -> None:
self._messages: list[Message] = []
self._input_tokens: int = 0
self._output_tokens: int = 0
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
return list(self._messages)
async def clear(self) -> None:
"""Remove all messages and reset token counts."""
self._messages.clear()
self._input_tokens = 0
self._output_tokens = 0
async def main():
store = ClearableStore()
await store.append(Message(role="user", content=[TextBlock(text="Hello")]))
assert len(await store.get_history()) == 1
await store.clear()
assert len(await store.get_history()) == 0
asyncio.run(main())
fork()¶
The default fork() deep-copies the history into a new MemoryContextStore.
That is sufficient for most in-process scenarios, but override it when:
You want the forked session to also be persisted (e.g.
SQLiteContextStoredoes a SQLINSERT … SELECTto create a new session row).The deep-copy is expensive (large history) and you prefer copy-on-write or a pointer/reference to a snapshot.
import asyncio
import copy
from axio import ContextStore
from axio.messages import Message
from axio.blocks import TextBlock
class ForkableStore(ContextStore):
def __init__(self, messages: list[Message] | None = None) -> None:
self._messages: list[Message] = list(messages or [])
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
return list(self._messages)
async def fork(self) -> "ForkableStore":
"""Return an independent copy of this session."""
return ForkableStore(copy.deepcopy(self._messages))
async def main():
store = ForkableStore()
await store.append(Message(role="user", content=[TextBlock(text="Hello")]))
forked = await store.fork()
await forked.append(Message(role="assistant", content=[TextBlock(text="Hi!")]))
# Original is unchanged.
assert len(await store.get_history()) == 1
assert len(await forked.get_history()) == 2
asyncio.run(main())
close()¶
Override close() when your store holds resources that must be released
explicitly: database connections, file handles, network sockets. The default
is a no-op.
async def close(self) -> None:
await self._conn.close()
The Agent does not call close() automatically - the caller that
creates the store is responsible for closing it, typically with
try / finally or an asynccontextmanager.
Token tracking¶
The agent calls add_context_tokens(input_tokens, output_tokens) after every
LLM iteration to accumulate usage data. The base class’s default
implementation delegates to get_context_tokens() and set_context_tokens(),
both of which are no-ops - so tokens are silently discarded unless you
override at least set_context_tokens and get_context_tokens.
Override both methods together:
import asyncio
from axio import ContextStore
from axio.messages import Message
class TokenTrackingStore(ContextStore):
def __init__(self) -> None:
self._messages: list[Message] = []
self._input_tokens: int = 0
self._output_tokens: int = 0
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
return list(self._messages)
async def set_context_tokens(self, input_tokens: int, output_tokens: int) -> None:
self._input_tokens = input_tokens
self._output_tokens = output_tokens
async def get_context_tokens(self) -> tuple[int, int]:
return self._input_tokens, self._output_tokens
async def main():
store = TokenTrackingStore()
# The agent calls add_context_tokens after each iteration.
await store.add_context_tokens(100, 50)
await store.add_context_tokens(200, 75)
in_tok, out_tok = await store.get_context_tokens()
assert in_tok == 300
assert out_tok == 125
asyncio.run(main())
Method summary:
set_context_tokens(input_tokens, output_tokens)Overwrite the stored counts with the given values. No-op in the base class.
get_context_tokens() -> tuple[int, int]Return
(input_tokens, output_tokens). Returns(0, 0)in the base class.add_context_tokens(input_tokens, output_tokens)Increment the stored counts. The base class reads the current value via
get_context_tokens()and writes the sum viaset_context_tokens(). You can override this directly for backends (likeSQLiteContextStore) that support atomic increment in a single query.
Registering a context store¶
Context stores are not discovered through entry points - they are ordinary
Python classes that you instantiate yourself and pass to the Agent.
Passing to Agent¶
Agent.run() and Agent.run_stream() accept any ContextStore instance as
their second argument:
import asyncio
from axio import Agent
store = MinimalStore()
agent = Agent(
system="You are a helpful assistant.",
transport=transport,
)
async def main():
result = await agent.run("Hello", store)
assert result == "ok"
asyncio.run(main())
A single store instance represents one session. Pass a new store (or call
fork()) to start a fresh session while keeping the agent configuration.
Using with SQLiteContextStore¶
SQLiteContextStore takes a connection created by axio_context_sqlite.connect
and can share that connection across many session instances:
import asyncio
from axio_context_sqlite import SQLiteContextStore, connect
async def main():
conn = await connect("./my_db.sqlite")
try:
store = SQLiteContextStore(conn, session_id="session-abc")
# pass store to Agent.run(...)
finally:
await conn.close()
asyncio.run(main())
The connection is owned by the caller; SQLiteContextStore.close() is a
no-op by design. Close the underlying aiosqlite.Connection explicitly when
you are done with the session or on application shutdown.
Testing¶
Testing the store directly¶
Test append and get_history first, then each lifecycle method you
override:
import asyncio
from axio import ContextStore
from axio.messages import Message
from axio.blocks import TextBlock
class SimpleStore(ContextStore):
def __init__(self) -> None:
self._messages: list[Message] = []
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
return list(self._messages)
async def clear(self) -> None:
self._messages.clear()
async def test_append_and_history():
store = SimpleStore()
msg = Message(role="user", content=[TextBlock(text="hello")])
await store.append(msg)
history = await store.get_history()
assert len(history) == 1
assert history[0].role == "user"
async def test_get_history_returns_copy():
store = SimpleStore()
await store.append(Message(role="user", content=[TextBlock(text="hi")]))
h1 = await store.get_history()
h2 = await store.get_history()
# Mutations to the returned list do not affect the store.
h1.clear()
assert len(await store.get_history()) == 1
async def test_clear():
store = SimpleStore()
await store.append(Message(role="user", content=[TextBlock(text="hi")]))
await store.clear()
assert len(await store.get_history()) == 0
asyncio.run(test_append_and_history())
asyncio.run(test_get_history_returns_copy())
asyncio.run(test_clear())
Testing fork isolation¶
import asyncio
import copy
from axio import ContextStore
from axio.messages import Message
from axio.blocks import TextBlock
class ForkedStore(ContextStore):
def __init__(self, messages: list[Message] | None = None) -> None:
self._messages: list[Message] = list(messages or [])
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
return list(self._messages)
async def fork(self) -> "ForkedStore":
return ForkedStore(copy.deepcopy(self._messages))
async def test_fork_isolation():
store = ForkedStore()
await store.append(Message(role="user", content=[TextBlock(text="original")]))
fork = await store.fork()
await fork.append(Message(role="assistant", content=[TextBlock(text="branched")]))
# Original is unchanged.
assert len(await store.get_history()) == 1
# Fork has the extra message.
assert len(await fork.get_history()) == 2
# Session IDs are independent.
assert store.session_id != fork.session_id
asyncio.run(test_fork_isolation())
Testing token tracking¶
import asyncio
from axio import ContextStore
from axio.messages import Message
class TokenStore(ContextStore):
def __init__(self) -> None:
self._messages: list[Message] = []
self._in: int = 0
self._out: int = 0
async def append(self, message: Message) -> None:
self._messages.append(message)
async def get_history(self) -> list[Message]:
return list(self._messages)
async def set_context_tokens(self, input_tokens: int, output_tokens: int) -> None:
self._in = input_tokens
self._out = output_tokens
async def get_context_tokens(self) -> tuple[int, int]:
return self._in, self._out
async def test_token_accumulation():
store = TokenStore()
await store.add_context_tokens(100, 40)
await store.add_context_tokens(50, 20)
in_tok, out_tok = await store.get_context_tokens()
assert in_tok == 150
assert out_tok == 60
asyncio.run(test_token_accumulation())
Testing with the agent using StubTransport¶
Use axio.testing.StubTransport to run an agent against your store without
making real LLM calls:
import asyncio
from axio import ContextStore, Agent
from axio.messages import Message
from axio.testing import StubTransport, make_text_response
class RecordingStore(ContextStore):
"""Store that records all messages appended to it."""
def __init__(self) -> None:
self._messages: list[Message] = []
self.append_count: int = 0
async def append(self, message: Message) -> None:
self._messages.append(message)
self.append_count += 1
async def get_history(self) -> list[Message]:
return list(self._messages)
async def test_agent_uses_store():
store = RecordingStore()
transport = StubTransport([make_text_response("Done!")])
agent = Agent(system="You are a test agent.", transport=transport)
result = await agent.run("Hello", store)
assert result == "Done!"
# Agent appends the user message and then the assistant reply.
assert store.append_count == 2
history = await store.get_history()
assert history[0].role == "user"
assert history[1].role == "assistant"
asyncio.run(test_agent_uses_store())
Tips¶
Always return a copy from
get_history(). The agent loop iterates the returned list and may pass it to the transport while appending proceeds in the background.Keep
append()atomic. If your backend supports transactions, commit insideappendso that a failure in a subsequent call does not leave the history in a partially-written state.For remote backends, consider whether
fork()should clone data in the backend (likeSQLiteContextStoredoes) or fall back to the default in-memory deep-copy. The default is safe but does not persist the fork.Override
close()whenever your store holds a connection or file handle. The agent does not call it - make the caller responsible, using atry / finallyblock or anasynccontextmanagerwrapper.