Cancellation¶
a2akit supports two cancellation modes: cooperative cancel (worker checks a flag) and force-cancel (timeout fallback). Together, they ensure tasks don't get stuck.
Cooperative Cancellation¶
The worker checks ctx.is_cancelled at natural checkpoints:
import asyncio
from a2akit import Worker, TaskContext
class LongRunningWorker(Worker):
async def handle(self, ctx: TaskContext) -> None:
for i in range(100):
if ctx.is_cancelled: # (1)!
await ctx.fail("Cancelled by user")
return
await ctx.send_status(f"Step {i + 1}/100")
await asyncio.sleep(0.5)
await ctx.complete("All done!")
is_cancelledis a non-blocking check against the CancelRegistry. Place it in your processing loop.
When a client sends POST /v1/tasks/{task_id}:cancel:
- CancelRegistry records the cancellation signal
- Worker's next
ctx.is_cancelledcheck returnsTrue - Worker handles it gracefully (call
fail(),complete(), or any lifecycle method)
Force-Cancel¶
If the worker doesn't cooperate within the timeout, TaskManager forces the transition:
sequenceDiagram
participant Client
participant TM as TaskManager
participant CR as CancelRegistry
participant W as Worker
participant S as Storage
Client->>TM: POST /tasks/{id}:cancel
TM->>CR: request_cancel(task_id)
TM->>TM: start force-cancel timer
Note over W: Worker checks is_cancelled...
alt Worker cooperates
W->>S: update_task(canceled)
else Timeout expires
TM->>S: force update_task(canceled)
TM->>TM: cleanup EventBus + CancelRegistry
end
The force-cancel timeout is configurable:
server = A2AServer(
worker=MyWorker(),
agent_card=AgentCardConfig(...),
cancel_force_timeout_s=120.0, # default: 60.0
)
Or via environment variable:
CancelRegistry¶
The CancelRegistry ABC manages cancellation signals:
| Method | Description |
|---|---|
request_cancel(task_id) |
Signal cancellation for a task |
is_cancelled(task_id) |
Check if cancellation was requested |
on_cancel(task_id) |
Return a CancelScope that signals when cancelled |
cleanup(task_id) |
Release resources for a completed task |
The default InMemoryCancelRegistry uses asyncio.Event objects. For distributed deployments, implement a Redis-backed registry.
Cancel Semantics¶
Cancel always goes through CancelRegistry
There is no instant-cancel path — even for submitted tasks. This avoids a race condition where both TaskManager and WorkerAdapter could write to the same task concurrently.
The worker checks is_cancelled before transitioning to working, so submitted tasks are canceled promptly when dequeued.
Cancelled tasks in your worker
You can transition to any terminal state when handling cancellation. Use ctx.fail("Cancelled") for a clear message, or ctx.complete() if you want to save partial results.