Broker

The Broker is the task queue that schedules and delivers run operations to the WorkerAdapter.

Broker ABC

a2akit.broker.base.Broker

Bases: ABC

Abstract broker for task scheduling.

run_task(params, *, is_new_task=False, request_context=None) abstractmethod async

Enqueue a task for execution. Called by TaskManager after submission.

shutdown() abstractmethod async

Signal the broker to stop receiving operations.

receive_task_operations() should terminate gracefully after this is called. Implementations should close connections and release resources cleanly.

receive_task_operations() abstractmethod

Yield task operations from the queue.

This is an async generator — implementations yield OperationHandle instances directly:

async def receive_task_operations(self):
    async for msg in self._queue:
        yield InMemoryOperationHandle(msg)

The generator runs indefinitely until the broker is shut down. Connection lifecycle (connect, channel setup, teardown) is managed by the Broker's __aenter__/__aexit__.
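A minimal sketch of this contract, assuming a hypothetical in-memory broker (TinyBroker and FakeHandle are illustrative names, not part of the a2akit API): the generator yields handles until shutdown() is signalled, then terminates gracefully.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeHandle:
    # Stand-in for an OperationHandle; only ack() is sketched here.
    operation: str
    acked: bool = False

    async def ack(self) -> None:
        self.acked = True


class TinyBroker:
    # Hypothetical broker illustrating the generator contract.
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = asyncio.Event()

    async def run_task(self, params: str) -> None:
        await self._queue.put(params)

    async def shutdown(self) -> None:
        self._closed.set()
        await self._queue.put(None)  # sentinel to wake the consumer

    async def receive_task_operations(self):
        while True:
            msg = await self._queue.get()
            if msg is None and self._closed.is_set():
                return  # terminate gracefully after shutdown()
            yield FakeHandle(msg)


async def main() -> list[str]:
    broker = TinyBroker()
    await broker.run_task("op-1")
    await broker.run_task("op-2")
    await broker.shutdown()
    seen = []
    async for handle in broker.receive_task_operations():
        seen.append(handle.operation)
        await handle.ack()
    return seen


print(asyncio.run(main()))  # ['op-1', 'op-2']
```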

Task-level serialization: Implementations for distributed deployments MUST ensure that at most one operation per task_id is being processed at any time. Mechanisms:

  • RabbitMQ: Consistent Hash Exchange on task_id → single consumer per partition
  • Redis Streams: Consumer Group with XREADGROUP, claim-tracking per task_id
  • SQS: Message Group ID = task_id (FIFO Queue)

InMemoryBroker is exempt from this requirement (single-process).

If a backend uses at-least-once delivery, it MUST ensure that re-deliveries only occur after the previous delivery for the same task_id has been acknowledged (ack) or rejected (nack).
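The serialization invariant can be demonstrated in a single process with a per-task_id asyncio.Lock standing in for the partitioning mechanisms listed above (TaskSerializer is an illustrative name, not an a2akit class):

```python
import asyncio
from collections import defaultdict


class TaskSerializer:
    # Sketch of the invariant only: at most one in-flight operation per
    # task_id. Distributed backends achieve this with partitioning (hash
    # exchange, consumer groups, FIFO message groups); here a per-task
    # asyncio.Lock stands in for that mechanism.
    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def process(self, task_id: str, op: int, log: list) -> None:
        async with self._locks[task_id]:  # serialize per task_id
            log.append((task_id, op, "start"))
            await asyncio.sleep(0)        # yield; other tasks may interleave
            log.append((task_id, op, "end"))


async def main() -> list:
    s = TaskSerializer()
    log: list = []
    # Two operations for task "a" must not overlap; task "b" may interleave.
    await asyncio.gather(
        s.process("a", 1, log),
        s.process("a", 2, log),
        s.process("b", 1, log),
    )
    return log


log = asyncio.run(main())
a_events = [e for e in log if e[0] == "a"]
# Operation 1 for task "a" fully finishes before operation 2 starts:
assert a_events[0][1] == a_events[1][1]
```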

Method Description
run_task(params, *, is_new_task=False, request_context=None) Enqueue a task for execution; called by TaskManager after submission
shutdown() Signal the broker to stop receiving operations; receive_task_operations() terminates gracefully afterwards
receive_task_operations() Async generator yielding OperationHandle instances until shutdown; distributed backends must process at most one operation per task_id at a time
InMemoryBroker

The default broker for development. Uses an asyncio.Queue.

from a2akit import A2AServer

server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    broker="memory",  # default
)

OperationHandle

Handle for acknowledging or rejecting a broker operation.

a2akit.broker.base.OperationHandle

Bases: ABC

Handle for acknowledging or rejecting a broker operation.

operation abstractmethod property

Return the wrapped operation.

attempt abstractmethod property

Delivery attempt number (1-based).

InMemory always returns 1. Backends with retry tracking (e.g. RabbitMQ, Redis) report the actual delivery count.

ack() abstractmethod async

Acknowledge successful processing.

nack(*, delay_seconds=0) abstractmethod async

Reject — return operation to queue for retry.

delay_seconds is a hint for backends that support delayed re-delivery (e.g. RabbitMQ dead-letter TTL, Redis delayed queue). InMemory ignores it and re-enqueues immediately.

Property/Method Description
operation The wrapped TaskOperation
attempt Delivery attempt number (1-based)
ack() Acknowledge successful processing
nack(*, delay_seconds=0) Reject — return to queue for retry
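The in-memory semantics described above can be sketched as follows (MemoryHandle is an illustrative stand-in, not the real InMemoryOperationHandle): attempt is always 1, and nack re-enqueues immediately, ignoring delay_seconds.

```python
import asyncio


class MemoryHandle:
    # Illustrative in-memory handle: no retry tracking, immediate re-enqueue.
    def __init__(self, operation, queue: asyncio.Queue) -> None:
        self._operation = operation
        self._queue = queue
        self.done = False

    @property
    def operation(self):
        return self._operation

    @property
    def attempt(self) -> int:
        return 1  # an in-memory backend has no delivery count to report

    async def ack(self) -> None:
        self.done = True

    async def nack(self, *, delay_seconds: int = 0) -> None:
        # delay_seconds is only a hint; re-enqueue immediately.
        await self._queue.put(self._operation)


async def main():
    q: asyncio.Queue = asyncio.Queue()
    h = MemoryHandle("op-1", q)
    await h.nack(delay_seconds=30)  # hint ignored: back on the queue at once
    return await q.get()


print(asyncio.run(main()))  # op-1
```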

CancelRegistry

Registry for task cancellation signals.

a2akit.broker.base.CancelRegistry

Bases: ABC

Registry for task cancellation signals.

request_cancel(task_id) abstractmethod async

Signal cancellation for a task.

is_cancelled(task_id) abstractmethod async

Check if cancellation was requested.

on_cancel(task_id) abstractmethod

Return a scope that signals when cancellation is requested.

cleanup(task_id) abstractmethod async

Release resources for a completed task.

MUST be idempotent. Multiple calls with the same task_id MUST NOT raise and MUST NOT affect resources for other tasks.

Called by WorkerAdapter at the end of normal task processing and by TaskManager._force_cancel_after when the worker does not cooperate within the cancel timeout. No other component may call this method.

Backends MUST implement this to avoid resource leaks (e.g. Redis key cleanup).

Method Description
request_cancel(task_id) Signal cancellation for a task
is_cancelled(task_id) Check if cancellation was requested
on_cancel(task_id) Return a CancelScope for cooperative checks
cleanup(task_id) Release resources (must be idempotent)
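A single-process registry satisfying these rules might look like the sketch below (MemoryCancelRegistry is a hypothetical name; the real in-memory implementation may differ). One asyncio.Event per task_id; cleanup uses pop() with a default, which makes it idempotent and scoped to one task.

```python
import asyncio


class MemoryCancelRegistry:
    # Hypothetical single-process registry: one asyncio.Event per task_id.
    def __init__(self) -> None:
        self._events: dict[str, asyncio.Event] = {}

    def _event(self, task_id: str) -> asyncio.Event:
        return self._events.setdefault(task_id, asyncio.Event())

    async def request_cancel(self, task_id: str) -> None:
        self._event(task_id).set()

    async def is_cancelled(self, task_id: str) -> bool:
        return self._event(task_id).is_set()

    async def cleanup(self, task_id: str) -> None:
        # Idempotent: pop() with a default never raises, and touching one
        # task_id leaves all other entries alone.
        self._events.pop(task_id, None)


async def main():
    reg = MemoryCancelRegistry()
    await reg.request_cancel("t1")
    flags = (await reg.is_cancelled("t1"), await reg.is_cancelled("t2"))
    await reg.cleanup("t1")
    await reg.cleanup("t1")  # second call must not raise
    return flags


print(asyncio.run(main()))  # (True, False)
```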

CancelScope

Backend-agnostic cancellation handle.

from abc import ABC, abstractmethod

class CancelScope(ABC):
    @abstractmethod
    async def wait(self) -> None: ...   # Block until cancelled
    @abstractmethod
    def is_set(self) -> bool: ...       # Non-blocking check
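A worker can use a scope for cooperative cancellation by racing its work against wait(). The sketch below assumes a simple asyncio.Event-backed scope (EventScope and its cancel() helper are illustrative; real backend scopes differ):

```python
import asyncio


class EventScope:
    # Minimal Event-backed scope matching the CancelScope shape.
    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def wait(self) -> None:
        await self._event.wait()

    def is_set(self) -> bool:
        return self._event.is_set()

    def cancel(self) -> None:  # demo helper, not part of the ABC
        self._event.set()


async def worker(scope: EventScope) -> str:
    # Race the actual work against the cancellation signal.
    work = asyncio.ensure_future(asyncio.sleep(10, result="done"))
    cancel = asyncio.ensure_future(scope.wait())
    await asyncio.wait({work, cancel}, return_when=asyncio.FIRST_COMPLETED)
    if scope.is_set():
        work.cancel()
        return "cancelled"
    cancel.cancel()
    return work.result()


async def main() -> str:
    scope = EventScope()
    asyncio.get_running_loop().call_later(0.01, scope.cancel)
    return await worker(scope)


print(asyncio.run(main()))  # cancelled
```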

Retry Semantics

The WorkerAdapter uses OperationHandle.attempt to decide between retry and terminal failure:

  • If attempt < max_retries: call nack(delay_seconds=...) for exponential back-off
  • If attempt >= max_retries: ack() + mark task as failed

The InMemoryBroker always returns attempt=1. Queue backends with retry tracking (RabbitMQ, Redis) report the actual delivery count.
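The decision above can be sketched as follows (handle_result and StubHandle are illustrative names; the real logic lives inside the WorkerAdapter, and the back-off formula here is an assumption):

```python
import asyncio


async def handle_result(handle, *, succeeded: bool, max_retries: int = 3) -> str:
    # Sketch of the retry decision: nack with back-off below the retry
    # budget, ack-and-fail once it is exhausted.
    if succeeded:
        await handle.ack()
        return "acked"
    if handle.attempt < max_retries:
        # Exponential back-off hint: 2, 4, 8, ... seconds by attempt number.
        await handle.nack(delay_seconds=2 ** handle.attempt)
        return "retry"
    await handle.ack()  # terminal failure: consume the message
    return "failed"


class StubHandle:
    # Test double recording only what the decision needs.
    def __init__(self, attempt: int) -> None:
        self.attempt = attempt

    async def ack(self) -> None: ...
    async def nack(self, *, delay_seconds: int = 0) -> None: ...


print(asyncio.run(handle_result(StubHandle(1), succeeded=False)))  # retry
print(asyncio.run(handle_result(StubHandle(3), succeeded=False)))  # failed
```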