Queue-Based Worker
Problem: Consume queued tasks safely with retries and without duplicate side effects.
This example follows the core principles described in AI Worker Design Patterns and uses the standard Worker Protocol schema.
Key ideas#
- Keep the worker single-purpose and explicit about inputs and outputs.
- Put hard limits in the contract (timeout, retries, tools allowed).
- Make failures machine-actionable with stable error codes.
- Emit structured signals so orchestrators can route, retry, or escalate.
Diagram#
producer -> queue -> worker -> result store
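The flow above can be sketched as a pull loop. Here an in-process `queue.Queue` stands in for a real broker, and `handle` and `result_store` are stand-ins for your processing function and storage backend:

```python
import queue

def run_worker(q, handle, result_store, max_messages=None):
    """Drain the queue: pull each message, process it, persist the result."""
    processed = 0
    while max_messages is None or processed < max_messages:
        try:
            msg = q.get(block=False)                        # producer -> queue
        except queue.Empty:
            break
        result = handle(msg)                                # queue -> worker
        result_store[msg["idempotency_key"]] = result       # worker -> result store
        processed += 1
    return processed

q = queue.Queue()
q.put({"idempotency_key": "k1", "message": {"n": 2}})
store = {}
count = run_worker(
    q,
    lambda m: {"status": "ok", "outputs": {"square": m["message"]["n"] ** 2}},
    store,
)
```

Keying the result store by `idempotency_key` is what later makes duplicate deliveries cheap to detect.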
Worker spec#
worker_id: queue-based-worker
version: 1.0
purpose: Consume queued tasks safely with retries and without duplicate side effects.
inputs:
  - message: object
  - idempotency_key: string
outputs:
  - status: string
  - outputs: object
constraints:
  timeout_seconds: 60
  max_tokens: 1500
  tools_allowed: [language_model, write_result_store]
retries:
  max_attempts: 2
  backoff: exponential
observability:
  trace_id: required
  log_fields: [worker_id, attempt, duration_ms]
Input schema#
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "message": { "type": "object" },
    "idempotency_key": { "type": "string" }
  },
  "required": ["message", "idempotency_key"]
}
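A hand-rolled check equivalent to this schema, using only the standard library; a full validator such as the `jsonschema` package would enforce the same rules, so this is just a sketch of what the schema demands:

```python
def validate_input(payload: dict) -> list[str]:
    """Return a list of violations; an empty list means the payload is valid."""
    errors = []
    allowed = {"message", "idempotency_key"}
    for key in payload.keys() - allowed:              # additionalProperties: false
        errors.append(f"unexpected property: {key}")
    if not isinstance(payload.get("message"), dict):  # required, type: object
        errors.append("message: required, must be an object")
    if not isinstance(payload.get("idempotency_key"), str):  # required, type: string
        errors.append("idempotency_key: required, must be a string")
    return errors

ok = validate_input({"message": {}, "idempotency_key": "abc"})
bad = validate_input({"message": "oops", "extra": 1})
```

Rejecting unexpected properties up front turns producer drift into an immediate, attributable failure instead of a silent behavior change.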
Output schema#
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "status": { "type": "string" },
    "outputs": { "type": "object" }
  }
}
Constraints#
{
  "timeout_seconds": 60,
  "max_tokens": 1500,
  "retries": {
    "max_attempts": 2,
    "backoff": "exponential"
  },
  "rate_limit": "per-tenant (example: 10/min)",
  "tools_allowed": ["language_model", "write_result_store"]
}
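With `max_attempts: 2` and exponential backoff, the delay before attempt n is typically `base * 2**(n - 1)`, capped and optionally jittered. A sketch; the base delay, cap, and jitter strategy are illustrative choices, not part of the constraints above:

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  jitter: bool = False) -> float:
    """Delay in seconds before retry `attempt` (1-based), exponential with a cap."""
    delay = min(cap, base * (2 ** (attempt - 1)))
    if jitter:
        delay = random.uniform(0, delay)  # full jitter spreads out herds of retries
    return delay

delays = [backoff_delay(a) for a in (1, 2)]  # the attempts max_attempts: 2 allows
```

In production, enable jitter: synchronized retries from many workers otherwise hit the downstream in waves.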
Failure modes & handling#
- Duplicate delivery: detect via idempotency_key and short-circuit.
- Poison message: move to DLQ after max_attempts; error_code=poison_message.
- Downstream store unavailable: retry with backoff; preserve ordering constraints if needed.
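The duplicate-delivery and poison-message rules can be sketched together. Here the result store doubles as the idempotency ledger, `attempts` is a per-key counter, and `dlq` is any dead-letter destination; all names are illustrative:

```python
MAX_ATTEMPTS = 2

def process(msg, attempts, result_store, dlq, handle):
    """Apply the failure-mode rules above for one delivery of `msg`."""
    key = msg["idempotency_key"]
    if key in result_store:                      # duplicate delivery: short-circuit
        return result_store[key]
    attempts[key] = attempts.get(key, 0) + 1
    try:
        result = {"status": "ok", "outputs": handle(msg)}
    except Exception:
        if attempts[key] >= MAX_ATTEMPTS:        # poison message: move to DLQ
            dlq.append(msg)
            result = {"status": "error", "error_code": "poison_message"}
        else:
            raise                                # let the queue redeliver with backoff
    result_store[key] = result                   # side effect recorded exactly once
    return result

store, attempts, dlq = {}, {}, []
msg = {"idempotency_key": "k1", "message": {}}

def failing_handle(m):
    raise ValueError("bad payload")

try:
    process(msg, attempts, store, dlq, failing_handle)       # attempt 1: re-raised
except ValueError:
    pass
second = process(msg, attempts, store, dlq, failing_handle)  # attempt 2: to DLQ
third = process(msg, attempts, store, dlq, failing_handle)   # duplicate: cached
```

Recording the terminal error in the result store is deliberate: later redeliveries of the same key short-circuit instead of re-running a handler already known to fail.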
Observability signals#
- logs: worker_id, attempt, duration_ms, status, error_code
- metrics: success_count, failure_count, retry_count, p95_duration_ms
- trace fields: trace_id, span_id, upstream_request_id (if present)
FAQ#
Should the worker return partial results on failure?
If partial results are safe and useful, return them with a stable status and error_code. Otherwise fail fast and let orchestration decide.
Where should large artifacts go?
Store them externally (object storage or DB) and return a reference (URL or artifact ID) in the response.
How should I choose timeouts?
Set a hard ceiling based on SLOs and queue backpressure. Prefer smaller workers with tighter timeouts over monolithic ones.