EventBus¶
The EventBus provides 1:N event fan-out for streaming task events to SSE subscribers.
EventBus ABC¶
a2akit.event_bus.base.EventBus
Bases: ABC
Abstract event bus for broadcasting stream events to subscribers.
publish(task_id, event) [abstractmethod, async]
Publish a stream event to all subscribers of a task.
Returns an event ID if the backend supports it (e.g. Redis
Streams, Kafka), or None for backends that don't assign
IDs (e.g. InMemory).
Implementations MUST deliver events in the order they were
published for a given task_id.
subscribe(task_id, *, after_event_id=None) [abstractmethod]
Subscribe to stream events for a task.
MUST be used as an async context manager:
    async with event_bus.subscribe(task_id) as stream:
        async for event_id, event in stream:
            ...
Yields (event_id, event) tuples so that SSE endpoints can
use the event-bus-assigned ID as the SSE id: field, ensuring
Last-Event-ID reconnection maps directly to bus replay IDs.
The context manager guarantees cleanup of subscriber resources (e.g. Redis UNSUBSCRIBE, RabbitMQ consumer de-registration) regardless of how the consumer exits (normal, exception, cancel).
When after_event_id is provided, backends that support
replay (e.g. Redis Streams) deliver events published after
that ID. InMemory replays from its ring buffer.
Implementations MUST yield events in the order they were published (per A2A spec §3.5.2).
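As a rough illustration of how the (event_id, event) tuples could map onto SSE frames, the sketch below builds one frame per tuple. `format_sse_frame` is a hypothetical helper, not part of a2akit, and it assumes the event has already been serialized to a string.

```python
from __future__ import annotations

import json


def format_sse_frame(event_id: str | None, data: str) -> str:
    """Build one SSE frame; the id: field is omitted when the bus assigns no ID."""
    lines = []
    if event_id is not None:
        # Echoing the bus-assigned ID lets Last-Event-ID map back to a replay ID.
        lines.append(f"id: {event_id}")
    # Each line of a multi-line payload needs its own data: prefix.
    for chunk in data.splitlines() or [""]:
        lines.append(f"data: {chunk}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


frame = format_sse_frame("1712-0", json.dumps({"state": "working"}))
```

With a backend that returns None as the event ID (such as InMemory), the frame simply carries no id: field, so the client has no ID to send back in Last-Event-ID.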
cleanup(task_id) [abstractmethod, async]
Release subscriber resources for a completed task.
MUST be idempotent. Multiple calls with the same task_id MUST NOT raise and MUST NOT affect resources for other tasks.
Called by WorkerAdapter at the end of normal task processing and by TaskManager._force_cancel_after when the worker does not cooperate within the cancel timeout. No other component may call this method.
Backends MUST implement this to avoid resource leaks (e.g. Redis UNSUBSCRIBE, key cleanup).
publish(task_id, event) -> str | None¶
Publish a stream event to all subscribers of a task. Returns an event ID if the backend supports it (e.g. Redis Streams), or None for backends that don't assign IDs.
Events MUST be delivered in the order they were published for a given task_id.
subscribe(task_id, *, after_event_id=None)¶
Subscribe to stream events for a task. MUST be used as an async context manager (async with event_bus.subscribe(task_id) as stream).
When after_event_id is provided, backends that support replay (e.g. Redis Streams) deliver events published after that ID.
cleanup(task_id)¶
Release subscriber resources for a completed task. Must be idempotent.
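The three-method contract above can be sketched with a toy backend. `ToyEventBus` is hypothetical and deliberately does not subclass the real ABC. It assigns no event IDs, ignores after_event_id, and is only meant to show per-task ordering, context-managed subscription, and idempotent cleanup in one place.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Any


class ToyEventBus:
    """Illustrative only: per-task fan-out over asyncio.Queue, no replay."""

    def __init__(self) -> None:
        self._subscribers: dict[str, set[asyncio.Queue]] = defaultdict(set)

    async def publish(self, task_id: str, event: Any) -> str | None:
        # Each subscriber owns a FIFO queue, so per-task order is preserved.
        for queue in list(self._subscribers.get(task_id, ())):
            queue.put_nowait(event)
        return None  # this toy backend assigns no event IDs

    @asynccontextmanager
    async def subscribe(self, task_id: str, *, after_event_id: str | None = None):
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[task_id].add(queue)

        async def stream():
            while True:
                yield None, await queue.get()

        try:
            yield stream()
        finally:
            # Runs on normal exit, exception, and cancellation alike.
            self._subscribers.get(task_id, set()).discard(queue)

    async def cleanup(self, task_id: str) -> None:
        # pop() with a default makes repeated calls harmless (idempotent)
        # and never touches other tasks' subscribers.
        self._subscribers.pop(task_id, None)
```

Because subscribers are registered only while the context manager is open, events published before `subscribe()` are not seen by this toy; a replay-capable backend would use after_event_id to close that gap.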
InMemoryEventBus¶
The default event bus for development. Uses asyncio.Queue for fan-out.
from a2akit import A2AServer, AgentCardConfig

server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    event_bus="memory",  # default
)
The buffer size is configurable.
RedisEventBus¶
Redis-backed event bus that uses Pub/Sub for live delivery and Streams as the replay buffer. Enables Last-Event-ID based reconnection across multiple server instances.
from a2akit import A2AServer, AgentCardConfig

server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    event_bus="redis://localhost:6379/0",
)
Or with an explicit instance:
from a2akit import A2AServer
from a2akit.event_bus.redis import RedisEventBus

event_bus = RedisEventBus(
    "redis://localhost:6379/0",
    stream_maxlen=1000,
)
server = A2AServer(worker=MyWorker(), agent_card=..., event_bus=event_bus)
Requires pip install a2akit[redis].
Dual-Write Architecture¶
Each publish() call uses a Redis pipeline (single roundtrip):
- XADD to a per-task replay stream (bounded by stream_maxlen)
- PUBLISH a lightweight wakeup signal ("1") to a per-task Pub/Sub channel
The Pub/Sub message contains no payload — live subscribers read new entries from the stream via XRANGE after receiving the wakeup. This avoids double serialization and halves per-event bandwidth compared to embedding the full payload in the Pub/Sub message.
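The dual write described above might look roughly like the following. This is a sketch, not the RedisEventBus implementation: `publish_dual_write`, the key and channel names, and the "data" field are all assumptions. The `client` argument is expected to behave like a `redis.asyncio.Redis` instance.

```python
import json


async def publish_dual_write(client, task_id: str, event: dict, maxlen: int = 1000):
    """Append to the replay stream and signal subscribers in one round trip."""
    stream_key = f"events:{task_id}:stream"  # hypothetical key layout
    channel = f"events:{task_id}:wakeup"     # hypothetical channel name
    async with client.pipeline(transaction=False) as pipe:
        # XADD stores the payload once; MAXLEN (approximate) bounds the buffer.
        pipe.xadd(
            stream_key,
            {"data": json.dumps(event)},
            maxlen=maxlen,
            approximate=True,
        )
        # PUBLISH carries only the wakeup token, never the payload.
        pipe.publish(channel, "1")
        # Both commands are sent together; results come back in call order.
        event_id, _receivers = await pipe.execute()
    return event_id
```

The returned stream entry ID is what a caller would hand back as the event ID; with redis-py it arrives as bytes unless the client was created with `decode_responses=True`.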
On subscribe(after_event_id=...):
- Replay: XRANGE from the stream after the given ID
- Gap-fill: re-check for events published between replay and Pub/Sub subscribe
- Live: wait for Pub/Sub wakeup, then XRANGE for new stream entries
This ensures that no events are lost during reconnection, provided the events after the given ID are still retained in the bounded replay stream.
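The replay, gap-fill, and live phases can be sketched without Redis by using an in-memory stand-in. `FakeStream`, `follow`, and `id_key` are hypothetical names; the list plays the role of the stream, `read_after` the role of XRANGE with an exclusive start, and the `asyncio.Event` the role of the Pub/Sub wakeup. The sketch assumes a single subscriber per stream.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator


def id_key(event_id: str) -> tuple[int, int]:
    """Order Redis-style stream IDs ("<ms>-<seq>") numerically."""
    ms, _, seq = event_id.partition("-")
    return int(ms), int(seq or 0)


class FakeStream:
    """In-memory stand-in for one per-task stream plus its wakeup channel."""

    def __init__(self) -> None:
        self.entries: list[tuple[str, str]] = []
        self.wakeup = asyncio.Event()

    def add(self, event_id: str, event: str) -> None:
        self.entries.append((event_id, event))
        self.wakeup.set()  # stands in for PUBLISH "1"

    def read_after(self, cursor: str) -> list[tuple[str, str]]:
        # Stands in for XRANGE with an exclusive start ID.
        return [e for e in self.entries if id_key(e[0]) > id_key(cursor)]


async def follow(
    stream: FakeStream, after_event_id: str = "0-0"
) -> AsyncIterator[tuple[str, str]]:
    cursor = after_event_id
    # Replay: everything already in the stream after the requested ID.
    for event_id, event in stream.read_after(cursor):
        cursor = event_id
        yield event_id, event
    while True:
        # Arm the wakeup before reading, so a publish that lands between
        # the read and the wait is not missed.
        stream.wakeup.clear()
        # Gap-fill on the first pass, live reads on every later pass.
        for event_id, event in stream.read_after(cursor):
            cursor = event_id
            yield event_id, event
        await stream.wakeup.wait()
```

Tracking a cursor rather than trusting the wakeup payload is what keeps delivery free of duplicates: a spurious or repeated wakeup only triggers a read that returns nothing new.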
EventEmitter¶
The EventEmitter is a facade that TaskContext uses to persist state (Storage) and broadcast events (EventBus) without knowing about either directly.
a2akit.event_emitter.EventEmitter
Bases: ABC
Thin interface that TaskContext uses to persist state and broadcast events.
This keeps TaskContext unaware of EventBus and Storage as separate concepts.
Call order contract for callers (TaskContextImpl):
1. update_task(): Storage write (authoritative, must succeed)
2. send_event(): EventBus publish (best-effort, may fail)
If step 2 fails, the state is still correct in Storage. Clients polling via GET will see the right state. SSE subscribers may miss intermediate events but will always see the final status event.
update_task(task_id, state=None, *, status_message=None, artifacts=None, messages=None, task_metadata=None, expected_version=None) [abstractmethod, async]
Persist a task state change (and optional artifacts/messages).
When state is None the current state is preserved.
When status_message is provided alongside a state
transition, it is stored in task.status.message.
When expected_version is provided, it is passed through to
Storage for optimistic concurrency control.
Returns the new version from Storage.
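To make the expected_version behaviour concrete, here is a minimal in-memory sketch of optimistic concurrency control. `ToyVersionedStore` and `VersionConflict` are hypothetical names and the method is synchronous for brevity; how the real Storage reports a stale version is not specified here.

```python
from __future__ import annotations


class VersionConflict(Exception):
    """Hypothetical error raised when expected_version is stale."""


class ToyVersionedStore:
    """Illustrative store mapping task_id -> (version, state)."""

    def __init__(self) -> None:
        self._tasks: dict[str, tuple[int, str]] = {}

    def update(
        self,
        task_id: str,
        state: str | None = None,
        *,
        expected_version: int | None = None,
    ) -> int:
        version, current = self._tasks.get(task_id, (0, "submitted"))
        # Reject the write if someone else updated the task in the meantime.
        if expected_version is not None and expected_version != version:
            raise VersionConflict(f"expected {expected_version}, found {version}")
        # state=None preserves the current state but still bumps the version.
        new_state = current if state is None else state
        self._tasks[task_id] = (version + 1, new_state)
        return version + 1
```

A caller would keep the returned version and pass it as expected_version on its next write, so a concurrent update surfaces as a conflict instead of being silently overwritten.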
send_event(task_id, event) [abstractmethod, async]
Broadcast a stream event to all subscribers of a task.
Call Order Contract¶
1. update_task(): Storage write (authoritative, must succeed)
2. send_event(): EventBus publish (best-effort, may fail)
If send_event fails, the state is still correct in Storage. Clients polling via GET will see the right state.
DefaultEventEmitter¶
The standard implementation that delegates to an EventBus and Storage pair:
- Storage write is authoritative
- EventBus failure is logged but not raised
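The two rules above could be sketched as a single helper. `persist_then_broadcast` is hypothetical, and the `storage.update_task(task_id, state)` signature is an assumption for illustration; only the ordering and the error handling are the point.

```python
import logging

logger = logging.getLogger("emitter_sketch")


async def persist_then_broadcast(storage, event_bus, task_id, state, event) -> int:
    """Write to Storage first, then publish on a best-effort basis."""
    # Step 1: authoritative write. Any exception propagates to the caller.
    version = await storage.update_task(task_id, state)
    # Step 2: best-effort broadcast. A failure is logged, never raised,
    # because Storage already holds the correct state.
    try:
        await event_bus.publish(task_id, event)
    except Exception:
        logger.exception("EventBus publish failed for task %s", task_id)
    return version
```

Catching `Exception` rather than `BaseException` leaves task cancellation untouched, so a cancelled publish still unwinds normally.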
Stream Event Types¶
The StreamEvent union type covers all possible events:
| Type | Description |
|---|---|
| Task | Initial task snapshot (first event in a stream) |
| TaskStatusUpdateEvent | State transition with optional message |
| TaskArtifactUpdateEvent | Artifact creation or update |
| DirectReply | Internal wrapper for reply_directly() messages |