EventBus

The EventBus provides 1:N event fan-out for streaming task events to SSE subscribers.

EventBus ABC

a2akit.event_bus.base.EventBus

Bases: ABC

Abstract event bus for broadcasting stream events to subscribers.

publish(task_id, event) abstractmethod async

Publish a stream event to all subscribers of a task.

Returns an event ID if the backend supports it (e.g. Redis Streams, Kafka), or None for backends that don't assign IDs (e.g. InMemory).

Implementations MUST deliver events in the order they were published for a given task_id.

subscribe(task_id, *, after_event_id=None) abstractmethod

Subscribe to stream events for a task.

MUST be used as an async context manager:

async with event_bus.subscribe(task_id) as stream:
    async for event in stream:
        ...

The context manager guarantees cleanup of subscriber resources (e.g. Redis UNSUBSCRIBE, RabbitMQ consumer de-registration) regardless of how the consumer exits (normal, exception, cancel).

When after_event_id is provided, backends that support replay (e.g. Redis Streams) deliver events published after that ID. InMemory ignores this parameter.

Implementations MUST yield events in the order they were published (per A2A spec §3.5.2).
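
The replay behaviour behind after_event_id can be pictured with a small append-only log. The sketch below is illustrative only: ReplayLog, its integer-like string IDs, and the KeyError for unknown IDs are assumptions made for this example, not part of a2akit or of any particular backend.

```python
from typing import Any, List, Tuple


class ReplayLog:
    """Hypothetical per-task event log; illustrative only, not part of a2akit."""

    def __init__(self) -> None:
        # (event_id, event) pairs kept in publish order.
        self._entries: List[Tuple[str, Any]] = []

    def publish(self, event: Any) -> str:
        # Assumption: IDs are increasing integers rendered as strings.
        event_id = str(len(self._entries) + 1)
        self._entries.append((event_id, event))
        return event_id

    def replay(self, after_event_id: str) -> List[Any]:
        # Return every event published strictly after the given ID,
        # preserving publish order.
        ids = [event_id for event_id, _ in self._entries]
        if after_event_id not in ids:
            raise KeyError(f"unknown event ID: {after_event_id}")
        start = ids.index(after_event_id) + 1
        return [event for _, event in self._entries[start:]]


log = ReplayLog()
first = log.publish("submitted")
log.publish("working")
log.publish("completed")
missed = log.replay(after_event_id=first)  # events published after `first`
```

A reconnecting SSE client would pass the last event ID it saw, so it receives only what it missed while disconnected.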

cleanup(task_id) abstractmethod async

Release subscriber resources for a completed task.

MUST be idempotent. Multiple calls with the same task_id MUST NOT raise and MUST NOT affect resources for other tasks.

Called by WorkerAdapter at the end of normal task processing and by TaskManager._force_cancel_after when the worker does not cooperate within the cancel timeout. No other component may call this method.

Backends MUST implement this to avoid resource leaks (e.g. Redis UNSUBSCRIBE, key cleanup).
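
One way to satisfy the idempotency requirement is to key subscriber state by task_id and remove it with an operation that tolerates missing keys. The following is a minimal sketch under that assumption; SubscriberRegistry and its methods are hypothetical names and do not reflect how any a2akit backend stores subscribers.

```python
import asyncio
from typing import Dict, Set


class SubscriberRegistry:
    """Hypothetical per-task subscriber bookkeeping; not a2akit's implementation."""

    def __init__(self) -> None:
        self._queues: Dict[str, Set[asyncio.Queue]] = {}

    def add(self, task_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(task_id, set()).add(queue)
        return queue

    async def cleanup(self, task_id: str) -> None:
        # dict.pop() with a default never raises, so repeated calls are no-ops,
        # and only the entry for this task_id is touched.
        self._queues.pop(task_id, None)

    def subscriber_count(self, task_id: str) -> int:
        return len(self._queues.get(task_id, ()))


async def demo() -> Dict[str, int]:
    registry = SubscriberRegistry()
    registry.add("task-1")
    registry.add("task-2")
    await registry.cleanup("task-1")
    await registry.cleanup("task-1")  # second call must not raise
    return {
        "task-1": registry.subscriber_count("task-1"),
        "task-2": registry.subscriber_count("task-2"),
    }
```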

publish(task_id, event) -> str | None

Publish a stream event to all subscribers of a task. Returns an event ID if the backend supports it (e.g. Redis Streams), or None for backends that don't assign IDs.

Events MUST be delivered in the order they were published for a given task_id.

subscribe(task_id, *, after_event_id=None)

Subscribe to stream events for a task. MUST be used as an async context manager:

async with event_bus.subscribe(task_id) as stream:
    async for event in stream:
        process(event)

When after_event_id is provided, backends that support replay (e.g. Redis Streams) deliver events published after that ID.

cleanup(task_id)

Release subscriber resources for a completed task. Must be idempotent.

InMemoryEventBus

The default event bus, intended for development. It uses asyncio.Queue for fan-out.

from a2akit import A2AServer, AgentCardConfig

# MyWorker is your own Worker subclass, defined elsewhere.
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    event_bus="memory",  # default
)

The buffer size is configurable through the A2AKIT_EVENT_BUFFER environment variable:

export A2AKIT_EVENT_BUFFER=200  # default
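
To make the queue-based fan-out concrete, here is a rough sketch of how one bounded asyncio.Queue per subscriber can deliver every event to every subscriber in publish order. TinyFanOutBus is a stand-in written for this page, not the InMemoryEventBus source. It yields the raw queue rather than an async iterator for brevity, and blocking the publisher when a buffer is full is an assumption of the sketch, not documented a2akit behaviour.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict, List


class TinyFanOutBus:
    """Illustrative 1:N fan-out; a stand-in, not a2akit's InMemoryEventBus."""

    def __init__(self, buffer_size: int = 200) -> None:
        self._buffer_size = buffer_size
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    async def publish(self, task_id: str, event: Any) -> None:
        # Every subscriber of this task gets the event, in publish order.
        for queue in list(self._subscribers.get(task_id, [])):
            # Assumption: wait when a subscriber's buffer is full.
            await queue.put(event)

    @asynccontextmanager
    async def subscribe(self, task_id: str) -> AsyncIterator[asyncio.Queue]:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._buffer_size)
        self._subscribers.setdefault(task_id, []).append(queue)
        try:
            yield queue
        finally:
            # Runs on normal exit, exception, and cancellation alike.
            subscribers = self._subscribers.get(task_id, [])
            if queue in subscribers:
                subscribers.remove(queue)


async def demo() -> List[List[str]]:
    bus = TinyFanOutBus(buffer_size=4)
    async with bus.subscribe("task-1") as first, bus.subscribe("task-1") as second:
        await bus.publish("task-1", "working")
        await bus.publish("task-1", "completed")
        return [
            [first.get_nowait(), first.get_nowait()],
            [second.get_nowait(), second.get_nowait()],
        ]
```

Giving each subscriber its own queue keeps a slow consumer from reordering or stealing events meant for another one.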

EventEmitter

The EventEmitter is a facade that TaskContext uses to persist state (Storage) and broadcast events (EventBus) without knowing about either directly.

a2akit.event_emitter.EventEmitter

Bases: ABC

Thin interface that TaskContext uses to persist state and broadcast events.

This keeps TaskContext unaware of EventBus and Storage as separate concepts.

Call order contract for callers (TaskContextImpl):

  1. update_task() — Storage write (authoritative, must succeed)
  2. send_event() — EventBus publish (best-effort, may fail)

If step 2 fails, the state is still correct in Storage. Clients polling via GET will see the right state. SSE subscribers may miss intermediate events but will always see the final status event.

update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None) abstractmethod async

Persist a task state change (and optional artifacts/messages).

When state is None the current state is preserved. When status_message is provided alongside a state transition, it is stored in task.status.message. When expected_version is provided, it is passed through to Storage for optimistic concurrency control.

Returns the new version from Storage.
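
The expected_version check follows the usual optimistic concurrency pattern: a write succeeds only if the caller's version matches the stored one. The sketch below illustrates that pattern with a stand-in store. VersionedStore and VersionConflict are hypothetical names, and the starting version of 0 is an assumption; the actual Storage interface and its error type are not described in this section.

```python
from typing import Dict, Optional


class VersionConflict(Exception):
    """Hypothetical error; the real Storage exception is not named here."""


class VersionedStore:
    """Stand-in for a Storage backend with a per-task version counter."""

    def __init__(self) -> None:
        self._state: Dict[str, str] = {}
        self._version: Dict[str, int] = {}

    def update(
        self,
        task_id: str,
        state: Optional[str] = None,
        expected_version: Optional[int] = None,
    ) -> int:
        current = self._version.get(task_id, 0)  # assumption: versions start at 0
        if expected_version is not None and expected_version != current:
            raise VersionConflict(f"expected {expected_version}, found {current}")
        if state is not None:  # state=None preserves the current state
            self._state[task_id] = state
        self._version[task_id] = current + 1
        return self._version[task_id]  # the new version

    def state_of(self, task_id: str) -> Optional[str]:
        return self._state.get(task_id)


store = VersionedStore()
v1 = store.update("task-1", "working")
v2 = store.update("task-1", "completed", expected_version=v1)
try:
    store.update("task-1", "failed", expected_version=v1)  # stale version
    stale_write_rejected = False
except VersionConflict:
    stale_write_rejected = True
```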

send_event(task_id, event) abstractmethod async

Broadcast a stream event to all subscribers of a task.

Call Order Contract

  1. update_task() — Storage write (authoritative, must succeed)
  2. send_event() — EventBus publish (best-effort, may fail)

If send_event fails, the state is still correct in Storage. Clients polling via GET will see the right state.

DefaultEventEmitter

The standard implementation that delegates to an EventBus and Storage pair:

  • Storage write is authoritative
  • EventBus failure is logged but not raised
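
The two rules above can be sketched as an emitter that lets Storage errors propagate and swallows EventBus errors after logging them. This is an approximation for illustration, not the DefaultEventEmitter source. SketchEmitter, FakeStorage, FlakyBus, and the simplified two-argument update_task signature are all assumptions of the example.

```python
import asyncio
import logging
from typing import Any, List, Tuple

logger = logging.getLogger("emitter_sketch")


class FakeStorage:
    """Hypothetical storage stub that records writes."""

    def __init__(self) -> None:
        self.writes: List[Tuple[str, str]] = []

    async def update_task(self, task_id: str, state: str) -> int:
        self.writes.append((task_id, state))
        return len(self.writes)  # stands in for the new version


class FlakyBus:
    """Hypothetical event bus stub whose publish always fails."""

    async def publish(self, task_id: str, event: Any) -> None:
        raise ConnectionError("bus unavailable")


class SketchEmitter:
    def __init__(self, storage: FakeStorage, bus: FlakyBus) -> None:
        self._storage = storage
        self._bus = bus

    async def update_task(self, task_id: str, state: str) -> int:
        # Authoritative write: any exception propagates to the caller.
        return await self._storage.update_task(task_id, state)

    async def send_event(self, task_id: str, event: Any) -> None:
        # Best-effort publish: failures are logged, never raised.
        try:
            await self._bus.publish(task_id, event)
        except Exception:
            logger.exception("event publish failed for task %s", task_id)


async def demo() -> Tuple[int, List[Tuple[str, str]]]:
    storage = FakeStorage()
    emitter = SketchEmitter(storage, FlakyBus())
    version = await emitter.update_task("task-1", "completed")  # step 1
    await emitter.send_event("task-1", {"state": "completed"})  # step 2
    return version, storage.writes
```

Because the write happens first, a failed publish leaves Storage correct, which is why clients polling via GET still see the right state.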

Stream Event Types

The StreamEvent union type covers all possible events:

StreamEvent = (
    Task
    | Message
    | TaskStatusUpdateEvent
    | TaskArtifactUpdateEvent
    | DirectReply
)

| Type | Description |
| --- | --- |
| Task | Initial task snapshot (first event in a stream) |
| TaskStatusUpdateEvent | State transition with optional message |
| TaskArtifactUpdateEvent | Artifact creation or update |
| DirectReply | Internal wrapper for reply_directly() messages |