Skip to content

TaskContext API Reference

TaskContext is the execution context passed to Worker.handle(). It provides the complete interface for interacting with tasks — reading input, emitting results, controlling lifecycle, and accessing dependencies.

Quick Reference

Method Target State Description
complete(text) completed Finish with optional text artifact
complete_json(data) completed Finish with JSON data artifact
respond(text) completed Finish with status message only (no artifact)
reply_directly(text) completed Return Message directly, skip task tracking
fail(reason) failed Mark task as failed
reject(reason) rejected Agent declines the task
request_input(question) input-required Pause and ask user for more info
request_auth(details) auth-required Pause and ask user for credentials
send_status(message) (stays working) Emit intermediate progress update
emit_text_artifact(text) (stays working) Stream a text chunk
emit_data_artifact(data) (stays working) Stream structured data
emit_artifact(...) (stays working) General-purpose artifact emission
Property Type Description
user_text str User's input as plain text
parts list Raw A2A message parts
task_id str Current task UUID
context_id str \| None Conversation identifier
is_cancelled bool Whether cancellation was requested
history list[HistoryMessage] Previous messages in this task
previous_artifacts list[PreviousArtifact] Artifacts from prior turns
deps DependencyContainer Injected dependencies

a2akit.worker.base.TaskContext

Bases: ABC

Execution context passed to Worker.handle().

Attributes:

Name Type Description
task_id str

Current task identifier.

context_id str | None

Optional conversation / context identifier.

message_id str

Identifier of the triggering message.

user_text str

The user's input as plain text.

parts list[Any]

Raw message parts (text, files, etc.).

metadata dict[str, Any]

Arbitrary metadata forwarded from the request.

user_text instance-attribute

parts instance-attribute

files abstractmethod property

File attachments from the user message.

Each FileInfo provides content (decoded bytes), url, filename, and media_type.

Example::

for f in ctx.files:
    if f.media_type == "image/png" and f.content:
        result = analyze_image(f.content)
        await ctx.complete(result)
        return

data_parts abstractmethod property

Structured data parts from the user message.

Returns each DataPart as a plain dict. Use this when clients send JSON payloads alongside text.

Example::

for part in ctx.data_parts:
    if "query" in part:
        results = run_query(part["query"])
        await ctx.complete_json(results)
        return

task_id instance-attribute

context_id instance-attribute

message_id instance-attribute

metadata instance-attribute

request_context abstractmethod property

Transient request data from middleware.

Contains secrets, headers, and other per-request data that was NOT persisted to Storage. Populated by middleware in before_dispatch. Empty dict if no middleware is registered.

This is separate from self.metadata which contains persisted data from message.metadata.

is_cancelled abstractmethod property

Check whether cancellation has been requested for this task.

Poll this in long-running loops to support cooperative cancellation.

Example::

for chunk in large_dataset:
    if ctx.is_cancelled:
        await ctx.fail("Cancelled by user.")
        return
    await ctx.emit_text_artifact(process(chunk), append=True)

turn_ended abstractmethod property

Whether the handler already signaled the end of this turn.

True after calling any terminal method (complete, fail, reject, respond, reply_directly) or a method that pauses for client input (request_input, request_auth).

history abstractmethod property

Previous messages in this task (excluding the current message).

Each HistoryMessage has role, text, parts, and message_id. Empty on the first turn. Use this for multi-turn conversation context.

Example::

if ctx.history:
    # Continuing a conversation
    prev = ctx.history[-1]
    await ctx.complete(f"You previously said: {prev.text}")
else:
    await ctx.request_input("What would you like to discuss?")

previous_artifacts abstractmethod property

Artifacts produced by this task in previous turns.

Each PreviousArtifact has artifact_id, name, and parts. Use this to build on earlier results in multi-turn workflows (e.g. iterative refinement).

Example::

if ctx.previous_artifacts:
    prev = ctx.previous_artifacts[-1]
    await ctx.complete(f"Refining: {prev.parts}")

deps abstractmethod property

Dependency container registered on the server.

Access by type key or string key::

db = ctx.deps[DatabasePool]
key = ctx.deps.get("api_key", "default")

accepts(mime_type) abstractmethod

Check whether the client accepts the given output MIME type.

