Streaming Agent
Echoes the user's input back word by word as a streamed text artifact, preceded by a progress status update.
import asyncio

from a2akit import A2AServer, AgentCardConfig, CapabilitiesConfig, TaskContext, Worker


class StreamingWorker(Worker):
    """Streams the user's input back word by word."""

    async def handle(self, ctx: TaskContext) -> None:
        words = ctx.user_text.split()
        await ctx.send_status(f"Streaming {len(words)} words...")

        for i, word in enumerate(words):
            is_last = i == len(words) - 1
            await ctx.emit_text_artifact(
                text=word + ("" if is_last else " "),
                artifact_id="stream",
                append=(i > 0),
                last_chunk=is_last,
            )
            await asyncio.sleep(0.1)

        await ctx.complete()


server = A2AServer(
    worker=StreamingWorker(),
    agent_card=AgentCardConfig(
        name="Streamer",
        description="Word-by-word streaming",
        version="0.1.0",
        capabilities=CapabilitiesConfig(streaming=True),
    ),
)
app = server.as_fastapi_app()
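The append and last_chunk flags computed in the loop can be checked without starting a server. The following is a library-free sketch and not part of a2akit: Chunk and plan_chunks are hypothetical names that mirror the loop's arithmetic and return the arguments each emit_text_artifact call would receive.

```python
from typing import List, NamedTuple


class Chunk(NamedTuple):
    """Hypothetical stand-in for the arguments of one emit_text_artifact call."""

    text: str
    append: bool
    last_chunk: bool


def plan_chunks(user_text: str) -> List[Chunk]:
    """Mirror the worker's loop: one chunk per whitespace-separated word."""
    words = user_text.split()
    chunks = []
    for i, word in enumerate(words):
        is_last = i == len(words) - 1
        chunks.append(
            Chunk(
                # Every word except the last carries a trailing space.
                text=word + ("" if is_last else " "),
                # Only the first chunk starts the artifact; the rest extend it.
                append=i > 0,
                last_chunk=is_last,
            )
        )
    return chunks


if __name__ == "__main__":
    for chunk in plan_chunks("hello beautiful world"):
        print(chunk)
```

Two consequences of the loop are worth noting: str.split() collapses runs of whitespace, so the streamed text is a normalized copy of the input rather than a byte-identical echo, and an empty input produces no artifact chunks at all.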
Run it
Test it
Call the streaming endpoint to watch events arrive in real time (curl's -N flag disables output buffering):
curl -N -X POST http://localhost:8000/v1/message:stream \
  -H "Content-Type: application/json" \
  -d '{"message":{"role":"user","parts":[{"text":"hello beautiful world"}],"messageId":"1"}}'
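The same request can be sent from Python. The sketch below uses only the standard library and reuses the URL and JSON body from the curl command. The Accept header and the helper names iter_sse_data and stream_message are additions for illustration, not part of a2akit. Because the exact JSON shape of each event is not shown here, the code prints the raw data payloads instead of assuming field names.

```python
import json
import urllib.request
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete SSE event (minimal parser)."""
    buffer: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format allows one optional space after the colon.
            buffer.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) and ":" comments are ignored.
    # An event without its terminating blank line is dropped.


def stream_message(
    text: str, url: str = "http://localhost:8000/v1/message:stream"
) -> None:
    """POST the message and print each event payload as it arrives."""
    payload = {
        "message": {"role": "user", "parts": [{"text": text}], "messageId": "1"}
    }
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "text/event-stream",  # assumption: not in the curl call
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        # The response object yields one bytes line at a time.
        decoded = (chunk.decode("utf-8") for chunk in response)
        for data in iter_sse_data(decoded):
            print(data)


if __name__ == "__main__":
    stream_message("hello beautiful world")
```

Keeping the parser separate from the network call means it can be exercised with a plain list of strings, with no server running.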
Expected output
A stream of SSE events:
- Task snapshot with state: submitted
- Status update: "Streaming 3 words..."
- Artifact update: "hello " (append=false)
- Artifact update: "beautiful " (append=true)
- Artifact update: "world" (append=true, last_chunk=true)
- Final status: state: completed
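On the receiving side, the three artifact updates listed above fold back into the original sentence. The sketch below assumes the usual reading of the flags: append=false starts the artifact, append=true extends it, and last_chunk=true closes it. The (text, append, last_chunk) tuples are a simplified stand-in for the real event payloads, and reassemble is a hypothetical helper.

```python
from typing import Iterable, Tuple

# Simplified stand-in for one artifact update: (text, append, last_chunk).
Update = Tuple[str, bool, bool]


def reassemble(updates: Iterable[Update]) -> str:
    """Fold the updates for a single artifact into its final text."""
    text = ""
    for chunk_text, append, last_chunk in updates:
        # append=False replaces whatever was there; append=True extends it.
        text = text + chunk_text if append else chunk_text
        if last_chunk:
            # Nothing after the closing chunk belongs to this artifact.
            break
    return text


if __name__ == "__main__":
    updates = [
        ("hello ", False, False),
        ("beautiful ", True, False),
        ("world", True, True),
    ]
    print(reassemble(updates))
```

A real client would key this state by artifact ID ("stream" in this example) so that several artifacts can be assembled in parallel.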