Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.

License

NotificationsYou must be signed in to change notification settings

oagudo/outbox

Repository files navigation

ReleaseSoftware LicenseGitHub ActionscodecovGo Report CardGo Reference

Lightweight library for thetransactional outbox pattern in Go, not tied to any specific relational database or broker.

Key Features

  • Lightweight: Adds only one external dependency:google/uuid
  • Database Agnostic: Works with PostgreSQL, MySQL, Oracle and other relational databases.
  • Message Broker Agnostic: Works with any message broker or external system.
  • Easy integration: Designed for easy integration into your own projects.
  • Observability: Exposes channels for processing errors and discarded messages that you can connect to your metrics and alerting systems.
  • Fast Publishing: Optional immediate async message publishing after transaction commit for reduced latency, with guaranteed delivery fallback.
  • Configurable Retry & Backoff Policies: Fixed, exponential or custom backoff strategies when delivery fails.
  • Max Attempts Safeguard: Automatically discards poison messages that exceed a configurablemaxAttempts threshold.

Usage

The library consists of two main components:

  1. Writer: Stores your entity and corresponding message atomically within a transaction
  2. Reader: Publishes stored messages to your message broker in the background

The Writer

The Writer ensures your entity and outbox message are stored together atomically:

// Setup database connectiondb,_:=sql.Open("pgx","postgres://user:password@localhost:5432/outbox?sslmode=disable")// Create a DBContext and Writer instancedbCtx:=outbox.NewDBContext(db,outbox.SQLDialectPostgres)writer:=outbox.NewWriter(dbCtx)// In your business logic://// Create your entity and outbox messageentity:=Entity{ID:uuid.New(),CreatedAt:time.Now().UTC(),}payload,_:=json.Marshal(entity)metadata:=json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)msg:=outbox.NewMessage(payload,outbox.WithCreatedAt(entity.CreatedAt),outbox.WithMetadata(metadata))// Write message and entity in a single transactionerr=writer.Write(ctx,msg,// This user-defined callback executes queries within the// same transaction that stores the outbox messagefunc(ctx context.Context,txQueryer outbox.TxQueryer)error {_,err:=txQueryer.ExecContext(ctx,"INSERT INTO entity (id, created_at) VALUES ($1, $2)",entity.ID,entity.CreatedAt,        )returnerr    })
🚀 Optimistic Publishing (Optional)

Optimistic publishing attempts to publish messages immediately after transaction commit, reducing latency while maintaining guaranteed delivery through the background reader as fallback.

How It Works

  1. Transaction commits (entity + outbox message stored)
  2. Immediate publish attempt to broker (asynchronously, will not block the incoming request)
  3. On success: message is removed from outbox
  4. On failure: background reader handles delivery later

Configuration

// Create publisher (see Reader section below)publisher:=&messagePublisher{}// Enable optimistic publishing in writerwriter:=outbox.NewWriter(dbCtx,outbox.WithOptimisticPublisher(publisher))

Important considerations:

  • Publishing happens asynchronously after transaction commit
  • Message consumers must be idempotent as messages could be published twice - by the optimistic publisher and by the reader (Note: consumer idempotency is a good practice regardless of optimistic publishing, though some brokers also provide deduplication features)
  • Publishing failures don't affect your transactions - they don't causeWrite() to fail

The Reader

The Reader periodically checks for unsent messages and publishes them to your message broker:

// Create a message publisher implementationtypemessagePublisherstruct {// Your message broker client (e.g., Kafka, RabbitMQ)}func (p*messagePublisher)Publish(ctx context.Context,msg*outbox.Message)error {// Publish the message to your broker. See examples below for specific implementationsreturnnil}// Create and start the readerreader:=outbox.NewReader(dbCtx,// Database context&messagePublisher{},// Publisher implementationoutbox.WithInterval(5*time.Second),// Polling interval (default: 10s)outbox.WithReadBatchSize(200),// Read batch size (default: 100)outbox.WithDeleteBatchSize(50),// Delete batch size (default: 20)outbox.WithMaxAttempts(300),// Discard after 300 attempts (default: MaxInt32)outbox.WithExponentialDelay(// Delay between attempts (default: Exponential; can also use Fixed or Custom)500*time.Millisecond,// Initial delay (default: 200ms)30*time.Minute),// Maximum delay (default: 1h))reader.Start()deferreader.Stop(context.Background())// Stop during application shutdown// Monitor standard processing errors (publish / update / delete / read).gofunc() {forerr:=rangereader.Errors() {switche:=err.(type) {case*outbox.PublishError:log.Printf("Failed to publish message | ID: %s | Error: %v",e.Message.ID,e.Err)case*outbox.UpdateError:log.Printf("Failed to update message | ID: %s | Error: %v",e.Message.ID,e.Err)case*outbox.DeleteError:log.Printf("Batch message deletion failed | Count: %d | Error: %v",len(e.Messages),e.Err)for_,msg:=rangee.Messages {log.Printf("Failed to delete message | ID: %s",msg.ID)            }case*outbox.ReadError:log.Printf("Failed to read outbox messages | Error: %v",e.Err)default:log.Printf("Unexpected error occurred | Error: %v",e)        }    }}()// Monitor discarded messages (hit the max-attempts threshold).gofunc() {formsg:=rangereader.DiscardedMessages() {log.Printf("outbox message %s discarded after %d attempts",msg.ID,msg.TimesAttempted)// Example next steps://   • forward to a dead-letter topic//   • raise an alert / metric//   • persist for manual inspection    }}()

