Source code for axio.compaction

"""Context compaction: summarise old history to stay within token limits."""

from __future__ import annotations

import logging

from .agent import Agent
from .blocks import TextBlock, ToolResultBlock
from .context import ContextStore, MemoryContextStore, SessionInfo
from .messages import Message
from .transport import CompletionTransport

logger = logging.getLogger(__name__)


DEFAULT_COMPACTION_PROMPT = (
    "You are a conversation summarizer. You will see a conversation between"
    " a user and an AI assistant, including tool calls and their results."
    " Produce a concise summary preserving: user goals, decisions made,"
    " key facts, tool outcomes, and state changes. Write as narrative prose,"
    " not as a transcript."
)


[docs] async def compact_context( context: ContextStore, transport: CompletionTransport, *, keep_recent: int = 6, system_prompt: str = DEFAULT_COMPACTION_PROMPT, ) -> list[Message] | None: """Summarize old messages from *context*, keeping recent ones verbatim. Returns a compacted message list ready to populate a fresh store, or ``None`` if the history is too short to compact (split <= 0). The caller is responsible for deciding *when* to compact (e.g. by tracking ``usage.input_tokens`` from ``IterationEnd`` events). """ history = await context.get_history() split = _find_safe_boundary(history, keep_recent) if split <= 0: return None old, recent = history[:split], history[split:] summary_ctx = MemoryContextStore(old) agent = Agent(system=system_prompt, transport=transport, max_iterations=1) try: summary = await agent.run("Summarize the conversation above.", summary_ctx) except Exception: logger.warning("Context compaction failed, keeping original history", exc_info=True) return None return [ Message(role="user", content=[TextBlock(text=summary)]), Message(role="assistant", content=[TextBlock(text="Understood, context restored.")]), *recent, ]
def _find_safe_boundary(history: list[Message], keep_recent: int) -> int: """Return a split index that never separates a tool_use from its tool_result.""" split = len(history) - keep_recent while split > 0 and any(isinstance(b, ToolResultBlock) for b in history[split].content): split -= 1 return split
[docs] class AutoCompactStore(ContextStore): """Delegating ``ContextStore`` wrapper that auto-compacts the inner store when token usage exceeds a threshold. Works with any ``ContextStore`` backend - ``MemoryContextStore``, ``SQLiteContextStore``, etc. The threshold defaults to 75 % of ``transport.model.context_window`` (read via ``getattr``; falls back to 128 000 if the transport has no ``model`` attribute). Pass ``max_tokens`` explicitly to override. Compaction fires from :meth:`add_context_tokens`, which the agent loop calls immediately after ``IterationEnd`` - ``input_tokens`` there equals the real context size sent to the model in that iteration. Internally, :meth:`_do_compact` forks the inner store before calling ``compact_context``. The fork acts as a stable snapshot: the summarisation agent reads from it while the live store remains writable. The live store is only cleared and repopulated after the (async) summary call returns. Example:: from axio.compaction import AutoCompactStore from axio.context import MemoryContextStore store = AutoCompactStore(MemoryContextStore(), transport, keep_recent=6) result = await agent.run(task, store) """ def __init__( self, store: ContextStore, transport: CompletionTransport, *, keep_recent: int = 6, max_tokens: int | None = None, threshold: float = 0.75, ) -> None: self._store = store self._transport = transport self._keep_recent = keep_recent self._threshold = threshold if max_tokens is not None: self._max_tokens = max_tokens else: model = getattr(transport, "model", None) ctx_win: int = getattr(model, "context_window", 128_000) if model is not None else 128_000 self._max_tokens = int(ctx_win * threshold) @property def session_id(self) -> str: return self._store.session_id
[docs] async def append(self, message: Message) -> None: await self._store.append(message)
[docs] async def get_history(self) -> list[Message]: return await self._store.get_history()
[docs] async def clear(self) -> None: await self._store.clear()
[docs] async def fork(self) -> AutoCompactStore: """Return an ``AutoCompactStore`` wrapping a fork of the inner store.""" return AutoCompactStore( await self._store.fork(), self._transport, keep_recent=self._keep_recent, max_tokens=self._max_tokens, threshold=self._threshold, )
[docs] async def set_context_tokens(self, input_tokens: int, output_tokens: int) -> None: await self._store.set_context_tokens(input_tokens, output_tokens)
[docs] async def get_context_tokens(self) -> tuple[int, int]: return await self._store.get_context_tokens()
[docs] async def add_context_tokens(self, input_tokens: int, output_tokens: int) -> None: await self._store.add_context_tokens(input_tokens, output_tokens) if input_tokens > self._max_tokens: await self._do_compact()
[docs] async def close(self) -> None: await self._store.close()
[docs] async def list_sessions(self) -> list[SessionInfo]: return await self._store.list_sessions()
async def _do_compact(self) -> None: snapshot = await self._store.fork() compacted = await compact_context(snapshot, self._transport, keep_recent=self._keep_recent) if compacted is None: return in_tok, out_tok = await self._store.get_context_tokens() await self._store.clear() for msg in compacted: await self._store.append(msg) await self._store.set_context_tokens(in_tok, out_tok)