# Aggregator Worker
Problem: Merge partial results from parallel workers into a single output contract.
This example follows the core principles described in the AI Worker Design Patterns and uses the standard Worker Protocol schema.
## Key ideas
- Keep the worker single-purpose and explicit about inputs and outputs.
- Put hard limits in the contract (timeout, retries, tools allowed).
- Make failures machine-actionable with stable error codes.
- Emit structured signals so orchestrators can route, retry, or escalate (a hypothetical envelope is sketched below).
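To make the last two points concrete, a failure signal might look like the following. The field names are illustrative, not mandated by the Worker Protocol schema:

```python
# Hypothetical failure envelope; field names are illustrative only.
failure_signal = {
    "status": "failed",
    "error_code": "merge_conflict",  # stable, documented code the orchestrator can match on
    "retryable": False,              # tells the orchestrator to escalate rather than retry
    "trace_id": "3f9c0c7a",          # correlates the failure with logs and spans
}
```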
## Diagram
```
parts[] -> merge -> merged + conflicts
```
## Worker spec
```yaml
worker_id: aggregator-worker
version: 1.0
purpose: Merge partial results from parallel workers into a single output contract.
inputs:
  - parts: array
  - merge_policy: enum(strict, best_effort)
outputs:
  - merged: object
  - conflicts: array
  - completeness: number
constraints:
  timeout_seconds: 60
  max_tokens: 1500
  tools_allowed: [language_model (optional), merge_rules]
  retries:
    max_attempts: 2
    backoff: exponential
observability:
  trace_id: required
  log_fields: [worker_id, attempt, duration_ms]
```
## Input schema
```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "parts": { "type": "array" },
    "merge_policy": {
      "type": "string",
      "enum": ["strict", "best_effort"]
    }
  },
  "required": ["parts", "merge_policy"]
}
```
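To enforce the contract at the worker boundary, the schema above can be checked with an off-the-shelf validator. A minimal sketch using the third-party jsonschema package; the helper name and sample payload are illustrative:

```python
# pip install jsonschema
from jsonschema import Draft202012Validator

# The input schema above, inlined as a Python dict.
INPUT_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "type": "object",
    "additionalProperties": False,
    "properties": {
        "parts": {"type": "array"},
        "merge_policy": {"type": "string", "enum": ["strict", "best_effort"]},
    },
    "required": ["parts", "merge_policy"],
}

validator = Draft202012Validator(INPUT_SCHEMA)

def validate_input(payload: dict) -> list[str]:
    """Return human-readable validation errors (empty if the payload is valid)."""
    return [error.message for error in validator.iter_errors(payload)]

# Illustrative payload: two partial results and a strict merge policy.
errors = validate_input({"parts": [{"a": 1}, {"b": 2}], "merge_policy": "strict"})
assert errors == []
```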
## Output schema
```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "merged": { "type": "object" },
    "conflicts": { "type": "array" },
    "completeness": { "type": "number" }
  }
}
```
## Constraints
```json
{
  "timeout_seconds": 60,
  "max_tokens": 1500,
  "retries": {
    "max_attempts": 2,
    "backoff": "exponential"
  },
  "rate_limit": "per-tenant (example: 10/min)",
  "tools_allowed": [
    "language_model (optional)",
    "merge_rules"
  ]
}
```
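The retry budget above can be enforced in a thin wrapper around the worker call. A sketch, assuming a hypothetical RetryableError marker for transient failures:

```python
import random
import time

class RetryableError(Exception):
    """Hypothetical marker for failures worth retrying (e.g. missing_parts)."""

def run_with_retries(fn, max_attempts: int = 2, base_delay: float = 1.0):
    """Call fn(), retrying with exponential backoff, per the constraints above."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RetryableError:
            if attempt == max_attempts:
                raise  # budget exhausted; surface the failure to the orchestrator
            # 1s, 2s, 4s, ... plus up to 100 ms of jitter to avoid thundering herds
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))

# Usage (call_worker and payload are hypothetical):
# result = run_with_retries(lambda: call_worker(payload))
```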
## Failure modes & handling
- Conflicting fields under strict policy: error_code=merge_conflict (non-retryable unless upstream changes); a policy sketch follows this list.
- Missing required parts: error_code=missing_parts, retryable=true if upstream is transient.
- Oversized payload: store parts externally and pass references.
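A sketch of the merge step under both policies. How best_effort resolves conflicts (first value wins) and how completeness is computed are assumptions for illustration, not part of the spec above:

```python
class MergeConflict(Exception):
    """Maps to error_code=merge_conflict (non-retryable under strict policy)."""

def merge_parts(parts: list[dict], merge_policy: str) -> dict:
    merged: dict = {}
    conflicts: list[dict] = []
    for part in parts:
        for key, value in part.items():
            if key in merged and merged[key] != value:
                if merge_policy == "strict":
                    raise MergeConflict(f"conflicting values for {key!r}")
                # best_effort: keep the first value seen, record the conflict
                conflicts.append({"field": key, "kept": merged[key], "dropped": value})
            else:
                merged[key] = value
    # Assumed definition: share of distinct fields that merged without conflict.
    all_keys = {key for part in parts for key in part}
    conflicted = {c["field"] for c in conflicts}
    completeness = (len(all_keys) - len(conflicted)) / len(all_keys) if all_keys else 1.0
    return {"merged": merged, "conflicts": conflicts, "completeness": completeness}
```

Under strict policy the exception maps directly onto the merge_conflict failure mode above; under best_effort the caller still receives a full envelope plus the conflict list.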
## Observability signals
- logs: worker_id, attempt, duration_ms, status, error_code (one way to emit these is sketched below)
- metrics: success_count, failure_count, retry_count, p95_duration_ms
- trace fields: trace_id, span_id, upstream_request_id (if present)
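A minimal sketch of emitting the log fields above as one structured line per attempt; the logger name and envelope shape are illustrative:

```python
import json
import logging
import time

logger = logging.getLogger("aggregator-worker")

def log_attempt(trace_id: str, attempt: int, started: float,
                status: str, error_code: str | None = None) -> None:
    """Emit one JSON log line carrying the fields listed above."""
    logger.info(json.dumps({
        "worker_id": "aggregator-worker",
        "trace_id": trace_id,
        "attempt": attempt,
        "duration_ms": int((time.monotonic() - started) * 1000),
        "status": status,
        "error_code": error_code,
    }))
```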
## FAQ
**Should the worker return partial results on failure?**
If partial results are safe and useful, return them with a stable status and error_code. Otherwise fail fast and let orchestration decide.
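For example, a partial response can keep the output contract intact while flagging what is missing; the values here are illustrative:

```python
# Hypothetical partial-result envelope: same output contract, plus a stable
# status and error_code so orchestration can accept, retry, or escalate.
partial_response = {
    "status": "partial",
    "error_code": "missing_parts",
    "merged": {"summary": "..."},  # the fields that did merge cleanly
    "conflicts": [],
    "completeness": 0.6,
}
```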
**Where should large artifacts go?**
Store them externally (object storage or DB) and return a reference (URL or artifact ID) in the response.
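For instance, the worker might return a pointer instead of the payload itself; the URL scheme and ID format are illustrative:

```python
# Hypothetical response carrying a reference rather than the merged payload.
response = {
    "status": "ok",
    "merged_ref": {
        "artifact_id": "art_5f2c",
        "url": "s3://example-bucket/merged/art_5f2c.json",
    },
    "completeness": 1.0,
}
```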
**How should I choose timeouts?**
Set a hard ceiling based on SLOs and queue backpressure. Prefer smaller workers with tighter timeouts over monolithic workers.