Somewhere in most event-driven backends, there's a method that looks like this:
async placeOrder(input: PlaceOrderInput) {
await this.db.insert(orders).values(input); // 1. write the row
await this.kafka.send({ topic: 'order.placed', … }); // 2. publish the event
}
It looks fine, and it has a bug. If the process crashes between 1 and 2, the order exists but the event never happened — downstream consumers silently miss it. Swap the order and you get the opposite failure: an event for an order that was rolled back. There is no try/catch arrangement that fixes this, because a database and a broker cannot commit atomically. This is the dual-write problem.
The boring, proven fix is the transactional outbox: don't publish in step 2. Instead, write the event into an outbox_events table in the same database transaction as the business row. A background worker then relays committed rows to the broker. The transaction is the only atomic boundary you have — so put both writes inside it.
That gives you at-least-once delivery, which means the consumer side needs the mirror-image pattern: an idempotent inbox that deduplicates redeliveries, so the side effect runs exactly once even when Kafka delivers twice.
@nest-native/messaging — part of nest-native, a set of NestJS integrations — packages this pair as a library, after the pattern was first built by hand inside a reference application. It's the outbox/inbox pattern for the Drizzle ORM + NestJS stack — a niche the existing NestJS outbox libraries (which target TypeORM and MikroORM — see nestjs-outbox and nestjs-inbox-outbox, both solid) don't cover.
The producer half: enqueue inside your transaction
The library ships the tables as Drizzle factories per dialect (SQLite, Postgres, MySQL). You add them to your schema and generate a migration like any other table:
// schema.ts
export { outboxEvents, inboxEvents } from '@nest-native/messaging/sqlite'; // or /postgres, /mysql
Transactions ride on @nestjs-cls/transactional with its Drizzle adapter — the same @Transactional() decorator you'd use anyway:
MessagingModule.forRoot({
drizzleInstanceToken: DRIZZLE, // your Drizzle DI token
outboxStore: new SqliteOutboxStore(), // or PostgresOutboxStore / MysqlOutboxStore
inboxStore: new SqliteInboxStore(),
transport, // where the claimer relays to — below
}),
Then the business code:
@Injectable()
export class OrderService {
constructor(
@InjectTransaction() private readonly db: AppDatabase,
private readonly producer: OutboxProducer<SqliteOutboxStore>,
) {}
@Transactional()
placeOrder(id: string, item: string) {
this.db.insert(orders).values({ id, item }).run();
this.producer.enqueue({
topic: 'order.placed',
payload: { id, item },
idempotencyKey: `order:${id}`,
});
}
}
The order row and the outbox row commit together, or roll back together. A throw after enqueue produces no phantom event; a crash after commit loses nothing, because the event is durably in your database.
(One nuance the library handles for you: better-sqlite3 transactions are synchronous while Postgres/MySQL are async. The per-dialect stores own that difference — on SQLite enqueue returns the row directly inside the sync transaction body; on Postgres you await it. Same code shape either way.)
Relaying: the claimer and the worker
A claimer polls for committed rows, publishes each through a transport, and applies retry-with-backoff on failure — including reclaiming rows from a worker that died mid-flight:
// scripts/start-worker.ts
const app = await NestFactory.createApplicationContext(AppModule);
await runWorkerLoop(app.get(OutboxClaimer), {
pollIntervalMs: 2_000,
signal: shutdownSignal, // AbortSignal wired to SIGTERM
});
For Kafka the transport is one line, built on @nest-native/kafka (Confluent's official JS client underneath):
MessagingModule.forRootAsync({
drizzleInstanceToken: DRIZZLE,
outboxStore: new SqliteOutboxStore(),
inboxStore: new SqliteInboxStore(),
inject: [KafkaProducerService],
useTransport: (producer) => new KafkaOutboxTransport(producer),
}),
Don't have Kafka yet? There's an in-process transport (@nest-native/messaging/in-process) — a topic→handler registry with the same at-least-once semantics — so a modular monolith can adopt the pattern today and swap the transport for a broker later without touching a line of domain code.
The consumer half: exactly-once effects
Kafka is at-least-once by contract, so redelivery is a when, not an if. The inbox primitive is a single method:
const outcome = await inbox.runOnce(dedupKey, source, () => {
// your side effect — runs in the SAME transaction as the dedup row
this.audit.record({ … });
});
// 'processed' on the first delivery, 'duplicate' on any redelivery
runOnce inserts a (source, message_key) row protected by a unique index and runs your side effect in the same transaction. A redelivery violates the index → 'duplicate' → the side effect is skipped. If your side effect throws, the dedup row rolls back with it, so the retry reprocesses cleanly. That composition — unique index + shared transaction — is the entire trick, and it's provable in the database.
For Kafka consumers the library wraps the full delivery decision (validate → dedup → ack / dead-letter / redeliver) in an engine you delegate to from a thin @KafkaConsumer shell:
@KafkaConsumer('order.placed', { groupId: 'orders-service' })
export class OrderConsumer {
constructor(private readonly inbox: KafkaInboxConsumer, private readonly audit: OrderAuditService) {}
@KafkaHandler()
async handle(@KafkaMessage() payload: unknown, @KafkaHeaders() headers: Headers, @KafkaCtx() ctx: KafkaContext) {
await this.inbox.consume<OrderPlaced>({
source: 'order.placed:orders-service',
context: ctx, headers, payload,
validate: isOrderPlaced, // poison message → DLQ, then ack
sideEffect: (order, dedupKey) => this.audit.record(order, dedupKey),
dlqTopic: 'order.placed.DLQ',
});
}
}
Poison messages (unparseable, unkeyable) go to a dead-letter topic instead of redelivering forever; transient failures rethrow so the broker redelivers; duplicates ack silently.
Testing without a broker
Everything above runs in tests with no infrastructure: an in-memory outbox transport (@nest-native/messaging/testing) for the producer half, and @nest-native/kafka/testing's in-memory broker for the full pipeline — including redelivery:
await broker.emit('order.placed', publishedMessage); // redeliver the same message
await broker.idle(); // wait for handler pipelines to settle
expect(auditRows).toHaveLength(1); // side effect ran exactly once
The library itself is tested at 100% coverage against real SQLite, real in-process Postgres (pglite), and the in-memory broker.
See the whole thing running
The pattern is one chapter of a larger, runnable story: the nest-native reference app is a multi-tenant work-tracking SaaS where every task write emits task.created/assigned/completed through this outbox, a consumer builds an activity feed through this inbox, the event contracts are published as an AsyncAPI 3.0 catalog, and a streaming AI assistant summarizes the activity — six libraries, one coherent journey, green tests, no Docker required for the default profile.
Honest scope
- This is the app-level outbox. At larger scale you may prefer CDC (Debezium + Kafka Connect) tailing the WAL — different trade-offs, no app code, more infrastructure.
- If you're on TypeORM or MikroORM, the libraries linked above already serve you well.
@nest-native/messagingexists specifically because nothing covered Drizzle. - Delivery is at-least-once end to end; the inbox gives you exactly-once effects, which is the guarantee that actually matters — exactly-once delivery across a network isn't something any tool can promise.
Docs: nest-native.dev/messaging · Source: github.com/nest-native/messaging. Feedback and issues welcome — especially if you hit a case the pattern doesn't cover.