- Notifications
You must be signed in to change notification settings - Fork3
Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.
License
oagudo/outbox
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
Lightweight library for thetransactional outbox pattern in Go, not tied to any specific relational database or broker.
- Lightweight: Adds only one external dependency:google/uuid
- Database Agnostic: Works with PostgreSQL, MySQL, Oracle and other relational databases.
- Message Broker Agnostic: Works with any message broker or external system.
- Easy integration: Designed for easy integration into your own projects.
- Observability: Exposes channels for processing errors and discarded messages that you can connect to your metrics and alerting systems.
- Fast Publishing: Optional immediate async message publishing after transaction commit for reduced latency, with guaranteed delivery fallback.
- Configurable Retry & Backoff Policies: Fixed, exponential or custom backoff strategies when delivery fails.
- Max Attempts Safeguard: Automatically discards poison messages that exceed a configurable
maxAttemptsthreshold.
The library consists of two main components:
- Writer: Stores your entity and corresponding message atomically within a transaction
- Reader: Publishes stored messages to your message broker in the background
The Writer ensures your entity and outbox message are stored together atomically:
// Setup database connectiondb,_:=sql.Open("pgx","postgres://user:password@localhost:5432/outbox?sslmode=disable")// Create a DBContext and Writer instancedbCtx:=outbox.NewDBContext(db,outbox.SQLDialectPostgres)writer:=outbox.NewWriter(dbCtx)// In your business logic://// Create your entity and outbox messageentity:=Entity{ID:uuid.New(),CreatedAt:time.Now().UTC(),}payload,_:=json.Marshal(entity)metadata:=json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)msg:=outbox.NewMessage(payload,outbox.WithCreatedAt(entity.CreatedAt),outbox.WithMetadata(metadata))// Write message and entity in a single transactionerr=writer.Write(ctx,msg,// This user-defined callback executes queries within the// same transaction that stores the outbox messagefunc(ctx context.Context,txQueryer outbox.TxQueryer)error {_,err:=txQueryer.ExecContext(ctx,"INSERT INTO entity (id, created_at) VALUES ($1, $2)",entity.ID,entity.CreatedAt, )returnerr })
🚀 Optimistic Publishing (Optional)
Optimistic publishing attempts to publish messages immediately after transaction commit, reducing latency while maintaining guaranteed delivery through the background reader as fallback.
- Transaction commits (entity + outbox message stored)
- Immediate publish attempt to broker (asynchronously, will not block the incoming request)
- On success: message is removed from outbox
- On failure: background reader handles delivery later
// Create publisher (see Reader section below)publisher:=&messagePublisher{}// Enable optimistic publishing in writerwriter:=outbox.NewWriter(dbCtx,outbox.WithOptimisticPublisher(publisher))
Important considerations:
- Publishing happens asynchronously after transaction commit
- Message consumers must be idempotent as messages could be published twice - by the optimistic publisher and by the reader (Note: consumer idempotency is a good practice regardless of optimistic publishing, though some brokers also provide deduplication features)
- Publishing failures don't affect your transactions - they don't cause
Write()to fail
The Reader periodically checks for unsent messages and publishes them to your message broker:
// Create a message publisher implementationtypemessagePublisherstruct {// Your message broker client (e.g., Kafka, RabbitMQ)}func (p*messagePublisher)Publish(ctx context.Context,msg*outbox.Message)error {// Publish the message to your broker. See examples below for specific implementationsreturnnil}// Create and start the readerreader:=outbox.NewReader(dbCtx,// Database context&messagePublisher{},// Publisher implementationoutbox.WithInterval(5*time.Second),// Polling interval (default: 10s)outbox.WithReadBatchSize(200),// Read batch size (default: 100)outbox.WithDeleteBatchSize(50),// Delete batch size (default: 20)outbox.WithMaxAttempts(300),// Discard after 300 attempts (default: MaxInt32)outbox.WithExponentialDelay(// Delay between attempts (default: Exponential; can also use Fixed or Custom)500*time.Millisecond,// Initial delay (default: 200ms)30*time.Minute),// Maximum delay (default: 1h))reader.Start()deferreader.Stop(context.Background())// Stop during application shutdown// Monitor standard processing errors (publish / update / delete / read).gofunc() {forerr:=rangereader.Errors() {switche:=err.(type) {case*outbox.PublishError:log.Printf("Failed to publish message | ID: %s | Error: %v",e.Message.ID,e.Err)case*outbox.UpdateError:log.Printf("Failed to update message | ID: %s | Error: %v",e.Message.ID,e.Err)case*outbox.DeleteError:log.Printf("Batch message deletion failed | Count: %d | Error: %v",len(e.Messages),e.Err)for_,msg:=rangee.Messages {log.Printf("Failed to delete message | ID: %s",msg.ID) }case*outbox.ReadError:log.Printf("Failed to read outbox messages | Error: %v",e.Err)default:log.Printf("Unexpected error occurred | Error: %v",e) } }}()// Monitor discarded messages (hit the max-attempts threshold).gofunc() {formsg:=rangereader.DiscardedMessages() {log.Printf("outbox message %s discarded after %d attempts",msg.ID,msg.TimesAttempted)// Example next steps:// • forward to a dead-letter topic// • raise an alert / metric// • persist for manual inspection }}()
The library supports multiple relational databases. Configure the appropriateSQLDialect when creating theDBContext. Supported dialects are PostgreSQL, MySQL, MariaDB, SQLite, Oracle and SQL Server.
// Example creating a DBContext with MySQL dialectdbCtx:=outbox.NewDBContext(db,outbox.SQLDialectMySQL)
The outbox table stores messages that need to be published to your message broker. Choose your database below:
🐘 PostgreSQL
CREATETABLEIF NOT EXISTS outbox ( id UUIDPRIMARY KEY, created_atTIMESTAMP WITH TIME ZONENOT NULL, scheduled_atTIMESTAMP WITH TIME ZONENOT NULL, metadataBYTEA, payloadBYTEANOT NULL, times_attemptedINTEGERNOT NULL);CREATEINDEXIF NOT EXISTS idx_outbox_created_atON outbox (created_at);CREATEINDEXIF NOT EXISTS idx_outbox_scheduled_atON outbox (scheduled_at);
📊 MySQL
CREATETABLEIF NOT EXISTS outbox ( id BINARY(16)PRIMARY KEY, created_atTIMESTAMP(3)NOT NULL, scheduled_atTIMESTAMP(3)NOT NULL, metadata BLOB, payload BLOBNOT NULL, times_attemptedINTNOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
🐬 MariaDB
CREATETABLEIF NOT EXISTS outbox ( id UUIDPRIMARY KEY, created_atTIMESTAMP(3)NOT NULL, scheduled_atTIMESTAMP(3)NOT NULL, metadata BLOB, payload BLOBNOT NULL, times_attemptedINTNOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
🗃️ SQLite
CREATETABLEIF NOT EXISTS outbox ( idTEXTPRIMARY KEY, created_at DATETIMENOT NULL, scheduled_at DATETIMENOT NULL, metadata BLOB, payload BLOBNOT NULL, times_attemptedINTEGERNOT NULL);CREATEINDEXIF NOT EXISTS idx_outbox_created_atON outbox (created_at);CREATEINDEXIF NOT EXISTS idx_outbox_scheduled_atON outbox (scheduled_at);
🏛️ Oracle
CREATETABLEoutbox ( id RAW(16)PRIMARY KEY, created_atTIMESTAMP WITH TIME ZONENOT NULL, scheduled_atTIMESTAMP WITH TIME ZONENOT NULL, metadata BLOB, payload BLOBNOT NULL, times_attemptedNUMBER(10)NOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
🪟 SQL Server
CREATETABLEoutbox ( id UNIQUEIDENTIFIERPRIMARY KEY, created_at DATETIMEOFFSET(3)NOT NULL, scheduled_at DATETIMEOFFSET(3)NOT NULL, metadata VARBINARY(MAX), payload VARBINARY(MAX)NOT NULL, times_attemptedINTNOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
Complete working examples for different databases and message brokers:
To run an example:
cd examples/postgres-kafka# or examples/oracle-nats or examples/mysql-rabitmq../../scripts/up-and-wait.shgo run service.go# In another terminal trigger a POST to trigger entity creationcurl -X POST http://localhost:8080/entity
When running multiple instances of your service, each with its own reader, be aware that:
- Multiple readers will independently retrieve messages. This can result in messages published more than once. To handle this you can either:
- Ensure your consumers are idempotent and accept duplicates
- Use broker deduplication features if available (e.g. NATS JetStream's Msg-Id)
- Run the reader in a single instance only (e.g. single replica deployment in k8s with reader)
The optimistic publisher feature can significantly reduce the number of duplicates. With optimistic publisher messages are delivered as soon as they are committed, so readers will usually see no messages in the outbox table.
Also note that even in single instance deployments, message duplicates can still occur (e.g. if the service crashes right after successfully publishing to the broker). However, these duplicates are less frequent than when you are running multiple reader instances.
You can usestdlib.OpenDBFromPoolfunction to get a*sql.DB from a*pgxpool.Pool.
import ("github.com/jackc/pgx/v5/pgxpool""github.com/jackc/pgx/v5/stdlib""github.com/oagudo/outbox")// ...pool,_:=pgxpool.New(context.Background(),os.Getenv("DATABASE_URL"))db:=stdlib.OpenDBFromPool(pool)dbCtx:=outbox.NewDBContext(db,outbox.SQLDialectPostgres)
Contributions are welcome! Please feel free to submit a Pull Request.
About
Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.
Topics
Resources
License
Code of conduct
Security policy
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.
