TaskContext API Reference¶
TaskContext is the execution context passed to Worker.handle(). It provides the complete interface for interacting with tasks — reading input, emitting results, controlling lifecycle, and accessing dependencies.
Quick Reference¶
| Method | Target State | Description |
|---|---|---|
complete(text) |
completed |
Finish with optional text artifact |
complete_json(data) |
completed |
Finish with JSON data artifact |
respond(text) |
completed |
Finish with status message only (no artifact) |
reply_directly(text) |
completed |
Return Message directly, skip task tracking |
fail(reason) |
failed |
Mark task as failed |
reject(reason) |
rejected |
Agent declines the task |
request_input(question) |
input-required |
Pause and ask user for more info |
request_auth(details) |
auth-required |
Pause and ask user for credentials |
send_status(message) |
(stays working) | Emit intermediate progress update |
emit_text_artifact(text) |
(stays working) | Stream a text chunk |
emit_data_artifact(data) |
(stays working) | Stream structured data |
emit_artifact(...) |
(stays working) | General-purpose artifact emission |
| Property | Type | Description |
|---|---|---|
user_text |
str |
User's input as plain text |
parts |
list |
Raw A2A message parts |
task_id |
str |
Current task UUID |
context_id |
str \| None |
Conversation identifier |
is_cancelled |
bool |
Whether cancellation was requested |
history |
list[HistoryMessage] |
Previous messages in this task |
previous_artifacts |
list[PreviousArtifact] |
Artifacts from prior turns |
deps |
DependencyContainer |
Injected dependencies |
a2akit.worker.base.TaskContext
¶
Bases: ABC
Execution context passed to Worker.handle().
Attributes:
| Name | Type | Description |
|---|---|---|
task_id |
str
|
Current task identifier. |
context_id |
str | None
|
Optional conversation / context identifier. |
message_id |
str
|
Identifier of the triggering message. |
user_text |
str
|
The user's input as plain text. |
parts |
list[Any]
|
Raw message parts (text, files, etc.). |
metadata |
dict[str, Any]
|
Arbitrary metadata forwarded from the request. |
user_text
instance-attribute
¶
parts
instance-attribute
¶
files
abstractmethod
property
¶
File attachments from the user message.
Each FileInfo provides content (decoded bytes),
url, filename, and media_type.
Example::
for f in ctx.files:
if f.media_type == "image/png" and f.content:
result = analyze_image(f.content)
await ctx.complete(result)
return
data_parts
abstractmethod
property
¶
Structured data parts from the user message.
Returns each DataPart as a plain dict. Use this when
clients send JSON payloads alongside text.
Example::
for part in ctx.data_parts:
if "query" in part:
results = run_query(part["query"])
await ctx.complete_json(results)
return
task_id
instance-attribute
¶
context_id
instance-attribute
¶
message_id
instance-attribute
¶
metadata
instance-attribute
¶
request_context
abstractmethod
property
¶
Transient request data from middleware.
Contains secrets, headers, and other per-request data that
was NOT persisted to Storage. Populated by middleware in
before_dispatch. Empty dict if no middleware is registered.
This is separate from self.metadata which contains
persisted data from message.metadata.
is_cancelled
abstractmethod
property
¶
Check whether cancellation has been requested for this task.
Poll this in long-running loops to support cooperative cancellation.
Example::
for chunk in large_dataset:
if ctx.is_cancelled:
await ctx.fail("Cancelled by user.")
return
await ctx.emit_text_artifact(process(chunk), append=True)
turn_ended
abstractmethod
property
¶
Whether the handler already signaled the end of this turn.
True after calling any terminal method (complete, fail,
reject, respond, reply_directly) or a method that
pauses for client input (request_input, request_auth).
history
abstractmethod
property
¶
Previous messages in this task (excluding the current message).
Each HistoryMessage has role, text, parts, and
message_id. Empty on the first turn. Use this for multi-turn
conversation context.
Example::
if ctx.history:
# Continuing a conversation
prev = ctx.history[-1]
await ctx.complete(f"You previously said: {prev.text}")
else:
await ctx.request_input("What would you like to discuss?")
previous_artifacts
abstractmethod
property
¶
Artifacts produced by this task in previous turns.
Each PreviousArtifact has artifact_id, name, and
parts. Use this to build on earlier results in multi-turn
workflows (e.g. iterative refinement).
Example::
if ctx.previous_artifacts:
prev = ctx.previous_artifacts[-1]
await ctx.complete(f"Refining: {prev.parts}")
deps
abstractmethod
property
¶
Dependency container registered on the server.
Access by type key or string key::
db = ctx.deps[DatabasePool]
key = ctx.deps.get("api_key", "default")
accepts(mime_type)
abstractmethod
¶
Check whether the client accepts the given output MIME type.
Returns True if the client listed this type in
acceptedOutputModes, or if no filter was specified
(absent or empty means the client accepts everything).
Common MIME types::
ctx.accepts("text/plain")
ctx.accepts("application/json")
ctx.accepts("text/html")
ctx.accepts("text/csv")
ctx.accepts("text/markdown")
ctx.accepts("application/pdf")
ctx.accepts("image/png")
ctx.accepts("image/jpeg")
ctx.accepts("audio/mpeg")
ctx.accepts("video/mp4")
ctx.accepts("application/xml")
ctx.accepts("application/octet-stream")
Example::
async def handle(self, ctx: TaskContext) -> None:
if ctx.accepts("application/json"):
await ctx.complete_json({"revenue": 42000})
else:
await ctx.complete("Revenue: 42,000 €")
complete(text=None, *, artifact_id='final-answer')
abstractmethod
async
¶
Complete the task, optionally producing a text artifact as result.
This is the primary way to finish a task. When text is provided,
it is persisted as an Artifact that clients can retrieve later.
When called without arguments, the task is marked completed with no
output — useful after streaming artifacts via emit_* methods.
Use complete() when the response is a deliverable (a document,
an answer, generated code). Use respond() instead when the reply
is conversational and does not need to be stored as an artifact.
Example::
# Simple request-response
await ctx.complete(f"The answer is {result}")
# Complete without text after streaming chunks
for chunk in chunks:
await ctx.emit_text_artifact(chunk, append=True)
await ctx.complete()
# Custom artifact ID for multi-artifact tasks
await ctx.complete("Final report", artifact_id="report")
complete_json(data, *, artifact_id='final-answer')
abstractmethod
async
¶
Complete the task with a structured JSON artifact.
Creates an artifact with media_type: application/json in its
metadata. Use this when the client expects machine-readable data
rather than human-readable text.
Example::
await ctx.complete_json({"revenue": 42000, "currency": "EUR"})
# Content-negotiation pattern
if ctx.accepts("application/json"):
await ctx.complete_json({"status": "ok", "count": 5})
else:
await ctx.complete("Status: ok (5 items)")
fail(reason)
abstractmethod
async
¶
Mark the task as failed with an error message.
The reason is sent to the client as a status message. Use this for errors that the agent encountered while processing (API failures, invalid data, timeouts, etc.).
Example::
try:
result = await call_external_api()
except APIError as e:
await ctx.fail(f"API call failed: {e}")
return
await ctx.complete(result)
reject(reason=None)
abstractmethod
async
¶
Reject the task — the agent decides not to perform it.
Unlike fail() which indicates an error during processing,
reject() means the agent chose not to handle the request
(e.g. out of scope, policy violation, unsupported language).
Example::
if "delete" in ctx.user_text.lower():
await ctx.reject("I cannot perform destructive actions.")
return
request_input(question)
abstractmethod
async
¶
Ask the user for additional input (multi-turn conversation).
Transitions the task to input-required state. The client
should display the question and send a follow-up message with
the same context_id to continue the conversation.
Example::
if not ctx.history:
await ctx.request_input("What language should I translate to?")
return
# Second turn — history contains the original request
target_lang = ctx.user_text
await ctx.complete(translate(ctx.history[0].text, target_lang))
request_auth(details=None, *, schemes=None, credentials_hint=None, auth_url=None)
abstractmethod
async
¶
Ask the client for additional credentials.
Transitions the task to auth-required state. Use this when
the worker needs credentials beyond what the initial request
provided (e.g. an OAuth token for a third-party service).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
details
|
str | None
|
Human-readable explanation shown to the user. |
None
|
schemes
|
list[str] | None
|
Required auth schemes (e.g. |
None
|
credentials_hint
|
str | None
|
Hint about what credentials are needed. |
None
|
auth_url
|
str | None
|
URL where the client can obtain credentials. |
None
|
Example::
if "github_token" not in ctx.request_context:
await ctx.request_auth(
"I need a GitHub token to access your repositories.",
schemes=["Bearer"],
auth_url="https://github.com/settings/tokens",
)
return
respond(text=None)
abstractmethod
async
¶
Complete the task with a status message, without creating an artifact.
Use this instead of complete() when the reply is conversational
or ephemeral — an acknowledgement, a clarification, a "done" — and
does not need to be persisted as a retrievable artifact.
Example::
# Acknowledge a side-effect action
await send_email(to=recipient, body=draft)
await ctx.respond("Email sent successfully.")
# Compare with complete() which creates a persistent artifact:
# await ctx.complete("Here is the generated report...")
reply_directly(text)
abstractmethod
async
¶
Return a Message response instead of a Task response.
The HTTP response to message:send will be
{"message": {...}} instead of {"task": {...}}.
Use this for lightweight, stateless interactions where the
client does not need to track a task (e.g. health checks,
simple Q&A, quick lookups).
The task is still created internally for lifecycle management, but the client receives only the message.
Example::
# Fast, stateless reply — client gets a Message, not a Task
await ctx.reply_directly(f"Current time: {now()}")
Streaming note: When the request arrives via message:stream,
this method falls back to the standard Task lifecycle stream
(Task snapshot + events + terminal status). This is
spec-conformant (A2A §3.1.2).
send_status(message=None)
abstractmethod
async
¶
Emit an intermediate progress update while the task keeps working.
Use this to inform the client about long-running operations.
The task stays in working state. SSE subscribers see the
update immediately. When message is provided, the status
is also persisted so that polling clients (tasks/get) can
see progress.
When message is None, only a bare working-state event
is broadcast (no storage write).
Example::
await ctx.send_status("Downloading dataset...")
data = await download(url)
await ctx.send_status("Analyzing 10,000 rows...")
result = analyze(data)
await ctx.complete(result)
emit_artifact(*, artifact_id, text=None, data=None, file_bytes=None, file_url=None, media_type=None, filename=None, name=None, description=None, append=False, last_chunk=False, metadata=None, extensions=None)
abstractmethod
async
¶
Emit an artifact with full control over content and metadata.
This is the low-level artifact method. For common cases, prefer the convenience wrappers:
emit_text_artifact()— single text chunkemit_data_artifact()— structured JSON data
Artifacts are streamed to SSE subscribers immediately and
batched for storage persistence. Call complete() after
emitting all artifacts to finalize the task.
Provide exactly one content argument: text, data,
file_bytes, or file_url.
Example::
# Stream a file artifact
await ctx.emit_artifact(
artifact_id="export",
file_bytes=csv_bytes,
media_type="text/csv",
filename="report.csv",
name="Monthly Report",
last_chunk=True,
)
await ctx.complete()
# Multi-part streaming with append
for i, chunk in enumerate(chunks):
await ctx.emit_artifact(
artifact_id="story",
text=chunk,
append=i > 0,
last_chunk=i == len(chunks) - 1,
)
await ctx.complete()
emit_text_artifact(text, *, artifact_id='answer', append=False, last_chunk=False)
async
¶
Emit a text artifact chunk — the most common streaming pattern.
Convenience wrapper around emit_artifact(). Use append=True
for all chunks after the first. Set last_chunk=True on the
final chunk so clients know the artifact is complete.
Example::
# Stream token by token
for i, token in enumerate(llm_stream):
await ctx.emit_text_artifact(
token,
append=i > 0,
last_chunk=i == total - 1,
)
await ctx.complete()
# Single-shot artifact (no streaming)
await ctx.emit_text_artifact("Here is the summary.", last_chunk=True)
await ctx.complete()
emit_data_artifact(data, *, artifact_id='answer', media_type='application/json', append=False, last_chunk=False)
async
¶
Emit a structured data artifact chunk.
Convenience wrapper around emit_artifact() for JSON-serializable
data. Sets media_type in artifact metadata (defaults to
application/json).
Example::
# Emit a structured result
await ctx.emit_data_artifact(
{"users": users, "total": len(users)},
last_chunk=True,
)
await ctx.complete()
# Stream rows incrementally
for i, row in enumerate(query_results):
await ctx.emit_data_artifact(
row,
append=i > 0,
last_chunk=i == total - 1,
)
await ctx.complete()
load_context()
abstractmethod
async
¶
Load persisted conversation context for this context_id.
Returns whatever was previously saved via update_context(),
or None if nothing was stored or context_id is not set.
Use this to maintain state across turns in multi-turn conversations.
Example::
prefs = await ctx.load_context() or {}
lang = prefs.get("language", "en")
await ctx.complete(translate(ctx.user_text, lang))
update_context(context)
abstractmethod
async
¶
Persist conversation context for this context_id.
The context is stored per context_id and available in
subsequent turns via load_context(). Pass any
JSON-serializable value. Does nothing if context_id is not set.
Example::
# Remember user preferences across turns
prefs = await ctx.load_context() or {}
prefs["language"] = ctx.user_text
await ctx.update_context(prefs)
await ctx.respond("Language preference saved.")
Properties¶
Input¶
| Property | Type | Description |
|---|---|---|
user_text |
str |
The user's input as plain text (all text parts joined) |
parts |
list[Any] |
Raw A2A message parts (text, files, data) |
files |
list[FileInfo] |
File parts as typed wrappers |
data_parts |
list[dict] |
Structured data parts extracted from the message |
Identifiers¶
| Property | Type | Description |
|---|---|---|
task_id |
str |
Current task UUID |
context_id |
str \| None |
Conversation / context identifier |
message_id |
str |
ID of the triggering message |
Metadata¶
| Property | Type | Description |
|---|---|---|
metadata |
dict[str, Any] |
Persisted metadata from message.metadata |
request_context |
dict[str, Any] |
Transient data from middleware (never persisted) |
State¶
| Property | Type | Description |
|---|---|---|
is_cancelled |
bool |
Whether cancellation was requested |
turn_ended |
bool |
Whether a lifecycle method was called |
History¶
| Property | Type | Description |
|---|---|---|
history |
list[HistoryMessage] |
Previous messages in this task |
previous_artifacts |
list[PreviousArtifact] |
Artifacts from prior turns |
Dependencies¶
| Property | Type | Description |
|---|---|---|
deps |
DependencyContainer |
Dependency container from the server |
Output Negotiation¶
accepts(mime_type) -> bool¶
Check whether the client accepts the given output MIME type. Returns True if the type is in acceptedOutputModes, or if no filter was specified (absent or empty means "accept everything"). Case-sensitive per RFC 2045.
if ctx.accepts("application/json"):
await ctx.complete_json({"revenue": 42000})
elif ctx.accepts("text/csv"):
await ctx.complete(to_csv(data))
else:
await ctx.complete("Revenue: 42,000 €")
Lifecycle Methods¶
These methods transition the task to a new state. Exactly one must be called per handle() invocation.
complete(text=None, *, artifact_id="final-answer")¶
Mark the task as completed. Optionally attach a text artifact.
complete_json(data, *, artifact_id="final-answer")¶
Complete with a JSON data artifact.
respond(text=None)¶
Complete with a status message only — no artifact is created.
reply_directly(text)¶
Return a Message directly without task tracking. The HTTP response to message:send will be {"message": {...}} instead of {"task": {...}}.
fail(reason)¶
Mark the task as failed.
reject(reason=None)¶
Reject the task — the agent decides not to perform it.
request_input(question)¶
Transition to input-required. The client should send a follow-up message.
request_auth(details=None)¶
Transition to auth-required for secondary credentials.
Streaming Methods¶
These methods emit events while the task stays in working state.
send_status(message=None)¶
Emit an intermediate status update. When message is provided, it's persisted in task.status.message.
emit_text_artifact(text, *, artifact_id="answer", append=False, last_chunk=False)¶
Emit a text chunk as an artifact update.
emit_data_artifact(data, *, artifact_id="answer", media_type="application/json", append=False, last_chunk=False)¶
Emit structured data as an artifact update.
emit_artifact(*, artifact_id, text=None, data=None, file_bytes=None, file_url=None, media_type=None, filename=None, name=None, description=None, append=False, last_chunk=False, metadata=None)¶
General-purpose artifact emission supporting text, data, file bytes, and file URLs.
Context Methods¶
load_context()¶
Load stored context for this task's context_id. Returns None if no context exists or context_id is None.
update_context(context)¶
Store context for this task's context_id. No-op if context_id is None.
Helper Types¶
FileInfo¶
@dataclass(frozen=True)
class FileInfo:
content: bytes | None
url: str | None
filename: str | None
media_type: str | None