Message Queues (RabbitMQ)
The platform uses RabbitMQ for asynchronous job processing. Three main pipelines rely on queues.
Document Processing Pipeline
| Queue | Producer | Consumer | Payload |
|---|---|---|---|
| parsing_jobs | document-service | parser-service worker | Document ID, file content, parser method, options |
| parsing_results | parser-service worker | document-service | Parsed markdown, metadata, chunk data |
| parsing_progress | parser-service worker | document-service | Progress percentage, current stage |
| embedding_jobs | document-service | embedding-service worker | Document ID, parsed text, chunking config |
| embedding_results | embedding-service worker | document-service | Chunk count, embedding count, status |
| embedding_progress | embedding-service worker | document-service | Progress percentage, current stage |
Flow
document-service --[parsing_jobs]--> parser-service worker
parser-service worker --[parsing_results]--> document-service
document-service --[embedding_jobs]--> embedding-service worker
embedding-service worker --[embedding_results]--> document-service
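
The hand-offs above can be sketched as a small lookup table. This is an illustration only: the queue names, producers, and consumers come from the table, while `QUEUES`, `PIPELINE_ORDER`, `next_queue`, and `trace` are hypothetical names, and the strict ordering is an assumption read off the flow diagram. The progress queues are left out because the flow does not show them.

```python
from typing import List, Optional

# Queue topology copied from the table and flow above: queue -> (producer, consumer).
QUEUES = {
    "parsing_jobs": ("document-service", "parser-service worker"),
    "parsing_results": ("parser-service worker", "document-service"),
    "embedding_jobs": ("document-service", "embedding-service worker"),
    "embedding_results": ("embedding-service worker", "document-service"),
}

# Assumption: a document passes through the queues in exactly this order.
PIPELINE_ORDER = ["parsing_jobs", "parsing_results", "embedding_jobs", "embedding_results"]


def next_queue(current: str) -> Optional[str]:
    """Return the queue that follows `current`, or None at the end of the pipeline."""
    index = PIPELINE_ORDER.index(current)  # raises ValueError for unknown queues
    if index + 1 < len(PIPELINE_ORDER):
        return PIPELINE_ORDER[index + 1]
    return None


def trace(start: str = "parsing_jobs") -> List[str]:
    """List the hops a document takes, formatted like the flow diagram."""
    hops = []
    queue: Optional[str] = start
    while queue is not None:
        producer, consumer = QUEUES[queue]
        hops.append(f"{producer} --[{queue}]--> {consumer}")
        queue = next_queue(queue)
    return hops
```

Calling `trace()` reproduces the four lines of the flow, which makes it usable as a quick consistency check if the table changes.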
Transaction Events
| Queue | Producer | Consumer | Payload |
|---|---|---|---|
| transaction | completion-service | llm-core | Model name, token counts (input/output), provider, timestamp |
After each LLM completion, the completion service publishes a transaction event. llm-core consumes it and stores it in the model_transactions table for usage tracking.
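
A transaction payload might look like the sketch below. Only the kinds of data (model name, token counts, provider, timestamp) come from the table; the field names, the `TransactionEvent` class, and the ISO 8601 UTC timestamp format are assumptions made for illustration, not the actual schema.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class TransactionEvent:
    # Hypothetical field names; the real payload keys may differ.
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    timestamp: str  # assumed to be ISO 8601 in UTC

    def to_json(self) -> str:
        """Serialize the event as a JSON object."""
        return json.dumps(asdict(self))


def make_transaction_event(
    model: str,
    provider: str,
    input_tokens: int,
    output_tokens: int,
    now: Optional[datetime] = None,
) -> TransactionEvent:
    """Build an event, stamping it with the current UTC time unless `now` is given."""
    moment = now or datetime.now(timezone.utc)
    return TransactionEvent(model, provider, input_tokens, output_tokens, moment.isoformat())
```

Passing `now` explicitly keeps the function deterministic in tests.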
Message Format
All RabbitMQ messages between NestJS services use the NestJS microservices format:
{
  "pattern": "queue_name",
  "data": { ... },
  "id": "correlation-id"
}
Python services (parser-service, embedding-service) format their messages to match this pattern for compatibility.
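
A minimal sketch of how a Python worker could wrap and unwrap this envelope, using only the standard library. The helper names `build_envelope` and `parse_envelope`, the generated UUID for `id`, and the example payload key are assumptions; the three envelope keys are the ones shown above.

```python
import json
import uuid
from typing import Any, Dict, Optional


def build_envelope(pattern: str, data: Dict[str, Any], message_id: Optional[str] = None) -> bytes:
    """Wrap a payload in the pattern/data/id envelope and encode it as UTF-8 JSON."""
    envelope = {
        "pattern": pattern,
        "data": data,
        # Assumption: a random UUID is acceptable when no correlation id is supplied.
        "id": message_id or str(uuid.uuid4()),
    }
    return json.dumps(envelope).encode("utf-8")


def parse_envelope(body: bytes) -> Dict[str, Any]:
    """Decode a message body and check that the required envelope keys are present."""
    envelope = json.loads(body.decode("utf-8"))
    # Assumption: "id" is treated as optional on the way in.
    missing = {"pattern", "data"} - envelope.keys()
    if missing:
        raise ValueError(f"missing envelope keys: {sorted(missing)}")
    return envelope
```

For example, `build_envelope("parsing_results", {"document_id": "doc-1"})` produces bytes that can be used as the message body; `document_id` is a made-up key for illustration.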
Infrastructure
- Exchanges: embedding.events, document.events (topic exchanges)
- Dead Letter Queues (DLQ) configured for failed messages
- Retry with exponential backoff on the embedding-service worker
- Prefetch count configurable per consumer
- Connections managed via amqp-connection-manager (NestJS) and aio-pika (Python)
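
The retry and dead-letter behaviour listed above could be sketched as follows. The base delay, growth factor, cap, and maximum attempt count are illustrative defaults rather than the values used by the embedding-service worker, and routing a message to the DLQ once retries are exhausted is an assumption about how the two mechanisms fit together.

```python
from typing import Optional, Tuple


def backoff_delay(attempt: int, base: float = 1.0, factor: float = 2.0, cap: float = 60.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based), capped at `cap`."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(cap, base * factor ** (attempt - 1))


def next_action(attempt: int, max_attempts: int = 5) -> Tuple[str, Optional[float]]:
    """Decide what to do after failed attempt number `attempt`.

    Returns ("retry", delay_seconds) while attempts remain,
    otherwise ("dead_letter", None).
    """
    if attempt >= max_attempts:
        return ("dead_letter", None)
    return ("retry", backoff_delay(attempt))
```

With these defaults the delays grow as 1, 2, 4, 8 seconds, and the fifth failure sends the message to the dead letter queue. Adding random jitter to each delay is a common refinement when many consumers retry at once.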
Health Checks
document-service includes RabbitMQ connectivity in its /health endpoint. embedding-service checks RabbitMQ in its /readyz endpoint.
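
As a rough illustration of what such a check might do, the sketch below probes the broker with a plain TCP connection and folds the result into a health report. The function names and the response shape are hypothetical, and 5672 is the default AMQP port rather than a value taken from this platform's configuration. A TCP probe only shows that the port is reachable; a real check would more likely reuse the existing AMQP connection.

```python
import socket
from typing import Callable, Dict


def tcp_probe(host: str, port: int = 5672, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the broker port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def health_report(probe: Callable[[], bool]) -> Dict[str, object]:
    """Run the probe and build a health payload (hypothetical response shape)."""
    try:
        ok = bool(probe())
    except Exception:
        # A probe that raises is reported as a failed check, not a crashed endpoint.
        ok = False
    return {
        "status": "ok" if ok else "error",
        "checks": {"rabbitmq": "up" if ok else "down"},
    }
```

A handler for /health or /readyz could call `health_report(lambda: tcp_probe("rabbitmq"))`, where the hostname `rabbitmq` is a placeholder, and map an `"error"` status to a non-200 response.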