The Event-Driven Spine
The event bus and job queue are the permanent, non-negotiable backbone of the platform. All services hang off this spine. It is the first thing built and the last thing changed.
Event Bus: NATS JetStream
NATS JetStream is the central nervous system. Every significant state change in the platform is published as an immutable event.
Why NATS JetStream:
- Single binary, trivial to deploy and operate
- Built-in persistence with configurable retention and replay
- Supports both pub/sub (fan-out) and work queues (competing consumers) natively
- Scales to Kafka-level throughput when needed
- Can be swapped for Kafka without changing event contracts
Event Structure
Every event shares this envelope:
{
"eventId": "uuid-v4",
"eventType": "domain.NounVerb",
"eventVersion": "1.0",
"timestamp": "2026-01-01T12:00:00.000Z",
"source": "service-or-plugin-id",
"correlationId": "uuid-v4",
"data": {}
}
`eventType` follows the `domain.NounVerb` convention. `correlationId` traces a chain of related events (e.g., one inbound call → multiple downstream events all sharing the same `correlationId`).
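The envelope can be captured as a shared type with a small constructor. A sketch only — the `PlatformEvent` and `makeEvent` names are illustrative, not taken from the codebase; the field names and semantics come from the spec above:

```typescript
import { randomUUID } from "node:crypto";

// Shared envelope for every event on the bus (fields from the spec above).
interface PlatformEvent {
  eventId: string;        // uuid-v4, unique per event
  eventType: string;      // domain.NounVerb, e.g. "communication.CallStarted"
  eventVersion: string;   // schema version of `data`
  timestamp: string;      // ISO-8601 with milliseconds
  source: string;         // emitting service or plugin id
  correlationId: string;  // shared across a chain of related events
  data: Record<string, unknown>;
}

// Build an envelope. Pass the correlationId of the triggering event so the
// whole chain stays traceable, or let a fresh one be minted at chain start.
function makeEvent(
  eventType: string,
  source: string,
  data: Record<string, unknown>,
  correlationId: string = randomUUID(),
  eventVersion = "1.0",
): PlatformEvent {
  return {
    eventId: randomUUID(),
    eventType,
    eventVersion,
    timestamp: new Date().toISOString(),
    source,
    correlationId,
    data,
  };
}
```

Publishing would then be roughly `js.publish(evt.eventType, JSON.stringify(evt))` against a JetStream client, since subjects and event types share the `domain.NounVerb` shape.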
Event Domains
| Domain | Emitted by | Examples |
|---|---|---|
connector.* | Connector plugins, Connector Service | connector.DataReceived, connector.SyncCompleted |
communication.* | Communication Service, Channel plugins | communication.MessageReceived, communication.CallStarted |
data.* | Data Service, ETL Service | data.EntityCreated, data.EntityUpdated |
rule.* | Rules Service | rule.ConditionMatched, rule.ActionTriggered |
job.* | Job Queue | job.Enqueued, job.Completed, job.Failed |
ai.* | AI Service | ai.QueryReceived, ai.ResponseGenerated |
system.* | All services | system.ServiceStarted, system.PluginLoaded, system.HealthCheckFailed |
NATS Streams
NATS JetStream organizes events into named streams with configurable retention:
| Stream | Subjects | Retention |
|---|---|---|
platform-events | All *.* events | 30 days (configurable) |
connector-data | connector.* | 7 days |
audit-log | All events (audit copy) | 1 year |
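The table maps to declarative JetStream stream configs. A sketch under two assumptions: field names follow the JetStream API (`max_age` in nanoseconds), and, because JetStream rejects streams with overlapping subjects, the `connector-data` and `audit-log` copies are expressed as a filtered source and a mirror of `platform-events` rather than second subscriptions to the raw subjects — one plausible layout, not necessarily the deployed one:

```typescript
// JetStream expresses max_age in nanoseconds.
const DAY_NS = 24 * 60 * 60 * 1e9;

interface StreamConfig {
  name: string;
  max_age: number;
  subjects?: string[];
  sources?: { name: string; filter_subject?: string }[];
  mirror?: { name: string };
}

const STREAMS: StreamConfig[] = [
  { name: "platform-events", subjects: ["*.*"], max_age: 30 * DAY_NS },
  // Streams may not capture overlapping subjects, so the connector and
  // audit copies consume from platform-events instead of the raw subjects.
  {
    name: "connector-data",
    sources: [{ name: "platform-events", filter_subject: "connector.*" }],
    max_age: 7 * DAY_NS,
  },
  { name: "audit-log", mirror: { name: "platform-events" }, max_age: 365 * DAY_NS },
];

// Applied once at startup with nats.js, roughly:
//   const jsm = await nc.jetstreamManager();
//   for (const s of STREAMS) await jsm.streams.add(s);
```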
Job Queue: pg-boss (BullMQ aspirational)
A dedicated Job Service translates events into durable, queued jobs. This separates event acknowledgment from work execution. Services can acknowledge an event immediately and process it asynchronously via the job queue.
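The translation step can be sketched as a thin bridge. The interface and function names below are hypothetical (the actual Job Service wiring is not shown in this document); the point is that the event is acknowledged as soon as the job is durably enqueued, carrying the `eventId` along as the idempotency key:

```typescript
// Minimal interfaces for the two sides of the bridge, so it can be
// unit-tested with fakes and wired to NATS + pg-boss in production.
interface EventLike {
  eventId: string;
  eventType: string;
  correlationId: string;
  data: unknown;
}
interface QueueLike {
  send(name: string, data: object, opts?: object): Promise<string | null>;
}

// Enqueue a job for an event, then ack the event. Slow work never blocks
// the bus because execution happens later, in a queue worker.
async function bridgeEvent(
  queue: QueueLike,
  queueFor: (eventType: string) => string,
  evt: EventLike,
): Promise<string | null> {
  return queue.send(queueFor(evt.eventType), {
    eventId: evt.eventId,            // idempotency key for the handler
    correlationId: evt.correlationId, // propagated for trace reconstruction
    payload: evt.data,
  });
}
```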
pg-boss
pg-boss (`^10.0.0`, confirmed in `apps/hub/package.json`) is a PostgreSQL-backed job queue. It runs entirely on the Supabase PostgreSQL instance — no additional infrastructure required.
Why pg-boss:
- Zero additional infrastructure (uses Supabase PG already required)
- ACID guarantees — jobs are transactional
- Supports priority-based scheduling per job via `boss.send(..., { priority })`
- Good enough for thousands of jobs/day
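In practice every enqueue goes through one options helper so retries and priorities stay consistent. A sketch: the option names (`priority`, `retryLimit`, `retryBackoff`, `retryDelay`) are real pg-boss send options, but the specific values and the `enqueue` wrapper are illustrative, not lifted from the codebase:

```typescript
// One place to define retry policy; retryBackoff enables exponential delays.
// retryLimit/retryDelay values here are illustrative defaults.
function jobOptions(priority: number) {
  return { priority, retryLimit: 5, retryBackoff: true, retryDelay: 30 };
}

// Narrow interface so the wrapper is testable without a database.
interface Sender {
  send(name: string, data: object, opts: object): Promise<string | null>;
}

async function enqueue(boss: Sender, queue: string, data: object, priority = 0) {
  return boss.send(queue, data, jobOptions(priority));
}

// Against a real pg-boss v10 instance (v10 requires creating queues first):
//   const boss = new PgBoss(process.env.DATABASE_URL!);
//   await boss.start();
//   await boss.createQueue("connector.sync");
//   await enqueue(boss, "connector.sync", { connectorId: "c1" }, 10);
//   await boss.work("connector.sync", async ([job]) => { /* v10 hands a batch */ });
```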
BullMQ on Redis (aspirational)
Status (2026-04-12 audit): BullMQ is not a direct dependency of `apps/hub`. Only `ioredis` `^5.9.3` is present, and the `redis:7-alpine` service in `docker-compose.yml` is used for caching/rate-limiting, not as a BullMQ job queue. The section below describes the intended graduation path for when pg-boss throughput becomes insufficient.
When throughput demands it, the plan is to graduate to BullMQ backed by Redis. The job API is intended to stay the same; only the driver changes.
Queue Naming (current reality)
Status (2026-04-12 audit): The platform does not currently expose three global priority queues. Today each service registers its own domain-specific pg-boss queue name — for example, `ConnectorService` uses `connector.sync` (see `apps/hub/src/services/connectors/ConnectorService.ts`) and passes a numeric `priority` option on `boss.send`. The `realtime`/`interactive`/`background` tier names below are the target taxonomy, not the names used in code today.
| Tier (target) | Target Latency | Use Cases |
|---|---|---|
realtime | < 100ms | Inbound call routing, instant message delivery, live UI updates |
interactive | < 5s | Outbound messages, AI queries, dashboard requests |
background | Minutes–hours | Data sync, search indexing, ETL, cleanup, report generation |
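Until dedicated tier queues exist, the tiers can be approximated as priority bands passed to `boss.send` on the existing per-domain queues. A sketch — the band values are made up for illustration; in pg-boss, higher-priority jobs are picked up first:

```typescript
type Tier = "realtime" | "interactive" | "background";

// Higher number = scheduled sooner in pg-boss. Band values are illustrative.
const TIER_PRIORITY: Record<Tier, number> = {
  realtime: 100,
  interactive: 50,
  background: 0,
};

function priorityFor(tier: Tier): number {
  return TIER_PRIORITY[tier];
}
```

When the target taxonomy lands, the mapping collapses to queue names instead of priorities, and callers of `priorityFor` are the only code that changes.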
Job Guarantees
- At-least-once delivery — every job runs at least once
- Idempotent handlers — every handler checks an idempotency key (typically the `eventId` that triggered the job) before executing
- Exponential backoff retries — failed jobs retry with increasing delays
- Dead-letter queue — after max retries, jobs land in DLQ for manual inspection
- Scheduled jobs — cron-style scheduling for periodic sync, SLA timeouts, reminders
Common Job Types
| Job | Queue | Triggered by |
|---|---|---|
ProcessIncomingDataJob | background | connector.DataReceived |
SyncConnectorDataJob | background | Cron / manual |
IndexDocumentJob | background | data.EntityCreated, data.EntityUpdated |
RunRuleEngineJob | realtime | communication.*, connector.LocationUpdated |
SendOutgoingMessageJob | interactive | Rule action, user action |
PlaceCallJob | realtime | Rule action, user action |
SendFaxJob | interactive | Rule action |
ExecuteLLMQueryJob | interactive | ai.QueryReceived |
WriteEntityJob | background | ETL output |
Observability
Every event published includes a `correlationId`. Job payloads include the triggering `eventId`. This enables full trace reconstruction:
incoming_call (communication.CallStarted, correlationId=X)
└─► RunRuleEngineJob (correlationId=X)
└─► rule.ActionTriggered (correlationId=X)
└─► SendOutgoingMessageJob (correlationId=X)
└─► communication.MessageSent (correlationId=X)
OpenTelemetry trace context is carried in the event envelope and propagated through job payloads.
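Carrying that context can look like the sketch below. Real services would use `propagation.inject`/`extract` from `@opentelemetry/api` to fill and read the carrier rather than hand-building headers, and the `traceparent` envelope field name is an assumption; the W3C `traceparent` format itself (`version-traceId-spanId-flags`) is standard:

```typescript
// W3C traceparent, e.g.
// "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
const TRACEPARENT = /^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$/;

function isValidTraceparent(tp: string): boolean {
  return TRACEPARENT.test(tp);
}

// Attach trace context when publishing; job payloads copy the field through,
// so a span started in a worker joins the trace begun at the inbound call.
function withTrace<E extends object>(evt: E, traceparent: string): E & { traceparent: string } {
  if (!isValidTraceparent(traceparent)) {
    throw new Error(`malformed traceparent: ${traceparent}`);
  }
  return { ...evt, traceparent };
}
```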