Skip to content

A2AClient

Dev-first client for interacting with A2A agents. Auto-discovers capabilities, detects protocol, and provides high-level methods for send, stream, and task management.

Quick Start

from a2akit import A2AClient

async with A2AClient("http://localhost:8000") as client:
    result = await client.send("Hello!")
    print(result.text)

Constructor

A2AClient(
    url: str,
    *,
    headers: dict[str, str] | None = None,
    timeout: float = 30.0,
    protocol: str | None = None,
    httpx_client: httpx.AsyncClient | None = None,
    card_validator: Callable[[AgentCard, bytes], None] | None = None,
)
Parameter Type Default Description
url str required Base URL of the agent (e.g. http://localhost:8000)
headers dict None Extra HTTP headers for all requests
timeout float 30.0 HTTP timeout in seconds
protocol str None Force protocol ("jsonrpc" or "http+json"). Auto-detected from agent card if None
httpx_client AsyncClient None Bring your own httpx client. Lifecycle is managed externally
card_validator Callable None Optional hook called during connect() with the parsed AgentCard and raw response bytes. Raise to reject the card

Lifecycle

Use as an async context manager:

async with A2AClient("http://localhost:8000") as client:
    ...

Or manage manually:

client = A2AClient("http://localhost:8000")
await client.connect()
# ...
await client.close()

Methods

send(text, *, task_id, context_id, blocking, metadata) -> ClientResult

Send a text message. Returns a ClientResult with the agent's response.

result = await client.send("Hello!")
print(result.text)
print(result.state)     # "completed", "failed", etc.
print(result.task_id)

send_parts(parts, *, task_id, context_id, blocking, metadata) -> ClientResult

Send raw Part objects (text, data, files).

stream(text, *, task_id, context_id, metadata) -> AsyncIterator[StreamEvent]

Stream a message, yielding StreamEvent objects with full event details.

Requires streaming capability

Raises AgentCapabilityError if the agent does not support streaming.

async for event in client.stream("Hello"):
    if event.kind == "artifact":
        print(event.text, end="")
    if event.is_final:
        print(f"\nFinal state: {event.state}")

stream_text(text, *, task_id, context_id, metadata) -> AsyncIterator[str]

Stream only text content as plain strings.

async for chunk in client.stream_text("Hello"):
    print(chunk, end="")

get_task(task_id, *, history_length) -> ClientResult

Fetch a task by ID.

list_tasks(*, context_id, status, page_size, page_token, history_length) -> ListResult

List tasks with optional filters and pagination.

cancel(task_id) -> ClientResult

Cancel a task.

subscribe(task_id) -> AsyncIterator[StreamEvent]

Subscribe to updates for an existing task.

Requires streaming capability

Raises AgentCapabilityError if the agent does not support streaming.

set_push_config(task_id, *, url, token, config_id, authentication) -> dict

Set a push notification config for a task.

await client.set_push_config(
    task_id,
    url="https://my-app.com/webhook",
    token="my-secret",
)

get_push_config(task_id, config_id) -> dict

Get a push notification config.

list_push_configs(task_id) -> list[dict]

List all push configs for a task.

delete_push_config(task_id, config_id) -> None

Delete a push notification config.

get_extended_card() -> AgentCard

Fetch the authenticated extended agent card. Returns a full AgentCard with potentially richer information (e.g., premium skills) based on the caller's authentication.

extended = await client.get_extended_card()
print(f"Extended skills: {len(extended.skills)}")
for skill in extended.skills:
    print(f"  - {skill.name}")

Requires supportsAuthenticatedExtendedCard

Returns 404 (REST) or error code -32007 (JSON-RPC) if the agent has not configured an extended_card_provider.

Card Validation

The card_validator parameter lets you verify the agent card before accepting it. The callable receives the parsed AgentCard and the raw HTTP response body as bytes. If it raises, connect() propagates the exception and the client stays disconnected.

The raw bytes argument exists for JWS signature verification — re-serializing via card.model_dump_json() may produce different bytes (key ordering, whitespace), which would break detached-payload signatures.

# Name allowlist
TRUSTED = {"My Agent", "Internal Router"}

def check_allowlist(card, raw_body):
    if card.name not in TRUSTED:
        raise ValueError(f"Untrusted agent: {card.name}")

async with A2AClient(url, card_validator=check_allowlist) as client:
    result = await client.send("Hello")
# Signature presence check
def require_signatures(card, raw_body):
    if not card.signatures:
        raise ValueError("Agent card has no JWS signatures")

async with A2AClient(url, card_validator=require_signatures) as client:
    ...

Properties

Property Type Description
agent_card AgentCard The discovered agent card
agent_name str Shortcut for agent_card.name
capabilities AgentCapabilities \| None Shortcut for agent_card.capabilities
protocol str Active protocol ("jsonrpc" or "http+json")
is_connected bool Connection status

ClientResult

Primary return type for non-streaming operations.

Field Type Description
task_id str Task identifier
context_id str \| None Context identifier
state str Task state ("completed", "failed", etc.)
text str \| None Extracted text from artifacts or status
data dict \| None Extracted data from first data artifact
artifacts list[ArtifactInfo] All artifacts
raw_task Task \| None Original Task object
raw_message Message \| None Original Message (direct replies)

Convenience properties: completed, failed, input_required, auth_required, canceled, rejected, is_terminal.

StreamEvent

Yielded during streaming operations.

Field Type Description
kind str "task", "status", or "artifact"
state str \| None Current task state
text str \| None Text content
data dict \| None Data content
artifact_id str \| None Artifact identifier
is_final bool Whether this is the final event

Errors

Error When
AgentNotFoundError Agent unreachable or invalid agent card
AgentCapabilityError Streaming called on non-streaming agent
NotConnectedError Method called before connect()
TaskNotFoundError Task ID not found
TaskNotCancelableError Task cannot be canceled
TaskTerminalError Task is in a terminal state
ProtocolError Transport-level error