Broker¶
The Broker is the task queue that schedules and delivers run operations to the WorkerAdapter.
Broker ABC¶
a2akit.broker.base.Broker
¶
Bases: ABC
Abstract broker for task scheduling.
run_task(params, *, is_new_task=False, request_context=None)
abstractmethod
async
¶
shutdown()
abstractmethod
async
¶
Signal the broker to stop receiving operations.
receive_task_operations() should terminate gracefully after
this is called. Implementations should close connections and
release resources cleanly.
receive_task_operations()
abstractmethod
¶
Yield task operations from the queue.
This is an async generator — implementations yield OperationHandle instances directly:
async def receive_task_operations(self):
async for msg in self._queue:
yield InMemoryOperationHandle(msg)
The generator runs indefinitely until the broker is shut down.
Connection lifecycle (connect, channel setup, teardown) is managed
by the Broker's __aenter__/__aexit__.
Task-level serialization: Implementations for distributed
deployments MUST ensure that at most one operation per task_id
is in processing at any time. Mechanisms:
- RabbitMQ: Consistent Hash Exchange on task_id → single consumer per partition
- Redis Streams: Consumer Group with XREADGROUP, claim-tracking per task_id
- SQS: Message Group ID = task_id (FIFO Queue)
InMemoryBroker is exempt from this requirement (single-process).
If a backend uses at-least-once delivery, it MUST ensure that
re-deliveries only occur after the previous delivery for the same
task_id has been acknowledged (ack) or rejected (nack).
run_task(params, *, is_new_task=False, request_context=None)¶
Enqueue a task for execution. Called by TaskManager after submission.
shutdown()¶
Signal the broker to stop receiving operations. receive_task_operations() should terminate gracefully after this is called.
receive_task_operations()¶
Async generator that yields OperationHandle instances from the queue. Runs indefinitely until the broker is shut down.
Task-level serialization: Implementations for distributed deployments MUST ensure that at most one operation per task_id is in processing at any time.
InMemoryBroker¶
The default broker for development. Uses an asyncio.Queue.
from a2akit import A2AServer
server = A2AServer(
worker=MyWorker(),
agent_card=AgentCardConfig(...),
broker="memory", # default
)
OperationHandle¶
Handle for acknowledging or rejecting a broker operation.
a2akit.broker.base.OperationHandle
¶
Bases: ABC
Handle for acknowledging or rejecting a broker operation.
operation
abstractmethod
property
¶
Return the wrapped operation.
attempt
abstractmethod
property
¶
Delivery attempt number (1-based).
InMemory always returns 1. Backends with retry tracking (e.g. RabbitMQ, Redis) report the actual delivery count.
ack()
abstractmethod
async
¶
Acknowledge successful processing.
nack(*, delay_seconds=0)
abstractmethod
async
¶
Reject — return operation to queue for retry.
delay_seconds is a hint for backends that support delayed
re-delivery (e.g. RabbitMQ dead-letter TTL, Redis delayed queue).
InMemory ignores it and re-enqueues immediately.
| Property/Method | Description |
|---|---|
operation |
The wrapped TaskOperation |
attempt |
Delivery attempt number (1-based) |
ack() |
Acknowledge successful processing |
nack(*, delay_seconds=0) |
Reject — return to queue for retry |
CancelRegistry¶
Registry for task cancellation signals.
a2akit.broker.base.CancelRegistry
¶
Bases: ABC
Registry for task cancellation signals.
request_cancel(task_id)
abstractmethod
async
¶
Signal cancellation for a task.
is_cancelled(task_id)
abstractmethod
async
¶
Check if cancellation was requested.
on_cancel(task_id)
abstractmethod
¶
Return a scope that signals when cancellation is requested.
cleanup(task_id)
abstractmethod
async
¶
Release resources for a completed task.
MUST be idempotent. Multiple calls with the same task_id MUST NOT raise and MUST NOT affect resources for other tasks.
Called by WorkerAdapter at the end of normal task processing and by TaskManager._force_cancel_after when the worker does not cooperate within the cancel timeout. No other component may call this method.
Backends MUST implement this to avoid resource leaks (e.g. Redis key cleanup).
| Method | Description |
|---|---|
request_cancel(task_id) |
Signal cancellation for a task |
is_cancelled(task_id) |
Check if cancellation was requested |
on_cancel(task_id) |
Return a CancelScope for cooperative checks |
cleanup(task_id) |
Release resources (must be idempotent) |
CancelScope¶
Backend-agnostic cancellation handle.
class CancelScope(ABC):
async def wait(self) -> None: ... # Block until cancelled
def is_set(self) -> bool: ... # Non-blocking check
Retry Semantics¶
The WorkerAdapter uses OperationHandle.attempt to decide between retry and terminal failure:
- If
attempt < max_retries: callnack(delay_seconds=...)for exponential back-off - If
attempt >= max_retries:ack()+ mark task as failed
The InMemoryBroker always returns attempt=1. Queue backends with retry tracking (RabbitMQ, Redis) report the actual delivery count.