Returns True if the client listed this type in acceptedOutputModes, or if no filter was specified (absent or empty means the client accepts everything).

Common MIME types::

ctx.accepts("text/plain")
ctx.accepts("application/json")
ctx.accepts("text/html")
ctx.accepts("text/csv")
ctx.accepts("text/markdown")
ctx.accepts("application/pdf")
ctx.accepts("image/png")
ctx.accepts("image/jpeg")
ctx.accepts("audio/mpeg")
ctx.accepts("video/mp4")
ctx.accepts("application/xml")
ctx.accepts("application/octet-stream")

Example::

async def handle(self, ctx: TaskContext) -> None:
    if ctx.accepts("application/json"):
        await ctx.complete_json({"revenue": 42000})
    else:
        await ctx.complete("Revenue: 42,000 €")

complete(text=None, *, artifact_id='final-answer') abstractmethod async

Complete the task, optionally producing a text artifact as result.

This is the primary way to finish a task. When text is provided, it is persisted as an Artifact that clients can retrieve later. When called without arguments, the task is marked completed with no output — useful after streaming artifacts via emit_* methods.

Use complete() when the response is a deliverable (a document, an answer, generated code). Use respond() instead when the reply is conversational and does not need to be stored as an artifact.

Example::

# Simple request-response
await ctx.complete(f"The answer is {result}")

# Complete without text after streaming chunks
for chunk in chunks:
    await ctx.emit_text_artifact(chunk, append=True)
await ctx.complete()

# Custom artifact ID for multi-artifact tasks
await ctx.complete("Final report", artifact_id="report")

complete_json(data, *, artifact_id='final-answer') abstractmethod async

Complete the task with a structured JSON artifact.

Creates an artifact with media_type: application/json in its metadata. Use this when the client expects machine-readable data rather than human-readable text.

Example::

await ctx.complete_json({"revenue": 42000, "currency": "EUR"})

# Content-negotiation pattern
if ctx.accepts("application/json"):
    await ctx.complete_json({"status": "ok", "count": 5})
else:
    await ctx.complete("Status: ok (5 items)")

fail(reason) abstractmethod async

Mark the task as failed with an error message.

The reason is sent to the client as a status message. Use this for errors that the agent encountered while processing (API failures, invalid data, timeouts, etc.).

Example::

try:
    result = await call_external_api()
except APIError as e:
    await ctx.fail(f"API call failed: {e}")
    return
await ctx.complete(result)

reject(reason=None) abstractmethod async

Reject the task — the agent decides not to perform it.

Unlike fail() which indicates an error during processing, reject() means the agent chose not to handle the request (e.g. out of scope, policy violation, unsupported language).

Example::

if "delete" in ctx.user_text.lower():
    await ctx.reject("I cannot perform destructive actions.")
    return

request_input(question) abstractmethod async

Ask the user for additional input (multi-turn conversation).

Transitions the task to input-required state. The client should display the question and send a follow-up message with the same context_id to continue the conversation.

Example::

if not ctx.history:
    await ctx.request_input("What language should I translate to?")
    return
# Second turn — history contains the original request
target_lang = ctx.user_text
await ctx.complete(translate(ctx.history[0].text, target_lang))

request_auth(details=None, *, schemes=None, credentials_hint=None, auth_url=None) abstractmethod async

Ask the client for additional credentials.

Transitions the task to auth-required state. Use this when the worker needs credentials beyond what the initial request provided (e.g. an OAuth token for a third-party service).

Parameters:

Name Type Description Default
details str | None

Human-readable explanation shown to the user.

None
schemes list[str] | None

Required auth schemes (e.g. ["Bearer", "OAuth2"]).

None
credentials_hint str | None

Hint about what credentials are needed.

None
auth_url str | None

URL where the client can obtain credentials.

None

Example::

if "github_token" not in ctx.request_context:
    await ctx.request_auth(
        "I need a GitHub token to access your repositories.",
        schemes=["Bearer"],
        auth_url="https://github.com/settings/tokens",
    )
    return

respond(text=None) abstractmethod async

Complete the task with a status message, without creating an artifact.

Use this instead of complete() when the reply is conversational or ephemeral — an acknowledgement, a clarification, a "done" — and does not need to be persisted as a retrievable artifact.

Example::

