A2AServer & Config
A2AServer
The high-level server class that wires all components together and produces a FastAPI app.
a2akit.server.A2AServer
High-level server that wires storage, broker, event bus, worker, and endpoints.
__init__(*, worker, agent_card, middlewares=None, storage='memory', broker='memory', event_bus='memory', cancel_registry=None, blocking_timeout_s=None, cancel_force_timeout_s=None, max_concurrent_tasks=None, hooks=None, settings=None, dependencies=None, enable_telemetry=None, additional_protocols=None, push_max_retries=None, push_retry_delay=None, push_timeout=None, push_max_concurrent=None, push_allow_http=None, push_idle_timeout=None, push_allowed_hosts=None, push_blocked_hosts=None, extended_card_provider=None)
Store configuration for lazy initialization at startup.
as_fastapi_app(*, debug=False, **fastapi_kwargs)
Create a fully configured FastAPI application.
Constructor Parameters
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(
        name="My Agent",
        description="What your agent does.",
        version="0.1.0",
    ),
    middlewares=[SecretExtractor()],
    storage="memory",
    broker="memory",
    event_bus="memory",
    cancel_registry=None,
    blocking_timeout_s=30.0,
    cancel_force_timeout_s=60.0,
    max_concurrent_tasks=None,
    hooks=LifecycleHooks(...),
    settings=Settings(...),
    dependencies={...},
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| worker | Worker | required | Your worker implementation |
| agent_card | AgentCardConfig | required | Agent discovery card configuration |
| middlewares | list[A2AMiddleware] | [] | Middleware pipeline |
| storage | str \| Storage | "memory" | Storage backend ("memory", connection string, or instance) |
| broker | str \| Broker | "memory" | Broker backend ("memory", "redis://...", or instance) |
| event_bus | str \| EventBus | "memory" | Event bus backend ("memory", "redis://...", or instance) |
| cancel_registry | CancelRegistry \| None | None | Cancel registry (auto-created; uses Redis when broker is Redis) |
| blocking_timeout_s | float \| None | None | Timeout for message:send blocking (falls back to Settings) |
| cancel_force_timeout_s | float \| None | None | Force-cancel timeout (falls back to Settings) |
| max_concurrent_tasks | int \| None | None | Worker parallelism limit |
| hooks | LifecycleHooks \| None | None | Lifecycle hook callbacks |
| settings | Settings \| None | None | Override default settings |
| dependencies | dict[Any, Any] \| None | None | Dependency injection registry |
| push_max_retries | int \| None | None | Webhook delivery retries (falls back to Settings) |
| push_retry_delay | float \| None | None | Retry base delay in seconds |
| push_timeout | float \| None | None | Webhook HTTP timeout in seconds |
| push_max_concurrent | int \| None | None | Max concurrent webhook deliveries |
| push_allow_http | bool \| None | None | Allow HTTP webhook URLs (dev only) |
| push_idle_timeout | float \| None | None | Idle timeout for delivery queue workers (seconds) |
| push_allowed_hosts | set[str] \| None | None | Hostname allowlist for webhooks |
| push_blocked_hosts | set[str] \| None | None | Hostname blocklist for webhooks |
| extended_card_provider | Callable[[Request], Awaitable[AgentCardConfig]] \| None | None | Callback that returns a richer card for authenticated callers. When set, supportsAuthenticatedExtendedCard is automatically True |
Storage Backend Resolution
The storage parameter accepts:
- "memory": InMemoryStorage (default)
- "redis://..." / "rediss://...": RedisStorage (requires a2akit[redis])
- "postgresql+asyncpg://...": PostgreSQLStorage (requires a2akit[postgres])
- "sqlite+aiosqlite:///...": SQLiteStorage (requires a2akit[sqlite])
- A Storage instance: used directly
Broker Backend Resolution
The broker parameter accepts:
- "memory": InMemoryBroker (default)
- "redis://..." / "rediss://...": RedisBroker (requires a2akit[redis])
- A Broker instance: used directly
Event Bus Backend Resolution
The event_bus parameter accepts:
- "memory": InMemoryEventBus (default)
- "redis://..." / "rediss://...": RedisEventBus (requires a2akit[redis])
- An EventBus instance: used directly
Cancel Registry Resolution
The cancel_registry parameter accepts:
- None (default): auto-created as RedisCancelRegistry when broker is a Redis URL, InMemoryCancelRegistry otherwise
- A CancelRegistry instance: used directly
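The resolution rules above amount to a dispatch on the URL scheme. The following is a minimal sketch of that idea, not a2akit's actual implementation: the function names `resolve_storage_name` and `resolve_cancel_registry_name` are hypothetical, and the sketch returns class names as strings where a real resolver would construct backend instances.

```python
from urllib.parse import urlsplit

# Scheme -> storage class name, mirroring the storage list above.
# Hypothetical lookup table for illustration; a2akit's internals may differ.
_STORAGE_BY_SCHEME = {
    "redis": "RedisStorage",
    "rediss": "RedisStorage",
    "postgresql+asyncpg": "PostgreSQLStorage",
    "sqlite+aiosqlite": "SQLiteStorage",
}


def resolve_storage_name(storage: object) -> str:
    """Return the name of the storage class a given argument would select."""
    if not isinstance(storage, str):
        # Anything that is not a string is treated as a ready-made instance.
        return type(storage).__name__
    if storage == "memory":
        return "InMemoryStorage"
    scheme = urlsplit(storage).scheme
    if scheme not in _STORAGE_BY_SCHEME:
        raise ValueError(f"unsupported storage backend: {storage!r}")
    return _STORAGE_BY_SCHEME[scheme]


def resolve_cancel_registry_name(broker: object) -> str:
    """Pick the auto-created cancel registry from the broker argument."""
    if isinstance(broker, str) and urlsplit(broker).scheme in ("redis", "rediss"):
        return "RedisCancelRegistry"
    return "InMemoryCancelRegistry"
```

Under these assumptions, `resolve_storage_name("sqlite+aiosqlite:///tasks.db")` selects `SQLiteStorage`, and a broker passed as an instance falls through to the in-memory cancel registry, matching the "otherwise" case described above.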
Multi-Process Production Setup
For horizontally scalable deployments, use Redis for broker + event bus and PostgreSQL for storage:
server = A2AServer(
    worker=MyWorker(),
    agent_card=AgentCardConfig(...),
    broker="redis://redis:6379/0",
    event_bus="redis://redis:6379/0",
    storage="postgresql+asyncpg://user:pass@db:5432/mydb",
)
Storage + Broker consistency
Redis Broker + InMemoryStorage is not recommended for multi-process deployments because tasks created on one process won't be visible to another. Use Redis, PostgreSQL, or SQLite storage when using Redis broker.
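A deployment script could guard against this pairing before constructing the server. The helper below is a hedged sketch: `backend_pairing_problem` is a hypothetical name, not part of a2akit, and it only inspects string arguments (backend instances are assumed to be chosen deliberately).

```python
from typing import Optional
from urllib.parse import urlsplit


def backend_pairing_problem(storage: object, broker: object) -> Optional[str]:
    """Return a message for the pairing the note above advises against, else None."""
    broker_is_redis = isinstance(broker, str) and urlsplit(broker).scheme in (
        "redis",
        "rediss",
    )
    # Only the literal "memory" string selects in-memory storage here.
    storage_is_memory = isinstance(storage, str) and storage == "memory"
    if broker_is_redis and storage_is_memory:
        return (
            "Redis broker with in-memory storage: tasks created in one "
            "process will not be visible to another."
        )
    return None
```

Calling it with the arguments from the production example above returns `None`, while `backend_pairing_problem("memory", "redis://redis:6379/0")` returns the warning text.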
as_fastapi_app(**fastapi_kwargs)
Returns a fully configured FastAPI application with:
- All A2A protocol endpoints
- Agent card discovery endpoint
- Exception handlers for A2A errors
- Lifespan management (startup/shutdown of storage, broker, event bus, dependencies)
Extra keyword arguments are passed to the FastAPI constructor.
AgentCardConfig
User-friendly configuration for the agent discovery card.
a2akit.agent_card.AgentCardConfig
Bases: BaseModel
User-friendly configuration for building an AgentCard.
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | required | Agent name |
| description | str | required | Agent description |
| version | str | "1.0.0" | Agent version |
| protocol_version | str | "0.3.0" | A2A protocol version |
| skills | list[SkillConfig] | [] | Agent skills |
| extensions | list[ExtensionConfig] | [] | Agent extensions |
| capabilities | CapabilitiesConfig | CapabilitiesConfig() | Agent capabilities (see below) |
| protocol | Literal["jsonrpc", "http+json"] | "jsonrpc" | Transport protocol |
| input_modes | list[str] | ["application/json", "text/plain"] | Accepted input modes |
| output_modes | list[str] | ["application/json", "text/plain"] | Produced output modes |
| provider | ProviderConfig \| None | None | Agent provider info (organization, url) |
| security_schemes | dict[str, SecurityScheme] \| None | None | Security scheme declarations (declarative only, not enforced) |
| security | list[dict[str, list[str]]] \| None | None | Global security requirements (OR-of-ANDs) |
| icon_url | str \| None | None | URL to agent icon |
| documentation_url | str \| None | None | URL to agent documentation |
| signatures | list[SignatureConfig] \| None | None | JWS signatures (externally generated) |
| supports_authenticated_extended_card | bool | False | Advertise support for authenticated extended card (auto-set when extended_card_provider is passed to A2AServer) |
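The "OR-of-ANDs" shape of `security` can be read as: the outer list holds alternatives (any one suffices), and each dict lists schemes that must all be satisfied, each with its required scopes. Since the scheme declarations are not enforced by the card itself, a caller or middleware has to interpret them. The sketch below shows one such interpretation; `security_satisfied` and the `granted` mapping are hypothetical and not part of a2akit.

```python
from __future__ import annotations


def security_satisfied(
    requirements: list[dict[str, list[str]]] | None,
    granted: dict[str, set[str]],
) -> bool:
    """Evaluate OR-of-ANDs requirements against the caller's granted scopes.

    `granted` maps a scheme name to the scopes the caller holds for it
    (an assumed representation, chosen for this example).
    """
    if not requirements:
        # No requirements declared: nothing to satisfy.
        return True
    return any(
        all(
            scheme in granted and set(scopes) <= granted[scheme]
            for scheme, scopes in alternative.items()
        )
        for alternative in requirements
    )
```

For example, with `[{"bearer": []}, {"oauth2": ["read"], "api_key": []}]` a caller holding only a bearer credential passes via the first alternative, while a caller holding only `oauth2` with `read` fails because the second alternative also requires `api_key`.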
CapabilitiesConfig
Declares which A2A protocol features this agent supports. All capabilities default to False (opt-in).
| Field | Type | Default | Description |
|---|---|---|---|
| streaming | bool | False | Enable SSE streaming (message:stream, tasks:subscribe) |
| state_transition_history | bool | False | Advertise state transition history in Agent Card (transitions are always recorded) |
| push_notifications | bool | False | Enable push notification config CRUD and webhook delivery |
| extended_agent_card | bool | False | Advertise extended agent card capability in the agent card |
| extensions | list[AgentExtension] \| None | None | Protocol extensions (declarative, appear in agent card) |
Streaming is opt-in
Since v0.0.7, streaming defaults to False. Agents that use streaming must explicitly enable it:
AgentCardConfig(
    name="My Agent",
    description="...",
    version="0.1.0",
    capabilities=CapabilitiesConfig(streaming=True),
)
Without this, the server rejects streaming requests with UnsupportedOperationError, and the client raises AgentCapabilityError before making the request.
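The client-side check described above can be pictured as a guard that inspects the published agent card before any streaming request is sent. This is an illustrative sketch only: the `AgentCapabilityError` class here is a local stand-in rather than an import from a2akit, `ensure_streaming` is a hypothetical helper, and the card is assumed to be the parsed agent card JSON with a `capabilities.streaming` flag.

```python
class AgentCapabilityError(Exception):
    """Local stand-in for the client-side error named above."""


def ensure_streaming(card: dict) -> None:
    """Raise before sending a streaming request if the card lacks streaming."""
    capabilities = card.get("capabilities") or {}
    # A missing flag is treated as False, matching the opt-in default.
    if not capabilities.get("streaming", False):
        raise AgentCapabilityError("agent does not advertise streaming support")
```

Failing early like this avoids a round trip that the server would reject anyway.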
ProviderConfig
from a2akit import ProviderConfig
provider = ProviderConfig(
    organization="Acme Corp",
    url="https://acme.example.com",
)
| Field | Type | Description |
|---|---|---|
| organization | str | Provider organization name |
| url | str | Provider URL |
SignatureConfig
from a2akit import SignatureConfig
sig = SignatureConfig(
    protected="eyJhbGciOiJSUzI1NiJ9",  # Base64url JWS Protected Header
    signature="abc123",                # Base64url Signature
    header={"kid": "key-1"},           # Optional unprotected header
)
a2akit does not compute JWS signatures. Generate them externally and pass the finished values.
| Field | Type | Default | Description |
|---|---|---|---|
| protected | str | required | Base64url-encoded JWS Protected Header |
| signature | str | required | Base64url-encoded Signature |
| header | dict[str, Any] \| None | None | Unprotected JWS Header |
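For reference, the `protected` value in the example above is the unpadded base64url encoding of the JSON header `{"alg":"RS256"}`. The sketch below shows how such a value can be produced and inspected with the standard library; the helper names are illustrative, and producing the actual signature still requires an external JWS library and key, which this example does not cover.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without '=' padding, as JWS requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode unpadded base64url by restoring the stripped padding."""
    padding = "=" * (-len(text) % 4)
    return base64.urlsafe_b64decode(text + padding)


# Compact JSON (no spaces) so the encoded bytes are reproducible.
header_json = json.dumps({"alg": "RS256"}, separators=(",", ":"))
protected = b64url_encode(header_json.encode("utf-8"))
decoded_header = json.loads(b64url_decode(protected))
```

Decoding a `protected` value this way is a quick check that the header handed to `SignatureConfig` declares the algorithm you expect.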
SkillConfig
from a2akit import SkillConfig
skill = SkillConfig(
    id="translate",
    name="Translation",
    description="Translates text between languages",
    tags=["translation", "nlp"],
    examples=["Translate 'hello' to French"],
    input_modes=["text/plain"],
    output_modes=["text/plain", "application/json"],
    security=[{"bearer": []}],
)
| Field | Type | Default | Description |
|---|---|---|---|
| id | str | required | Skill identifier |
| name | str | required | Display name |
| description | str | required | What the skill does |
| tags | list[str] | [] | Discovery tags |
| examples | list[str] | [] | Example prompts |
| input_modes | list[str] \| None | None | Override global defaultInputModes for this skill |
| output_modes | list[str] \| None | None | Override global defaultOutputModes for this skill |
| security | list[dict[str, list[str]]] \| None | None | Per-skill security requirements |
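The override behaviour of `input_modes` and `output_modes` can be summarised as "use the skill's list when one is given, otherwise the agent-wide default". A minimal sketch of that rule follows; `effective_modes` is a hypothetical helper, and treating an empty list as an explicit override (rather than as "unset") is an assumption of this example.

```python
from __future__ import annotations


def effective_modes(skill_modes: list[str] | None, default_modes: list[str]) -> list[str]:
    """Return the modes that apply to a skill.

    None means "inherit the agent-wide default"; any list, including an
    empty one, is treated here as an explicit per-skill override.
    """
    return list(default_modes) if skill_modes is None else list(skill_modes)
```

With the defaults from AgentCardConfig, the `translate` skill above would accept only `text/plain` as input, while a skill that leaves `input_modes` unset would accept both `application/json` and `text/plain`.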
ExtensionConfig
from a2akit import ExtensionConfig
ext = ExtensionConfig(
    uri="urn:a2a:ext:custom-feature",
    description="Custom extension",
    params={"setting": "value"},
)
Settings
a2akit.config.Settings
Bases: BaseSettings
See Configuration for full details on environment variables and overrides.