Skip to content

Storage

Storage is the authoritative persistence layer for tasks, artifacts, and messages. It handles CRUD operations and enforces data-integrity constraints.

Storage ABC

a2akit.storage.base.Storage

Bases: ABC, Generic[ContextT]

Abstract storage interface for A2A tasks.

Storage handles CRUD and data-integrity constraints (terminal-state guard, optimistic concurrency). Business rules (role enforcement, state-machine transitions, context matching) live in :class:TaskManager.

Subclasses MUST implement 3 abstract methods

load_task, create_task, update_task

Optional with sensible defaults

list_tasks, delete_task, delete_context, get_version, load_context, update_context

Consistency requirement: Implementations MUST provide read-your-writes consistency. A load_task() call following update_task() or create_task() on the same connection MUST reflect the preceding write. For database backends with read replicas, this means reading from the primary after writes.

load_task(task_id, history_length=None, *, include_artifacts=True) abstractmethod async

create_task(context_id, message, *, idempotency_key=None) abstractmethod async

Create a brand-new task from an initial message.

Message contract: Implementations MUST NOT mutate the input message object. Use message.model_copy(update=...) to create a copy with task_id and context_id set for storage. The caller (TaskManager) is responsible for binding these fields on the original message after this method returns.

If idempotency_key is provided and a task with the same key and context_id already exists, return the existing task instead of creating a duplicate. The key is scoped per context to avoid a global unique index on large tables.

Atomicity requirement: DB backends MUST implement idempotency as an atomic operation. The recommended pattern is a UNIQUE constraint on (context_id, idempotency_key) combined with INSERT ... ON CONFLICT DO NOTHING RETURNING (PostgreSQL) or equivalent. A SELECT-then-INSERT pattern is NOT sufficient because it has a TOCTOU race under concurrent requests.

InMemoryStorage uses an O(N) scan which is acceptable for development but MUST NOT be used as a template for production backends.

update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None) abstractmethod async

Persist state change, artifacts, and messages atomically.

Business rules (role enforcement, context mismatch) are handled by :class:TaskManager. Data-integrity constraints (terminal guard, OCC) are enforced here.

Message binding contract: All Message objects in messages MUST have task_id and context_id set by the caller before this method is called. Storage backends MUST NOT be responsible for filling these fields.

When state is None the current state MUST be preserved (keep-current semantics) — useful for pure artifact or message appends without a state transition.

When status_message is provided alongside a state transition, it is stored in task.status.message so that polling clients (tasks/get, blocking message/send) see the agent's message in the status object (A2A §9.4). Ignored when state is None.

Each :class:ArtifactWrite carries its own append flag so that callers can mix append and replace operations in a single call (e.g. append to artifact A while replacing artifact B).

When task_metadata is provided, its key-value pairs are merged into the task's metadata dict.

When expected_version is provided and doesn't match the stored version, raise a :class:ConcurrencyError. All backends (including InMemory) MUST check this parameter. DB backends should implement this as UPDATE ... WHERE id = ? AND version = ?.

Terminal-state guard: Implementations MUST reject state transitions on tasks that are already in a terminal state (completed, canceled, failed, rejected) by raising :class:TaskTerminalStateError. This prevents concurrent writers from corrupting terminal states (e.g. force-cancel and worker completing simultaneously). Pure message or artifact appends without a state transition (state=None) are not affected by this guard.

Implementations MUST ensure that all changes are applied as a single atomic operation. If any part fails, no changes must be visible. For database backends this means a single transaction.

Return value: The new version number after the write. All backends (including InMemory) MUST return an int so callers can use it for subsequent optimistic-concurrency writes. Use load_task() for reading back complete task state.

list_tasks(query) async

Return filtered and paginated tasks.

Default implementation returns empty results. Override in backends that support listing.

delete_task(task_id) async

Delete a task by ID. Returns True if the task existed.

delete_context(context_id) async

Delete all tasks in a context. Returns the number of deleted tasks.

get_version(task_id) async

Return current optimistic-concurrency version for a task.

Returns None when the backend does not support versioning or when task_id does not exist. Default implementation returns None.

load_context(context_id) async

Load stored context for a context_id. Returns None if not found.

Default implementation returns None (no context storage).

update_context(context_id, context) async

Store context for a context_id.

Default implementation is a no-op.

Required Methods

Subclasses MUST implement these three methods:

load_task(task_id, history_length=None, *, include_artifacts=True) -> Task | None

Load a task by ID. Returns None if not found. history_length limits the number of history messages returned.

create_task(context_id, message, *, idempotency_key=None) -> Task

Create a new task from an initial message. When idempotency_key is provided with a matching context_id, returns the existing task instead of creating a duplicate.

update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None) -> int

Persist state changes, artifacts, and messages atomically. Returns the new version number.

Optional Methods

These have sensible defaults but can be overridden:

Method Default Description
list_tasks(query) Returns empty Filtered and paginated task listing
delete_task(task_id) NotImplementedError Delete a task
delete_context(context_id) NotImplementedError Delete all tasks in a context
get_version(task_id) None Current OCC version
load_context(context_id) None Load stored conversation context
update_context(context_id, context) No-op Store conversation context

InMemoryStorage

The default storage backend for development. Stores everything in Python dictionaries.

from a2akit import A2AServer, InMemoryStorage

server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    storage="memory",  # equivalent to InMemoryStorage()
)

Not for production

InMemoryStorage loses all data on restart and doesn't support multi-process deployments. Use PostgreSQL or SQLite for production.

PostgreSQLStorage

Persistent storage using PostgreSQL via SQLAlchemy + asyncpg.

pip install a2akit[postgres]
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    storage="postgresql+asyncpg://user:pass@localhost/mydb",
)

SQLiteStorage

Persistent storage using SQLite via SQLAlchemy + aiosqlite.

pip install a2akit[sqlite]
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    storage="sqlite+aiosqlite:///tasks.db",
)

ArtifactWrite

Per-artifact write descriptor with individual append semantics.

@dataclass(frozen=True)
class ArtifactWrite:
    artifact: Artifact
    append: bool = False

Each ArtifactWrite carries its own append flag, allowing callers to mix append and replace operations in a single update_task call.

Data Integrity

Terminal-State Guard

Storage MUST reject state transitions on tasks in terminal states (completed, canceled, failed, rejected) by raising TaskTerminalStateError. This prevents concurrent writers from corrupting terminal states.

Optimistic Concurrency Control

When expected_version is provided to update_task, Storage MUST verify it matches the stored version. On mismatch, raise ConcurrencyError.

Consistency Requirement

Implementations MUST provide read-your-writes consistency. A load_task() call following update_task() on the same connection MUST reflect the preceding write.

Exceptions

Exception Description
TaskNotFoundError Referenced task does not exist
TaskTerminalStateError Operation attempted on a terminal task
TaskNotAcceptingMessagesError Task not in a state that accepts user input
TaskNotCancelableError Cancel attempted on a terminal task
ContextMismatchError Message contextId doesn't match task's contextId
ConcurrencyError Expected version doesn't match stored version
UnsupportedOperationError Operation not supported for current task state

ListTasksQuery

class ListTasksQuery(BaseModel):
    context_id: str | None = None
    status: TaskState | None = None
    page_size: int = 50  # 1-100
    page_token: str | None = None
    history_length: int | None = None
    status_timestamp_after: str | None = None
    include_artifacts: bool = False

ListTasksResult

class ListTasksResult(BaseModel):
    tasks: list[Task] = []
    next_page_token: str = ""
    page_size: int = 50
    total_size: int = 0