# Aggregator Worker

**Problem:** Merge partial results from parallel workers into a single, contract-conforming output.

This example follows the core principles described in the AI Worker Design Patterns and uses the standard Worker Protocol schema.

## Key ideas

- Keep the worker single-purpose and explicit about its inputs and outputs.
- Put hard limits in the contract (timeout, retries, tools allowed).
- Make failures machine-actionable with stable error codes.
- Emit structured signals so orchestrators can route, retry, or escalate.

## Diagram

```
parts[] -> merge -> merged + conflicts
```

## Worker spec

```yaml
worker_id: aggregator-worker
version: 1.0
purpose: Merge partial results from parallel workers into a single output contract.
inputs:
  - parts: array
  - merge_policy: enum(strict, best_effort)
outputs:
  - merged: object
  - conflicts: array
  - completeness: number
constraints:
  timeout_seconds: 60
  max_tokens: 1500
  tools_allowed: [language_model (optional), merge_rules]
retries:
  max_attempts: 2
  backoff: exponential
observability:
  trace_id: required
  log_fields: [worker_id, attempt, duration_ms]
```

## Input schema

```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "parts": {
      "type": "array"
    },
    "merge_policy": {
      "type": "string",
      "enum": [
        "strict",
        "best_effort"
      ]
    }
  },
  "required": [
    "parts",
    "merge_policy"
  ]
}
```
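
A request that validates against this schema might look like the following; the shape of the individual part payloads is illustrative, since the schema leaves `parts` items unconstrained:

```json
{
  "parts": [
    { "source": "worker-a", "total": 12 },
    { "source": "worker-b", "region": "eu" }
  ],
  "merge_policy": "best_effort"
}
```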

## Output schema

```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "merged": {
      "type": "object"
    },
    "conflicts": {
      "type": "array"
    },
    "completeness": {
      "type": "number"
    }
  }
}
```
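
The merge step behind this contract can be sketched as a plain function. This is a minimal sketch, not the worker's actual implementation: it assumes each part is a flat dict of field/value pairs, and `expected_fields` (used to compute `completeness`) is an assumed parameter not defined in the spec above.

```python
def merge_parts(parts, merge_policy, expected_fields):
    """Merge flat part dicts into the output contract: merged, conflicts, completeness."""
    merged = {}
    conflicts = []
    for part in parts:
        for field, value in part.items():
            if field in merged and merged[field] != value:
                conflicts.append({"field": field, "values": [merged[field], value]})
                if merge_policy == "strict":
                    # Strict policy: conflicting fields are fatal
                    # (maps to error_code=merge_conflict below).
                    raise ValueError("merge_conflict")
                # best_effort: keep the first value seen, record the conflict.
            else:
                merged[field] = value
    # completeness is the fraction of expected fields that were actually merged.
    completeness = len(merged) / len(expected_fields) if expected_fields else 1.0
    return {"merged": merged, "conflicts": conflicts, "completeness": completeness}
```

Under `best_effort` the first value wins and the disagreement is surfaced in `conflicts`, leaving the tie-breaking decision to the orchestrator.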

## Constraints

```json
{
  "timeout_seconds": 60,
  "max_tokens": 1500,
  "retries": {
    "max_attempts": 2,
    "backoff": "exponential"
  },
  "rate_limit": "per-tenant (example: 10/min)",
  "tools_allowed": [
    "language_model (optional)",
    "merge_rules"
  ]
}
```

## Failure modes & handling

- Conflicting fields under strict policy: `error_code=merge_conflict` (non-retryable unless upstream changes).
- Missing required parts: `error_code=missing_parts`, `retryable=true` if the upstream failure is transient.
- Oversized payload: store parts externally and pass references.
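
A machine-actionable failure can be packaged as a small envelope. This is a hedged sketch: `error_code` and `retryable` come from the failure modes above, while the `status` and `detail` field names are assumptions about the envelope shape.

```python
def error_envelope(error_code, retryable, detail=""):
    """Build a machine-actionable failure response with a stable error code."""
    return {
        "status": "failed",
        "error_code": error_code,  # stable string, e.g. "merge_conflict"
        "retryable": retryable,    # lets the orchestrator decide: retry or escalate
        "detail": detail,          # free-form, for humans and logs only
    }

envelope = error_envelope(
    "merge_conflict",
    retryable=False,
    detail="parts disagree on field 'total'",
)
```

Because routing decisions key off `error_code` and `retryable` rather than `detail`, the detail text can change without breaking orchestration logic.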

## Observability signals

- logs: `worker_id`, `attempt`, `duration_ms`, `status`, `error_code`
- metrics: `success_count`, `failure_count`, `retry_count`, `p95_duration_ms`
- trace fields: `trace_id`, `span_id`, `upstream_request_id` (if present)
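
A structured log record carrying these fields might look like this sketch; the field names follow the spec above, and the concrete values are illustrative:

```python
import json
import uuid

# Hypothetical structured log record for one aggregation attempt.
record = {
    "trace_id": str(uuid.uuid4()),  # required by the worker spec
    "worker_id": "aggregator-worker",
    "attempt": 1,
    "duration_ms": 42,
    "status": "success",
    "error_code": None,
}
# Emit one JSON object per line so log shippers can parse it.
print(json.dumps(record))
```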

## FAQ

**Should the worker return partial results on failure?**

If partial results are safe and useful, return them with a stable status and error_code. Otherwise fail fast and let orchestration decide.
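
For example, a partial-result response might combine the output contract with a stable status and error code (values illustrative, and the `status` field is an assumed convention):

```json
{
  "status": "partial",
  "error_code": "missing_parts",
  "merged": { "total": 12 },
  "conflicts": [],
  "completeness": 0.5
}
```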

**Where should large artifacts go?**

Store them externally (object storage or DB) and return a reference (URL or artifact ID) in the response.

**How should I choose timeouts?**

Set a hard ceiling based on your SLOs and queue backpressure. Prefer several small workers with tight timeouts over one monolithic worker with a long one.