# Acknowledge a side-effect action
await send_email(to=recipient, body=draft)
await ctx.respond("Email sent successfully.")

# Compare with complete() which creates a persistent artifact:
# await ctx.complete("Here is the generated report...")

reply_directly(text) abstractmethod async

Return a Message response instead of a Task response.

The HTTP response to message:send will be {"message": {...}} instead of {"task": {...}}. Use this for lightweight, stateless interactions where the client does not need to track a task (e.g. health checks, simple Q&A, quick lookups).

The task is still created internally for lifecycle management, but the client receives only the message.

Example::

# Fast, stateless reply — client gets a Message, not a Task
await ctx.reply_directly(f"Current time: {now()}")

Streaming note: When the request arrives via message:stream, this method falls back to the standard Task lifecycle stream (Task snapshot + events + terminal status). This is spec-conformant (A2A §3.1.2).

send_status(message=None) abstractmethod async

Emit an intermediate progress update while the task keeps working.

Use this to inform the client about long-running operations. The task stays in working state. SSE subscribers see the update immediately. When message is provided, the status is also persisted so that polling clients (tasks/get) can see progress.

When message is None, only a bare working-state event is broadcast (no storage write).

Example::

await ctx.send_status("Downloading dataset...")
data = await download(url)
await ctx.send_status("Analyzing 10,000 rows...")
result = analyze(data)
await ctx.complete(result)

emit_artifact(*, artifact_id, text=None, data=None, file_bytes=None, file_url=None, media_type=None, filename=None, name=None, description=None, append=False, last_chunk=False, metadata=None, extensions=None) abstractmethod async

Emit an artifact with full control over content and metadata.

This is the low-level artifact method. For common cases, prefer the convenience wrappers:

  • emit_text_artifact() — single text chunk
  • emit_data_artifact() — structured JSON data

Artifacts are streamed to SSE subscribers immediately and batched for storage persistence. Call complete() after emitting all artifacts to finalize the task.

Provide exactly one content argument: text, data, file_bytes, or file_url.

Example::

# Stream a file artifact
await ctx.emit_artifact(
    artifact_id="export",
    file_bytes=csv_bytes,
    media_type="text/csv",
    filename="report.csv",
    name="Monthly Report",
    last_chunk=True,
)
await ctx.complete()

# Multi-part streaming with append
for i, chunk in enumerate(chunks):
    await ctx.emit_artifact(
        artifact_id="story",
        text=chunk,
        append=i > 0,
        last_chunk=i == len(chunks) - 1,
    )
await ctx.complete()

emit_text_artifact(text, *, artifact_id='answer', append=False, last_chunk=False) async

Emit a text artifact chunk — the most common streaming pattern.

Convenience wrapper around emit_artifact(). Use append=True for all chunks after the first. Set last_chunk=True on the final chunk so clients know the artifact is complete.

Example::

# Stream token by token
for i, token in enumerate(llm_stream):
    await ctx.emit_text_artifact(
        token,
        append=i > 0,
        last_chunk=i == total - 1,
    )
await ctx.complete()

# Single-shot artifact (no streaming)
await ctx.emit_text_artifact("Here is the summary.", last_chunk=True)
await ctx.complete()

emit_data_artifact(data, *, artifact_id='answer', media_type='application/json', append=False, last_chunk=False) async

Emit a structured data artifact chunk.

Convenience wrapper around emit_artifact() for JSON-serializable data. Sets media_type in artifact metadata (defaults to application/json).

Example::

# Emit a structured result
await ctx.emit_data_artifact(
    {"users": users, "total": len(users)},
    last_chunk=True,
)
await ctx.complete()

# Stream rows incrementally
for i, row in enumerate(query_results):
    await ctx.emit_data_artifact(
        row,
        append=i > 0,
        last_chunk=i == total - 1,
    )
await ctx.complete()

load_context() abstractmethod async

Load persisted conversation context for this context_id.

Returns whatever was previously saved via update_context(), or None if nothing was stored or context_id is not set. Use this to maintain state across turns in multi-turn conversations.

Example::

prefs = await ctx.load_context() or {}
lang = prefs.get("language", "en")
await ctx.complete(translate(ctx.user_text, lang))

update_context(context) abstractmethod async

Persist conversation context for this context_id.

