nrfta/go-outbox

go-outbox

go-outbox implements the outbox pattern for Go applications. It stores messages in a database table as part of the same transaction as your business logic and dispatches them asynchronously to your message broker.

Features

  • Generic API that works with any message type
  • Built-in PostgreSQL store with notification triggers
  • Message broker integrations for Kafka (franz-go) and NATS (including JetStream)
  • Pluggable dead letter queue interface
  • Configurable concurrency and retry behaviour
  • Uses encoding/gob for message serialization by default
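To illustrate the default serialization mentioned above, here is a minimal sketch of a gob round trip using only the standard library. The `OrderCreated` type and the `encode`/`decode` helpers are hypothetical names for this example and are not part of go-outbox; the library's own encoding code may differ. Note that gob only serializes exported struct fields.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// OrderCreated is a hypothetical message type used only for this illustration.
type OrderCreated struct {
	ID    string
	Total int
}

// encode serializes v into gob bytes, e.g. for storage in a binary column.
func encode[T any](v T) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode restores a value of type T from gob bytes.
func decode[T any](data []byte) (T, error) {
	var v T
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&v)
	return v, err
}

func main() {
	raw, err := encode(OrderCreated{ID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}
	msg, err := decode[OrderCreated](raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", msg) // prints {ID:o-1 Total:42}
}
```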

Installation

go get github.com/nrfta/go-outbox

Quick start

The snippet below shows how to configure an outbox using PostgreSQL and NATS JetStream:

package main

import (
    "database/sql"
    "log/slog"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"

    "github.com/nrfta/go-outbox"
    mbNats "github.com/nrfta/go-outbox/mb/nats"
    outboxPg "github.com/nrfta/go-outbox/store/pg"
)

// NewNatsOutbox wires a PostgreSQL store and a JetStream broker into an outbox.
func NewNatsOutbox(db *sql.DB, connStr string) (outbox.Outbox[*nats.Msg], error) {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        return nil, err
    }

    js, err := jetstream.New(nc)
    if err != nil {
        nc.Close() // do not leak the connection on a failed setup
        return nil, err
    }

    broker, err := mbNats.NewJetstream(js)
    if err != nil {
        nc.Close()
        return nil, err
    }

    store, err := outboxPg.NewStore(db, connStr)
    if err != nil {
        nc.Close()
        return nil, err
    }

    // logDeadLetterQueue is your own type implementing the dead letter queue
    // interface; it is not defined in this snippet.
    ob := outbox.New[*nats.Msg](
        store,
        broker,
        logDeadLetterQueue{},
        outbox.WithLogger[*nats.Msg](slog.Default()),
        outbox.WithMaxRetries[*nats.Msg](10),
        outbox.WithNumberOfRoutines[*nats.Msg](5),
    )
    return ob, nil
}
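The quick start passes a retry limit and a dead letter queue to the outbox. As a rough sketch of how those two settings could interact, the example below retries a send up to a limit and then hands the message to a dead letter callback. The `dispatch` function, its parameters, and `errBrokerDown` are hypothetical names for this illustration; go-outbox's actual retry loop (how it counts attempts, whether it backs off between them) may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errBrokerDown is a hypothetical error used to simulate a failing broker.
var errBrokerDown = errors.New("broker unavailable")

// dispatch calls send up to maxAttempts times. If every attempt fails, the
// message and the last error are passed to deadLetter and false is returned.
func dispatch[T any](msg T, maxAttempts int, send func(T) error, deadLetter func(T, error)) bool {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = send(msg); lastErr == nil {
			return true
		}
	}
	deadLetter(msg, lastErr)
	return false
}

func main() {
	calls := 0
	// flaky fails twice and succeeds on the third call.
	flaky := func(string) error {
		calls++
		if calls < 3 {
			return errBrokerDown
		}
		return nil
	}
	ok := dispatch("hello", 10, flaky, func(m string, err error) {
		fmt.Println("dead letter:", m, err)
	})
	fmt.Println(ok, calls) // prints true 3
}
```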

Sending messages

Messages are queued by storing them in the outbox table inside the same database transaction as your business logic. The outbox library then dispatches them asynchronously in the background. Use SendTx to add a message to the queue:

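ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}

msg := &nats.Msg{Subject: "hello", Data: []byte("world")}
if err := ob.SendTx(ctx, tx, msg); err != nil {
    // Roll back so neither the business writes nor the message are persisted.
    _ = tx.Rollback()
    return err
}
return tx.Commit()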
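The commit-or-rollback handling above tends to repeat at every call site, so it can be convenient to wrap it in a small helper. The sketch below is one possible shape, not part of go-outbox: `Tx`, `inTx`, `fakeTx`, and `errSendFailed` are hypothetical names. `*sql.Tx` from `database/sql` satisfies the `Tx` interface; the fake is used here only so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// Tx is the subset of *sql.Tx this helper needs.
type Tx interface {
	Commit() error
	Rollback() error
}

// errSendFailed simulates a failure while queuing a message.
var errSendFailed = errors.New("send failed")

// inTx runs fn and commits if it succeeds. If fn fails, it rolls back and
// returns fn's error, joined with the rollback error if that also failed.
func inTx(tx Tx, fn func() error) error {
	if err := fn(); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return errors.Join(err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

// fakeTx records which method was called so the example needs no database.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	good := &fakeTx{}
	err := inTx(good, func() error { return nil })
	fmt.Println(good.committed, good.rolledBack, err)

	bad := &fakeTx{}
	err = inTx(bad, func() error { return errSendFailed })
	fmt.Println(bad.committed, bad.rolledBack, err)
}
```

With a real transaction, the callback would perform the business writes and then return the result of `ob.SendTx(ctx, tx, msg)`, so both are committed or rolled back together.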

Options

The outbox.New constructor accepts several optional configuration functions:

  • WithNumberOfRoutines(n int) – limits the number of goroutines used to dispatch messages
  • WithMaxRetries(n int) – maximum number of attempts before sending a message to the dead letter queue
  • WithLogger(*slog.Logger) – provide a custom logger
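The options above follow the functional options pattern with a type parameter. The sketch below shows the general shape of that pattern; `config`, `Option`, `newConfig`, and the default values are hypothetical and are not taken from go-outbox's source. It also shows why calls such as `outbox.WithMaxRetries[*nats.Msg](10)` spell out the type argument: the type parameter does not appear in the option's argument list, so the compiler cannot infer it from the `int` being passed.

```go
package main

import "fmt"

// config holds the settings that options can change (hypothetical).
type config[T any] struct {
	maxRetries int
	routines   int
}

// Option mutates a config for outboxes carrying messages of type T.
type Option[T any] func(*config[T])

// WithMaxRetries sets the retry limit.
func WithMaxRetries[T any](n int) Option[T] {
	return func(c *config[T]) { c.maxRetries = n }
}

// WithNumberOfRoutines sets the number of dispatch goroutines.
func WithNumberOfRoutines[T any](n int) Option[T] {
	return func(c *config[T]) { c.routines = n }
}

// newConfig starts from illustrative defaults and applies options in order.
func newConfig[T any](opts ...Option[T]) config[T] {
	c := config[T]{maxRetries: 3, routines: 1} // defaults are assumptions
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig[string](
		WithMaxRetries[string](10),
		WithNumberOfRoutines[string](5),
	)
	fmt.Println(c.maxRetries, c.routines) // prints 10 5
}
```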

You can provide your own store or message broker by implementing the Store and MessageBroker interfaces found in outbox.go.

Testing

The testing package provides synchronous implementations of outbox interfaces for deterministic, race-condition-free testing. Instead of processing messages asynchronously with background workers, these implementations deliver messages synchronously when they're created.

Benefits

  • Deterministic execution – No async race conditions or timing issues
  • Immediate processing – Events handled synchronously, no waiting required
  • Full stack traces – See the complete call chain in test failures
  • No infrastructure – Tests run without NATS or background workers
  • Simpler debugging – Step through event handling in a debugger

Usage Example

import (
    "github.com/nats-io/nats.go/jetstream"
    "github.com/nrfta/go-outbox"
    "github.com/nrfta/go-outbox/testing"
)

// Setup synchronous testing infrastructure
syncJS := testing.NewSyncJetStream()
syncBroker := testing.NewSyncBroker(syncJS)
syncStore := testing.NewSyncStore(syncBroker)

// Create outbox with sync components
ob, err := outbox.New(syncStore, syncBroker)
if err != nil {
    panic(err)
}

// Override JetStream in your DI container
// Events will now be processed synchronously

The sync implementations work together:

  1. SyncStore decodes messages and sends them to the broker immediately (no DB storage)
  2. SyncBroker routes messages to SyncJetStream
  3. SyncJetStream delivers messages directly to registered consumers
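The idea behind the chain above can be sketched with a small in-memory bus that calls subscribers in the publisher's goroutine. This is a simplified illustration under assumed names (`syncBus`, `Subscribe`, `Publish`), not the API of the testing package. Because the handler runs before `Publish` returns, a test can assert on its side effects immediately.

```go
package main

import "fmt"

// syncBus is a hypothetical in-memory bus that delivers messages synchronously.
type syncBus struct {
	handlers map[string][]func(data []byte) error
}

func newSyncBus() *syncBus {
	return &syncBus{handlers: make(map[string][]func([]byte) error)}
}

// Subscribe registers a handler for a subject.
func (b *syncBus) Subscribe(subject string, h func([]byte) error) {
	b.handlers[subject] = append(b.handlers[subject], h)
}

// Publish invokes every handler for the subject in the caller's goroutine
// and returns the first handler error, if any.
func (b *syncBus) Publish(subject string, data []byte) error {
	for _, h := range b.handlers[subject] {
		if err := h(data); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	bus := newSyncBus()
	var got []string
	bus.Subscribe("hello", func(d []byte) error {
		got = append(got, string(d))
		return nil
	})
	if err := bus.Publish("hello", []byte("world")); err != nil {
		panic(err)
	}
	// The handler has already run by the time Publish returns.
	fmt.Println(got) // prints [world]
}
```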

See testing/doc.go for detailed documentation.

Running tests

Integration tests require a running PostgreSQL instance. You can run all tests with:

go test ./...

The CI workflow spins up Postgres automatically.