Database Setup

1. Choose Your Database Dialect

The library supports multiple relational databases. Configure the appropriateSQLDialect when creating theDBContext. Supported dialects are PostgreSQL, MySQL, MariaDB, SQLite, Oracle and SQL Server.

// Example creating a DBContext with MySQL dialectdbCtx:=outbox.NewDBContext(db,outbox.SQLDialectMySQL)

2. Create the Outbox Table

The outbox table stores messages that need to be published to your message broker. Choose your database below:

🐘 PostgreSQL
CREATETABLEIF NOT EXISTS outbox (    id UUIDPRIMARY KEY,    created_atTIMESTAMP WITH TIME ZONENOT NULL,    scheduled_atTIMESTAMP WITH TIME ZONENOT NULL,    metadataBYTEA,    payloadBYTEANOT NULL,    times_attemptedINTEGERNOT NULL);CREATEINDEXIF NOT EXISTS idx_outbox_created_atON outbox (created_at);CREATEINDEXIF NOT EXISTS idx_outbox_scheduled_atON outbox (scheduled_at);
📊 MySQL
CREATETABLEIF NOT EXISTS outbox (    id BINARY(16)PRIMARY KEY,    created_atTIMESTAMP(3)NOT NULL,    scheduled_atTIMESTAMP(3)NOT NULL,    metadata BLOB,    payload BLOBNOT NULL,    times_attemptedINTNOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
🐬 MariaDB
CREATETABLEIF NOT EXISTS outbox (    id UUIDPRIMARY KEY,    created_atTIMESTAMP(3)NOT NULL,    scheduled_atTIMESTAMP(3)NOT NULL,    metadata BLOB,    payload BLOBNOT NULL,    times_attemptedINTNOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
🗃️ SQLite
CREATETABLEIF NOT EXISTS outbox (    idTEXTPRIMARY KEY,    created_at DATETIMENOT NULL,    scheduled_at DATETIMENOT NULL,    metadata BLOB,    payload BLOBNOT NULL,    times_attemptedINTEGERNOT NULL);CREATEINDEXIF NOT EXISTS idx_outbox_created_atON outbox (created_at);CREATEINDEXIF NOT EXISTS idx_outbox_scheduled_atON outbox (scheduled_at);
🏛️ Oracle
CREATETABLEoutbox (    id RAW(16)PRIMARY KEY,    created_atTIMESTAMP WITH TIME ZONENOT NULL,    scheduled_atTIMESTAMP WITH TIME ZONENOT NULL,    metadata BLOB,    payload BLOBNOT NULL,    times_attemptedNUMBER(10)NOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);
🪟 SQL Server
CREATETABLEoutbox (    id UNIQUEIDENTIFIERPRIMARY KEY,    created_at DATETIMEOFFSET(3)NOT NULL,    scheduled_at DATETIMEOFFSET(3)NOT NULL,    metadata VARBINARY(MAX),    payload VARBINARY(MAX)NOT NULL,    times_attemptedINTNOT NULL);CREATEINDEXidx_outbox_created_atON outbox (created_at);CREATEINDEXidx_outbox_scheduled_atON outbox (scheduled_at);

Examples

Complete working examples for different databases and message brokers:

To run an example:

cd examples/postgres-kafka# or examples/oracle-nats or examples/mysql-rabitmq../../scripts/up-and-wait.shgo run service.go# In another terminal trigger a POST to trigger entity creationcurl -X POST http://localhost:8080/entity

FAQ

What happens when multiple instances of my service use the library?

When running multiple instances of your service, each with its own reader, be aware that:

  • Multiple readers will independently retrieve messages. This can result in messages published more than once. To handle this you can either:
    1. Ensure your consumers are idempotent and accept duplicates
    2. Use broker deduplication features if available (e.g. NATS JetStream's Msg-Id)
    3. Run the reader in a single instance only (e.g. single replica deployment in k8s with reader)

The optimistic publisher feature can significantly reduce the number of duplicates. With optimistic publisher messages are delivered as soon as they are committed, so readers will usually see no messages in the outbox table.

Also note that even in single instance deployments, message duplicates can still occur (e.g. if the service crashes right after successfully publishing to the broker). However, these duplicates are less frequent than when you are running multiple reader instances.

How to instantiate aDBContext when usingpgxpool ?

You can usestdlib.OpenDBFromPoolfunction to get a*sql.DB from a*pgxpool.Pool.

import ("github.com/jackc/pgx/v5/pgxpool""github.com/jackc/pgx/v5/stdlib""github.com/oagudo/outbox")// ...pool,_:=pgxpool.New(context.Background(),os.Getenv("DATABASE_URL"))db:=stdlib.OpenDBFromPool(pool)dbCtx:=outbox.NewDBContext(db,outbox.SQLDialectPostgres)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

About

Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.

Topics

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Packages

No packages published

Languages


[8]ページ先頭

©2009-2025 Movatter.jp