The context is stored per context_id and available in subsequent turns via load_context(). Pass any JSON-serializable value. Does nothing if context_id is not set.

Example::

# Remember user preferences across turns
prefs = await ctx.load_context() or {}
prefs["language"] = ctx.user_text
await ctx.update_context(prefs)
await ctx.respond("Language preference saved.")

Properties

Input

Property Type Description
user_text str The user's input as plain text (all text parts joined)
parts list[Any] Raw A2A message parts (text, files, data)
files list[FileInfo] File parts as typed wrappers
data_parts list[dict] Structured data parts extracted from the message

Identifiers

Property Type Description
task_id str Current task UUID
context_id str \| None Conversation / context identifier
message_id str ID of the triggering message

Metadata

Property Type Description
metadata dict[str, Any] Persisted metadata from message.metadata
request_context dict[str, Any] Transient data from middleware (never persisted)

State

Property Type Description
is_cancelled bool Whether cancellation was requested
turn_ended bool Whether a lifecycle method was called

History

Property Type Description
history list[HistoryMessage] Previous messages in this task
previous_artifacts list[PreviousArtifact] Artifacts from prior turns

Dependencies

Property Type Description
deps DependencyContainer Dependency container from the server

Output Negotiation

accepts(mime_type) -> bool

Check whether the client accepts the given output MIME type. Returns True if the type is in acceptedOutputModes, or if no filter was specified (absent or empty means "accept everything"). Case-sensitive per RFC 2045.

if ctx.accepts("application/json"):
    await ctx.complete_json({"revenue": 42000})
elif ctx.accepts("text/csv"):
    await ctx.complete(to_csv(data))
else:
    await ctx.complete("Revenue: 42,000 €")

Lifecycle Methods

These methods transition the task to a new state. Exactly one must be called per handle() invocation.

complete(text=None, *, artifact_id="final-answer")

Mark the task as completed. Optionally attach a text artifact.

await ctx.complete("Here is your answer.")
await ctx.complete()  # completed without artifact

complete_json(data, *, artifact_id="final-answer")

Complete with a JSON data artifact.

await ctx.complete_json({"score": 0.95, "label": "positive"})

respond(text=None)

Complete with a status message only — no artifact is created.

await ctx.respond("Task acknowledged.")

reply_directly(text)

Return a Message directly without task tracking. The HTTP response to message:send will be {"message": {...}} instead of {"task": {...}}.

await ctx.reply_directly("Quick answer without task overhead.")

fail(reason)

Mark the task as failed.

await ctx.fail("External API returned 500.")

reject(reason=None)

Reject the task — the agent decides not to perform it.

await ctx.reject("I can only process English text.")

request_input(question)

Transition to input-required. The client should send a follow-up message.

await ctx.request_input("Which language should I translate to?")

request_auth(details=None)

Transition to auth-required for secondary credentials.

await ctx.request_auth("Please provide your GitHub token.")

Streaming Methods

These methods emit events while the task stays in working state.

send_status(message=None)

Emit an intermediate status update. When message is provided, it's persisted in task.status.message.

await ctx.send_status("Processing 3 of 10 files...")

emit_text_artifact(text, *, artifact_id="answer", append=False, last_chunk=False)

Emit a text chunk as an artifact update.

emit_data_artifact(data, *, artifact_id="answer", media_type="application/json", append=False, last_chunk=False)

Emit structured data as an artifact update.

emit_artifact(*, artifact_id, text=None, data=None, file_bytes=None, file_url=None, media_type=None, filename=None, name=None, description=None, append=False, last_chunk=False, metadata=None)

General-purpose artifact emission supporting text, data, file bytes, and file URLs.

Context Methods

load_context()

Load stored context for this task's context_id. Returns None if no context exists or context_id is None.

update_context(context)

Store context for this task's context_id. No-op if context_id is None.

Helper Types

FileInfo

@dataclass(frozen=True)
class FileInfo:
    content: bytes | None
    url: str | None
    filename: str | None
    media_type: str | None

HistoryMessage

@dataclass(frozen=True)
class HistoryMessage:
    role: str
    text: str
    parts: list[Any]
    message_id: str

PreviousArtifact

@dataclass(frozen=True)
class PreviousArtifact:
    artifact_id: str
    name: str | None
    parts: list[Any]