Storage¶
Storage is the authoritative persistence layer for tasks, artifacts, and messages. It handles CRUD operations and enforces data-integrity constraints.
Storage ABC¶
a2akit.storage.base.Storage¶
Bases: ABC, Generic[ContextT]
Abstract storage interface for A2A tasks.
Storage handles CRUD and data-integrity constraints (terminal-state
guard, optimistic concurrency). Business rules (role enforcement,
state-machine transitions, context matching) live in
TaskManager.
Subclasses MUST implement 3 abstract methods:
load_task, create_task, update_task.
Optional with sensible defaults:
list_tasks, delete_task, delete_context, get_version, load_context, update_context.
Consistency requirement: Implementations MUST provide
read-your-writes consistency. A load_task() call following
update_task() or create_task() on the same connection
MUST reflect the preceding write. For database backends with
read replicas, this means reading from the primary after writes.
load_task(task_id, history_length=None, *, include_artifacts=True)¶
abstractmethod, async
create_task(context_id, message, *, idempotency_key=None)¶
abstractmethod, async
Create a brand-new task from an initial message.
Message contract: Implementations MUST NOT mutate the input
message object. Use message.model_copy(update=...) to
create a copy with task_id and context_id set for
storage. The caller (TaskManager) is responsible for binding
these fields on the original message after this method returns.
If idempotency_key is provided and a task with the same
key and context_id already exists, return the existing
task instead of creating a duplicate. The key is scoped per
context to avoid a global unique index on large tables.
Atomicity requirement: DB backends MUST implement idempotency
as an atomic operation. The recommended pattern is a UNIQUE
constraint on (context_id, idempotency_key) combined with
INSERT ... ON CONFLICT DO NOTHING RETURNING (PostgreSQL)
or equivalent. A SELECT-then-INSERT pattern is NOT sufficient
because it has a TOCTOU race under concurrent requests.
InMemoryStorage uses an O(N) scan, which is acceptable for development but MUST NOT be used as a template for production backends.
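The atomicity requirement above can be illustrated with a minimal sketch. It assumes a hypothetical `tasks` table and uses the standard-library `sqlite3` module rather than a2akit's actual backends; the table layout and the standalone `create_task` function are illustrative assumptions only. Because `DO NOTHING` yields no row when a conflict occurs, the sketch follows the insert with a `SELECT` inside the same transaction instead of relying on `RETURNING`.

```python
import sqlite3
import uuid

# Hypothetical schema: the UNIQUE constraint is what makes the insert race-free.
SCHEMA = """
CREATE TABLE tasks (
    id TEXT PRIMARY KEY,
    context_id TEXT NOT NULL,
    idempotency_key TEXT,
    UNIQUE (context_id, idempotency_key)
)
"""


def create_task(conn, context_id, idempotency_key=None):
    """Return the task id for (context_id, idempotency_key), creating it at most once."""
    new_id = str(uuid.uuid4())
    with conn:  # one transaction: commits on success, rolls back on error
        # The database decides atomically whether this row is a duplicate.
        conn.execute(
            "INSERT INTO tasks (id, context_id, idempotency_key) VALUES (?, ?, ?) "
            "ON CONFLICT (context_id, idempotency_key) DO NOTHING",
            (new_id, context_id, idempotency_key),
        )
        if idempotency_key is None:
            # NULL keys never conflict, so the row was always inserted.
            return new_id
        # Either our row or the pre-existing one; both are the right answer.
        row = conn.execute(
            "SELECT id FROM tasks WHERE context_id = ? AND idempotency_key = ?",
            (context_id, idempotency_key),
        ).fetchone()
    return row[0]
```

Calling `create_task(conn, "ctx-1", "key-1")` twice returns the same id, while the same key in a different context creates a separate task, which matches the per-context scoping described above.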
update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None)¶
abstractmethod, async
Persist state change, artifacts, and messages atomically.
Business rules (role enforcement, context mismatch) are
handled by TaskManager. Data-integrity constraints
(terminal guard, OCC) are enforced here.
Message binding contract: All Message objects in
messages MUST have task_id and context_id set by
the caller before this method is called. Storage backends
MUST NOT be responsible for filling these fields.
When state is None the current state MUST be preserved
(keep-current semantics) — useful for pure artifact or message
appends without a state transition.
When status_message is provided alongside a state
transition, it is stored in task.status.message so that
polling clients (tasks/get, blocking message/send)
see the agent's message in the status object (A2A §9.4).
Ignored when state is None.
Each ArtifactWrite carries its own append flag so
that callers can mix append and replace operations in a single
call (e.g. append to artifact A while replacing artifact B).
When task_metadata is provided, its key-value pairs are
merged into the task's metadata dict.
When expected_version is provided and doesn't match the
stored version, raise a ConcurrencyError. All
backends (including InMemory) MUST check this parameter.
DB backends should implement this as
UPDATE ... WHERE id = ? AND version = ?.
Terminal-state guard: Implementations MUST reject state
transitions on tasks that are already in a terminal state
(completed, canceled, failed, rejected) by raising
TaskTerminalStateError. This prevents concurrent
writers from corrupting terminal states (e.g. force-cancel
and worker completing simultaneously). Pure message or
artifact appends without a state transition (state=None)
are not affected by this guard.
Implementations MUST ensure that all changes are applied as a single atomic operation. If any part fails, none of the changes may become visible. For database backends this means a single transaction.
Return value: The new version number after the write.
All backends (including InMemory) MUST return an int
so callers can use it for subsequent optimistic-concurrency
writes. Use load_task() for reading back complete
task state.
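As a caller-side illustration of the return-value contract, the following sketch threads the version returned by one write into the next. `FakeStorage` is a hypothetical stand-in that only tracks version numbers, and `ConcurrencyError` is redefined locally so the example runs on its own; neither is a2akit's real class.

```python
import asyncio


class ConcurrencyError(Exception):
    """Local stand-in for the exception of the same name."""


class FakeStorage:
    """Hypothetical stand-in that tracks only a version number per task."""

    def __init__(self):
        self.versions = {}

    async def get_version(self, task_id):
        return self.versions.get(task_id)

    async def update_task(self, task_id, state=None, *, expected_version=None):
        current = self.versions.get(task_id, 0)
        # The version check is skipped only when no expectation was given.
        if expected_version is not None and expected_version != current:
            raise ConcurrencyError(f"expected {expected_version}, stored {current}")
        self.versions[task_id] = current + 1
        return current + 1  # the new version, as the contract requires


async def run_steps(storage, task_id, states):
    """Apply several state changes, feeding each returned version into the next write."""
    version = await storage.get_version(task_id)
    for state in states:
        version = await storage.update_task(task_id, state, expected_version=version)
    return version
```

A writer that still holds an older version number gets a `ConcurrencyError` on its next `update_task` call and can reload the task before deciding whether to retry.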
list_tasks(query)¶
async
Return filtered and paginated tasks.
Default implementation returns empty results. Override in backends that support listing.
delete_task(task_id)¶
async
Delete a task by ID. Returns True if the task existed.
delete_context(context_id)¶
async
Delete all tasks in a context. Returns the number of deleted tasks.
get_version(task_id)¶
async
Return current optimistic-concurrency version for a task.
Returns None when the backend does not support versioning
or when task_id does not exist. Default implementation
returns None.
load_context(context_id)¶
async
Load stored context for a context_id. Returns None if not found.
Default implementation returns None (no context storage).
update_context(context_id, context)¶
async
Store context for a context_id.
Default implementation is a no-op.
Required Methods¶
Subclasses MUST implement these three methods:
load_task(task_id, history_length=None, *, include_artifacts=True) -> Task | None¶
Load a task by ID. Returns None if not found. history_length limits the number of history messages returned.
create_task(context_id, message, *, idempotency_key=None) -> Task¶
Create a new task from an initial message. When idempotency_key is provided with a matching context_id, returns the existing task instead of creating a duplicate.
update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None) -> int¶
Persist state changes, artifacts, and messages atomically. Returns the new version number.
Optional Methods¶
These have sensible defaults but can be overridden:
| Method | Default | Description |
|---|---|---|
| list_tasks(query) | Returns empty | Filtered and paginated task listing |
| delete_task(task_id) | NotImplementedError | Delete a task |
| delete_context(context_id) | NotImplementedError | Delete all tasks in a context |
| get_version(task_id) | None | Current OCC version |
| load_context(context_id) | None | Load stored conversation context |
| update_context(context_id, context) | No-op | Store conversation context |
InMemoryStorage¶
The default storage backend for development. Stores everything in Python dictionaries.
from a2akit import A2AServer, InMemoryStorage

server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    storage="memory",  # equivalent to InMemoryStorage()
)
Not for production
InMemoryStorage loses all data on restart and doesn't support multi-process deployments. Use PostgreSQL or SQLite for production.
PostgreSQLStorage¶
Persistent storage using PostgreSQL via SQLAlchemy + asyncpg.
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    storage="postgresql+asyncpg://user:pass@localhost/mydb",
)
SQLiteStorage¶
Persistent storage using SQLite via SQLAlchemy + aiosqlite.
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    storage="sqlite+aiosqlite:///tasks.db",
)
ArtifactWrite¶
Per-artifact write descriptor with individual append semantics.
Each ArtifactWrite carries its own append flag, allowing callers to mix append and replace operations in a single update_task call.
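To make the per-artifact append flag concrete, here is a small sketch of how a backend might apply a batch of writes. The `ArtifactWrite` dataclass below is a hypothetical stand-in: apart from `append`, its field names (`artifact_id`, `parts`) are assumptions for illustration and may not match a2akit's real model.

```python
from dataclasses import dataclass


@dataclass
class ArtifactWrite:
    """Hypothetical stand-in; only the append flag comes from the documentation."""

    artifact_id: str
    parts: list[str]
    append: bool = False


def apply_writes(stored: dict[str, list[str]], writes: list[ArtifactWrite]) -> dict[str, list[str]]:
    """Return a new artifact map with every write applied in order."""
    # Work on a copy so a failure partway through leaves `stored` untouched.
    result = {key: list(parts) for key, parts in stored.items()}
    for write in writes:
        if write.append and write.artifact_id in result:
            # Append: extend the existing artifact's parts.
            result[write.artifact_id].extend(write.parts)
        else:
            # Replace (or first write): overwrite whatever was there.
            result[write.artifact_id] = list(write.parts)
    return result
```

With artifacts `a` and `b` already stored, a single call can append to `a` and replace `b`, which is the mixed case described above.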
Data Integrity¶
Terminal-State Guard¶
Storage MUST reject state transitions on tasks in terminal states (completed, canceled, failed, rejected) by raising TaskTerminalStateError. This prevents concurrent writers from corrupting terminal states.
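A minimal sketch of the guard logic follows, assuming states are represented as plain strings. `TaskTerminalStateError` is redefined locally so the example is self-contained, and `check_terminal_guard` is a hypothetical helper name, not part of a2akit.

```python
from typing import Optional

TERMINAL_STATES = frozenset({"completed", "canceled", "failed", "rejected"})


class TaskTerminalStateError(Exception):
    """Local stand-in for the exception of the same name."""


def check_terminal_guard(current_state: str, new_state: Optional[str]) -> None:
    """Raise if a state transition is attempted on a task that is already terminal."""
    if new_state is None:
        # Pure message or artifact appends are not affected by the guard.
        return
    if current_state in TERMINAL_STATES:
        raise TaskTerminalStateError(
            f"task is already {current_state}; cannot transition to {new_state}"
        )
```

In a real backend this check would need to run inside the same transaction or lock as the write itself; otherwise two concurrent writers could both pass the check before either commits.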
Optimistic Concurrency Control¶
When expected_version is provided to update_task, Storage MUST verify it matches the stored version. On mismatch, raise ConcurrencyError.
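The version check can be folded into the write itself, so that comparing and updating happen in one statement. The sketch below assumes a hypothetical `tasks` table with `id`, `state`, and `version` columns and uses the standard-library `sqlite3` module; `ConcurrencyError` is a local stand-in and `update_state` is an illustrative helper, not a2akit's method.

```python
import sqlite3


class ConcurrencyError(Exception):
    """Local stand-in for the exception of the same name."""


def update_state(conn, task_id, state, expected_version):
    """Write a new state only if the stored version still matches; return the new version."""
    with conn:  # single transaction; rolled back if an exception escapes
        cursor = conn.execute(
            "UPDATE tasks SET state = ?, version = version + 1 "
            "WHERE id = ? AND version = ?",
            (state, task_id, expected_version),
        )
        if cursor.rowcount == 0:
            # No row matched: the version moved on (or the task does not exist;
            # a fuller implementation would distinguish the two cases).
            raise ConcurrencyError(
                f"task {task_id}: expected version {expected_version}"
            )
    return expected_version + 1
```

Because the comparison is part of the `WHERE` clause, a concurrent writer cannot slip in between the check and the update.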
Consistency Requirement¶
Implementations MUST provide read-your-writes consistency. A load_task() call following update_task() on the same connection MUST reflect the preceding write.
Exceptions¶
| Exception | Description |
|---|---|
| TaskNotFoundError | Referenced task does not exist |
| TaskTerminalStateError | Operation attempted on a terminal task |
| TaskNotAcceptingMessagesError | Task not in a state that accepts user input |
| TaskNotCancelableError | Cancel attempted on a terminal task |
| ContextMismatchError | Message contextId doesn't match task's contextId |
| ConcurrencyError | Expected version doesn't match stored version |
| UnsupportedOperationError | Operation not supported for current task state |
ListTasksQuery¶
class ListTasksQuery(BaseModel):
    context_id: str | None = None
    status: TaskState | None = None
    page_size: int = 50  # 1-100
    page_token: str | None = None
    history_length: int | None = None
    status_timestamp_after: str | None = None
    include_artifacts: bool = False