Worker Model¶
The Worker is the only class you implement to build an A2A agent. Everything else is handled by the framework.
The Worker ABC¶
from a2akit import Worker, TaskContext
class MyWorker(Worker):
async def handle(self, ctx: TaskContext) -> None:
await ctx.complete(f"Hello, {ctx.user_text}!")
The Worker abstract base class has a single required method: handle(ctx). That's it. No event queues, no state machines, no protocol wiring.
The Contract¶
Your handle() method MUST call exactly one lifecycle method before returning:
| Method | Effect |
|---|---|
ctx.complete(text?) |
Mark task completed with optional text artifact |
ctx.complete_json(data) |
Complete with a JSON data artifact |
ctx.respond(text?) |
Complete with a status message (no artifact) |
ctx.reply_directly(text) |
Return a Message directly without task tracking |
ctx.fail(reason) |
Mark task failed |
ctx.reject(reason?) |
Reject the task |
ctx.request_input(question) |
Ask user for more input |
ctx.request_auth(details?) |
Request authentication |
Auto-fail on missing lifecycle call
If your handle() method returns without calling any lifecycle method, the framework automatically marks the task as failed with the message: "Worker returned without calling a lifecycle method."
TaskContext¶
TaskContext is the single interface your worker interacts with. It abstracts away all A2A protocol details — storage writes, event broadcasting, artifact serialization, state transitions.
Properties¶
| Property | Type | Description |
|---|---|---|
ctx.user_text |
str |
The user's input as plain text |
ctx.parts |
list[Any] |
Raw message parts (text, files, etc.) |
ctx.files |
list[FileInfo] |
File parts with content, url, filename, media_type |
ctx.data_parts |
list[dict] |
Structured data parts |
ctx.task_id |
str |
Current task UUID |
ctx.context_id |
str \| None |
Conversation / context identifier |
ctx.message_id |
str |
ID of the triggering message |
ctx.metadata |
dict[str, Any] |
Persisted metadata from the request |
ctx.request_context |
dict[str, Any] |
Transient data from middleware (never persisted) |
ctx.is_cancelled |
bool |
Whether cancellation was requested |
ctx.turn_ended |
bool |
Whether a lifecycle method was called |
ctx.history |
list[HistoryMessage] |
Previous messages in this task |
ctx.previous_artifacts |
list[PreviousArtifact] |
Artifacts from prior turns |
ctx.deps |
DependencyContainer |
Dependency container from the server |
Streaming Methods¶
| Method | Description |
|---|---|
ctx.send_status(msg) |
Emit intermediate status update |
ctx.emit_text_artifact(...) |
Emit a text artifact chunk |
ctx.emit_data_artifact(data) |
Emit a structured data artifact chunk |
ctx.emit_artifact(...) |
Emit an artifact with any content type |
Context Methods¶
| Method | Description |
|---|---|
ctx.load_context() |
Load stored context for this conversation |
ctx.update_context(data) |
Store context for this conversation |
For full API details, see the TaskContext Reference.
Worker Testing¶
Workers are easy to test because they only depend on TaskContext. In tests, you can use the real framework with InMemoryStorage:
import pytest
from a2akit import A2AServer, AgentCardConfig
from httpx import ASGITransport, AsyncClient
from asgi_lifespan import LifespanManager
@pytest.fixture
async def client():
server = A2AServer(
worker=MyWorker(),
agent_card=AgentCardConfig(
name="Test", description="Test agent", version="0.1.0"
),
)
app = server.as_fastapi_app()
async with LifespanManager(app):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as c:
yield c
async def test_echo(client):
resp = await client.post(
"/v1/message:send",
json={
"message": {
"role": "user",
"parts": [{"text": "hello"}],
"messageId": "test-1",
}
},
)
assert resp.status_code == 200
assert "hello" in resp.json()["artifacts"][0]["parts"][0]["text"]