EventBus¶
The EventBus provides 1:N event fan-out for streaming task events to SSE subscribers.
EventBus ABC¶
a2akit.event_bus.base.EventBus
¶
Bases: ABC
Abstract event bus for broadcasting stream events to subscribers.
publish(task_id, event)
abstractmethod
async
¶
Publish a stream event to all subscribers of a task.
Returns an event ID if the backend supports it (e.g. Redis
Streams, Kafka), or None for backends that don't assign
IDs (e.g. InMemory).
Implementations MUST deliver events in the order they were
published for a given task_id.
subscribe(task_id, *, after_event_id=None)
abstractmethod
¶
Subscribe to stream events for a task.
MUST be used as an async context manager:
async with event_bus.subscribe(task_id) as stream:
async for event in stream:
...
The context manager guarantees cleanup of subscriber resources (e.g. Redis UNSUBSCRIBE, RabbitMQ consumer de-registration) regardless of how the consumer exits (normal, exception, cancel).
When after_event_id is provided, backends that support
replay (e.g. Redis Streams) deliver events published after
that ID. InMemory ignores this parameter.
Implementations MUST yield events in the order they were published (per A2A spec §3.5.2).
cleanup(task_id)
abstractmethod
async
¶
Release subscriber resources for a completed task.
MUST be idempotent. Multiple calls with the same task_id MUST NOT raise and MUST NOT affect resources for other tasks.
Called by WorkerAdapter at the end of normal task processing and by TaskManager._force_cancel_after when the worker does not cooperate within the cancel timeout. No other component may call this method.
Backends MUST implement this to avoid resource leaks (e.g. Redis UNSUBSCRIBE, key cleanup).
publish(task_id, event) -> str | None¶
Publish a stream event to all subscribers of a task. Returns an event ID if the backend supports it (e.g. Redis Streams), or None for backends that don't assign IDs.
Events MUST be delivered in the order they were published for a given task_id.
subscribe(task_id, *, after_event_id=None)¶
Subscribe to stream events for a task. MUST be used as an async context manager:
When after_event_id is provided, backends that support replay (e.g. Redis Streams) deliver events published after that ID.
cleanup(task_id)¶
Release subscriber resources for a completed task. Must be idempotent.
InMemoryEventBus¶
The default event bus for development. Uses asyncio.Queue for fan-out.
from a2akit import A2AServer
server = A2AServer(
worker=MyWorker(),
agent_card=AgentCardConfig(...),
event_bus="memory", # default
)
The buffer size is configurable:
EventEmitter¶
The EventEmitter is a facade that TaskContext uses to persist state (Storage) and broadcast events (EventBus) without knowing about either directly.
a2akit.event_emitter.EventEmitter
¶
Bases: ABC
Thin interface that TaskContext uses to persist state and broadcast events.
This keeps TaskContext unaware of EventBus and Storage as separate concepts.
Call order contract for callers (TaskContextImpl):
update_task()— Storage write (authoritative, must succeed)send_event()— EventBus publish (best-effort, may fail)
If step 2 fails, the state is still correct in Storage. Clients polling via GET will see the right state. SSE subscribers may miss intermediate events but will always see the final status event.
update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None)
abstractmethod
async
¶
Persist a task state change (and optional artifacts/messages).
When state is None the current state is preserved.
When status_message is provided alongside a state
transition, it is stored in task.status.message.
When expected_version is provided, it is passed through to
Storage for optimistic concurrency control.
Returns the new version from Storage.
send_event(task_id, event)
abstractmethod
async
¶
Broadcast a stream event to all subscribers of a task.
Call Order Contract¶
update_task()— Storage write (authoritative, must succeed)send_event()— EventBus publish (best-effort, may fail)
If send_event fails, the state is still correct in Storage. Clients polling via GET will see the right state.
DefaultEventEmitter¶
The standard implementation that delegates to an EventBus and Storage pair:
- Storage write is authoritative
- EventBus failure is logged but not raised
Stream Event Types¶
The StreamEvent union type covers all possible events:
| Type | Description |
|---|---|
Task |
Initial task snapshot (first event in a stream) |
TaskStatusUpdateEvent |
State transition with optional message |
TaskArtifactUpdateEvent |
Artifact creation or update |
DirectReply |
Internal wrapper for reply_directly() messages |