go-outbox implements the outbox pattern for Go applications. It stores messages in a database table as part of the same transaction as your business logic and dispatches them asynchronously to your message broker.
- Generic API that works with any message type
- Built in PostgreSQL store with notification triggers
- Message broker integrations for Kafka (franz-go) and NATS (including JetStream)
- Pluggable dead letter queue interface
- Configurable concurrency and retry behaviour
- Uses
encoding/gobfor message serialization by default
go get github.com/nrfta/go-outboxThe snippet below shows how to configure an outbox using PostgreSQL and NATS JetStream:
package main
import (
"database/sql"
"log/slog"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/nrfta/go-outbox"
mbNats "github.com/nrfta/go-outbox/mb/nats"
outboxPg "github.com/nrfta/go-outbox/store/pg"
)
func NewNatsOutbox(db *sql.DB, connStr string) (outbox.Outbox[*nats.Msg], error) {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return nil, err
}
js, err := jetstream.New(nc)
if err != nil {
return nil, err
}
broker, err := mbNats.NewJetstream(js)
if err != nil {
return nil, err
}
store, err := outboxPg.NewStore(db, connStr)
if err != nil {
return nil, err
}
ob := outbox.New[*nats.Msg](
store,
broker,
logDeadLetterQueue{},
outbox.WithLogger[*nats.Msg](slog.Default()),
outbox.WithMaxRetries[*nats.Msg](10),
outbox.WithNumberOfRoutines[*nats.Msg](5),
)
return ob, nil
}Messages are queued by storing them in the outbox table inside the same
database transaction as your business logic. The outbox library then
dispatches them asynchronously in the background. Use SendTx to add a
message to the queue:
ctx := context.Background()
tx, _ := db.Begin()
msg := &nats.Msg{Subject: "hello", Data: []byte("world")}
if err := ob.SendTx(ctx, tx, msg); err != nil {
tx.Rollback()
return err
}
return tx.Commit()The outbox.New constructor accepts several optional configuration functions:
WithNumberOfRoutines(n int)– limits the number of goroutines used to dispatch messagesWithMaxRetries(n int)– maximum number of attempts before sending a message to the dead letter queueWithLogger(*slog.Logger)– provide a custom logger
You can provide your own store or message broker by implementing the Store and MessageBroker interfaces found in outbox.go.
The testing package provides synchronous implementations of outbox interfaces for deterministic, race-condition-free testing. Instead of processing messages asynchronously with background workers, these implementations deliver messages synchronously when they're created.
- Deterministic execution – No async race conditions or timing issues
- Immediate processing – Events handled synchronously, no waiting required
- Full stack traces – See the complete call chain in test failures
- No infrastructure – Tests run without NATS or background workers
- Simpler debugging – Step through event handling in debugger
import (
"github.com/nats-io/nats.go/jetstream"
"github.com/nrfta/go-outbox"
"github.com/nrfta/go-outbox/testing"
)
// Setup synchronous testing infrastructure
syncJS := testing.NewSyncJetStream()
syncBroker := testing.NewSyncBroker(syncJS)
syncStore := testing.NewSyncStore(syncBroker)
// Create outbox with sync components
ob, err := outbox.New(syncStore, syncBroker)
if err != nil {
panic(err)
}
// Override JetStream in your DI container
// Events will now be processed synchronouslyThe sync implementations work together:
SyncStoredecodes messages and sends them to the broker immediately (no DB storage)SyncBrokerroutes messages toSyncJetStreamSyncJetStreamdelivers messages directly to registered consumers
See testing/doc.go for detailed documentation.
Integration tests require a running PostgreSQL instance. You can run all tests with:
go test ./...The CI workflow spins up Postgres automatically.