Milan Jovanović

Originally published at milanjovanovic.tech

Implementing the Outbox Pattern

In distributed systems, we often face the challenge of keeping our database and external systems in sync. Imagine saving an order to a database and then publishing a message to a message broker. If one of these operations succeeds and the other fails, your system ends up in an inconsistent state.

The Outbox pattern solves this problem by making message publication part of your database transaction. Instead of publishing messages directly, we save them to an Outbox table in the same transaction as the business data, so both writes succeed or fail together. A separate process then reliably publishes these messages.

In this newsletter, we'll dive into implementing this pattern in .NET, covering everything from setup to scaling.

Why Do We Need the Outbox Pattern?

The transactional Outbox pattern fixes a common problem in distributed systems. This problem happens when you need to do two things at once: save data and communicate with an external component.

Consider scenarios like sending order confirmation emails, notifying other systems about new client registrations, or updating inventory levels after an order is placed. Each of these involves a local data change coupled with an external communication or update.

For example, imagine a microservice that needs to:

  • Save a new order in its database
  • Tell other systems about this new order

If one of these steps fails, your system could end up in an inconsistent state. Maybe the order is saved, but no one else knows about it. Or everyone thinks there's a new order, but it's not actually in the database.

Here's a `CreateOrderCommandHandler` without the Outbox pattern:

```csharp
public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
    public async Task<OrderDto> Handle(
        CreateOrderCommand request,
        CancellationToken cancellationToken)
    {
        var order = new Order(
            request.CustomerId,
            request.ProductId,
            request.Quantity,
            inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // The database transaction is completed at this point.

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        return new OrderDto { Id = order.Id, Total = order.Total };
    }
}
```

This code has a potential consistency problem. After the database transaction is committed, two things could go wrong:

  1. The application might crash right after the transaction is committed but before the event is sent. The order would be created in the database, but other systems wouldn't know about it.

  2. The event bus might be down or unreachable when we try to send the event. This would result in the order being created without notifying other systems.

The transactional Outbox pattern helps solve this problem by ensuring that the database update and the record of the event are committed as a single atomic operation. The event itself is published afterwards.

[Figure: sequence diagram of the Outbox pattern]

The sequence diagram illustrates how the Outbox pattern solves our consistency challenge. Instead of trying to save data and send a message as separate steps, we save both the order and an Outbox message in a single database transaction. This is an all-or-nothing operation, so we can't end up in an inconsistent state.
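To make the all-or-nothing property concrete, here is a minimal in-memory sketch. It is not a real database: the hypothetical `FakeDatabase` and `FakeTransaction` types stage writes and apply them only on commit, so the order and its Outbox message either both become visible or neither does.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a database; not a real driver.
public sealed class FakeDatabase
{
    public List<string> Orders { get; } = new();
    public List<string> OutboxMessages { get; } = new();

    public FakeTransaction BeginTransaction() => new(this);
}

// Stages writes and applies them only on Commit.
// Disposing without committing discards them, like a rollback.
public sealed class FakeTransaction : IDisposable
{
    private readonly FakeDatabase _db;
    private readonly List<string> _orders = new();
    private readonly List<string> _outbox = new();
    private bool _committed;

    public FakeTransaction(FakeDatabase db) => _db = db;

    public void AddOrder(string order) => _orders.Add(order);

    public void AddOutboxMessage(string message) => _outbox.Add(message);

    public void Commit()
    {
        if (_committed) throw new InvalidOperationException("Transaction already committed.");

        // The order and its Outbox message become visible together, never one without the other.
        _db.Orders.AddRange(_orders);
        _db.OutboxMessages.AddRange(_outbox);
        _committed = true;
    }

    // Drops any staged writes that were never committed.
    public void Dispose()
    {
        _orders.Clear();
        _outbox.Clear();
    }
}
```

If the process crashes before `Commit`, nothing is written; after `Commit`, the Outbox message is guaranteed to exist for the processor to pick up, even if the process dies immediately afterwards.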

A separate Outbox processor handles the actual message sending. It continuously checks for unsent messages in the Outbox table and publishes them to the message queue. After publishing a message successfully, the processor marks it as sent so later runs don't pick it up again.

An important thing to realize here is that the Outbox pattern gives us at-least-once delivery. The Outbox message will be sent at least once, but it could also be sent multiple times in case of retries. This means we have to make our message consumers idempotent.
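One common way to make a consumer idempotent is to remember which message IDs it has already handled and skip repeats. This sketch keeps those IDs in an in-memory `HashSet<Guid>`; a real consumer would need a persistent store instead. The `OrderCreated` record and `IdempotentConsumer` class are hypothetical names, and the sketch assumes the Outbox message ID is carried along with the event.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape: the Outbox message ID travels with the event
// so consumers can detect redeliveries.
public sealed record OrderCreated(Guid MessageId, Guid OrderId);

public sealed class IdempotentConsumer
{
    // In-memory stand-in for a persistent "processed messages" table.
    private readonly HashSet<Guid> _processedMessageIds = new();

    public int SideEffectCount { get; private set; }

    // Returns true if the message was handled, false if it was a duplicate.
    public bool Handle(OrderCreated message)
    {
        if (_processedMessageIds.Contains(message.MessageId))
        {
            return false; // Already handled: skip the side effect.
        }

        // The real side effect (send an email, reserve stock, ...) would go here.
        SideEffectCount++;

        // Record the ID only after the side effect succeeded.
        _processedMessageIds.Add(message.MessageId);
        return true;
    }
}
```

In a real consumer, the side effect and the recorded ID should be committed in one database transaction; otherwise a crash between the two steps can still produce a duplicate.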

Implementing the Outbox Pattern

First, let's create our Outbox table where we will store messages:

```sql
CREATE TABLE outbox_messages (
    id UUID PRIMARY KEY,
    type VARCHAR(255) NOT NULL,
    content JSONB NOT NULL,
    occurred_on_utc TIMESTAMP WITH TIME ZONE NOT NULL,
    processed_on_utc TIMESTAMP WITH TIME ZONE NULL,
    error TEXT NULL
);

-- We can consider adding this index since we will be querying for unprocessed messages often
-- and it will contain the rows in the correct sort order for our query.
CREATE INDEX IF NOT EXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc IS NULL;
```

I'll use PostgreSQL as the database for this example. Notice the `jsonb` type for the `content` column. It allows for indexing and querying of the JSON data if needed in the future.

Now, let's create a class to represent our Outbox entry:

```csharp
public sealed class OutboxMessage
{
    public Guid Id { get; init; }

    public string Type { get; init; }

    public string Content { get; init; }

    public DateTime OccurredOnUtc { get; init; }

    public DateTime? ProcessedOnUtc { get; init; }

    public string? Error { get; init; }
}
```

Here's how we can add a message to the Outbox:

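```csharp
// Runs on the same connection and transaction as the business data change,
// so the Outbox insert commits (or rolls back) together with it.
public async Task AddToOutbox<T>(
    T message,
    NpgsqlConnection connection,
    NpgsqlTransaction transaction)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // We'll need this for deserialization
        Content = JsonSerializer.Serialize(message)
    };

    // Raw string literal (C# 11); the @Content::jsonb cast stores the payload as jsonb.
    await connection.ExecuteAsync(
        """
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage,
        transaction: transaction);
}
```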
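Storing `typeof(T).FullName` is what lets the processor rebuild the original event later. Here is a self-contained sketch of that round trip with System.Text.Json and `Assembly.GetType`, assuming all integration events live in one known assembly (the same assumption the `OutboxProcessorJob` makes). The `OutboxSerialization` helper and this `OrderCreatedIntegrationEvent` record shape are hypothetical.

```csharp
using System;
using System.Reflection;
using System.Text.Json;

// Hypothetical integration event; the real one lives in your contracts assembly.
public sealed record OrderCreatedIntegrationEvent(Guid OrderId);

public static class OutboxSerialization
{
    // What gets written to the Outbox: the type name plus the JSON payload.
    public static (string Type, string Content) Serialize<T>(T message) =>
        (typeof(T).FullName!, JsonSerializer.Serialize(message));

    // What the processor does: resolve the type by name, then deserialize into it.
    public static object Deserialize(string type, string content, Assembly eventsAssembly)
    {
        Type messageType = eventsAssembly.GetType(type)
            ?? throw new InvalidOperationException($"Unknown message type '{type}'.");

        return JsonSerializer.Deserialize(content, messageType)
            ?? throw new InvalidOperationException("Message content deserialized to null.");
    }
}
```

Because only the type name is stored, renaming or moving an event type breaks deserialization of messages already sitting in the Outbox, so event type names are best treated as a stable contract.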

An elegant approach to implementing this is using domain events to represent notifications. When something significant happens in the domain, we will raise a domain event. Before completing the transaction, we can pick up all events and store them as Outbox messages. You could do this from the unit of work or with an EF Core interceptor.
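A minimal sketch of the domain-event approach, assuming a hypothetical `Entity` base class that buffers raised events and a hypothetical `DomainEventsToOutbox.Collect` method that the unit of work (or an interceptor) would call just before committing. `OutboxMessage` is repeated from above so the sketch compiles on its own; `Order` and `OrderPlaced` are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Same shape as the OutboxMessage class above, repeated so this compiles alone.
public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; } = "";
    public string Content { get; init; } = "";
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}

public interface IDomainEvent { }

// Hypothetical aggregate base class that buffers raised events.
public abstract class Entity
{
    private readonly List<IDomainEvent> _domainEvents = new();

    public IReadOnlyList<IDomainEvent> DomainEvents => _domainEvents;

    protected void Raise(IDomainEvent domainEvent) => _domainEvents.Add(domainEvent);

    public void ClearDomainEvents() => _domainEvents.Clear();
}

public sealed record OrderPlaced(Guid OrderId) : IDomainEvent;

public sealed class Order : Entity
{
    public Guid Id { get; } = Guid.NewGuid();

    public Order() => Raise(new OrderPlaced(Id));
}

public static class DomainEventsToOutbox
{
    // Call just before committing, then insert the returned messages
    // in the same transaction as the entity changes.
    public static List<OutboxMessage> Collect(IEnumerable<Entity> entities, DateTime utcNow)
    {
        var messages = new List<OutboxMessage>();
        foreach (var entity in entities)
        {
            foreach (var domainEvent in entity.DomainEvents)
            {
                messages.Add(new OutboxMessage
                {
                    Id = Guid.NewGuid(),
                    OccurredOnUtc = utcNow,
                    // Use the runtime type: serializing as IDomainEvent would drop the event's properties.
                    Type = domainEvent.GetType().FullName!,
                    Content = JsonSerializer.Serialize(domainEvent, domainEvent.GetType())
                });
            }
            entity.ClearDomainEvents();
        }
        return messages;
    }
}
```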

Processing the Outbox

The Outbox processor is the next component we'll need. This could be a physically separate process or a background worker in the same process.

I'll use Quartz to schedule background jobs for Outbox processing. It's a robust library with excellent support for scheduling recurring jobs.

Now, let's implement the `OutboxProcessorJob`:

```csharp
using System.Reflection;
using System.Text.Json;
using Dapper;
using MassTransit;
using Npgsql;
using Quartz;

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        await using var connection = await dataSource.OpenConnectionAsync();
        await using var transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // Only id, type, and content are needed to publish a message.
        var messages = await connection.QueryAsync<OutboxMessage>(
            """
            SELECT id, type, content
            FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc
            LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);

                // Same serializer (System.Text.Json) as the one used in AddToOutbox.
                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);

                // We should introduce retries here to improve reliability.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    """
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.
                await connection.ExecuteAsync(
                    """
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}
```

This approach uses polling to periodically fetch unprocessed messages from the database. Polling can increase the load on the database, as we'll need to query for unprocessed messages frequently.

An alternative way to process Outbox messages is by using transaction log tailing. We can implement this using Postgres logical replication. The database will stream changes from the Write-Ahead Log (WAL) to our application, and we'll process these messages and publish them to the message broker. You can use this to implement a push-based Outbox processor.

Considerations and Tradeoffs

The Outbox pattern, while effective, introduces additional complexity and database writes. In high-throughput systems, it's crucial to monitor its performance to ensure it doesn't become a bottleneck.

I recommend implementing retry mechanisms in the Outbox processor to improve reliability. Consider using exponential backoff for transient failures and a circuit breaker for persistent issues to prevent system overload during outages.
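As a minimal sketch of exponential backoff using only the base class library, the hypothetical `RetryHelper.ExecuteAsync` below retries a failing action up to `maxRetries` times, doubling the delay after each failure. In the processor job, you could pass it a lambda that calls `publishEndpoint.Publish`. The names and parameters are illustrative; resilience libraries such as Polly provide retry and circuit-breaker policies ready-made.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Runs the action; on exception, waits and retries up to maxRetries times.
    // Delays grow as baseDelay, 2x, 4x, ... After the last retry, the exception propagates.
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> action,
        int maxRetries,
        TimeSpan baseDelay,
        CancellationToken cancellationToken = default)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                await action(cancellationToken);
                return;
            }
            catch (Exception) when (attempt < maxRetries && !cancellationToken.IsCancellationRequested)
            {
                // baseDelay * 2^attempt; adding random jitter helps avoid synchronized retries.
                TimeSpan delay = TimeSpan.FromMilliseconds(
                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt));
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

The exception filter keeps the original exception and stack trace when retries run out, so the job's existing `catch` block can still record the error on the Outbox row.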

It's essential that you implement idempotent message consumers. Network issues or processor restarts can lead to multiple deliveries of the same message, so your consumers must handle repeated processing safely.

Over time, the Outbox table can grow significantly, potentially impacting database performance. It's important to implement an archiving strategy early on. Consider moving processed messages to cold storage or deleting them after a set period.
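One simple retention rule is "archive or delete messages processed more than N days ago, and never touch unprocessed ones". The sketch below expresses that rule in memory with a hypothetical `StoredMessage` record and `OutboxRetention.Partition` method, and pairs it with an equivalent PostgreSQL `DELETE` for the table defined earlier. The retention window is an assumption you would tune; on large tables, deleting in batches keeps each transaction short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down view of an Outbox row.
public sealed record StoredMessage(Guid Id, DateTime OccurredOnUtc, DateTime? ProcessedOnUtc);

public static class OutboxRetention
{
    // Same rule in SQL: NULL < @Cutoff is never true, so unprocessed rows are kept.
    public const string DeleteSql =
        "DELETE FROM outbox_messages WHERE processed_on_utc < @Cutoff";

    // Splits messages into those to archive (processed before the cutoff)
    // and those to keep (unprocessed, or processed recently).
    public static (List<StoredMessage> ToArchive, List<StoredMessage> ToKeep) Partition(
        IEnumerable<StoredMessage> messages, DateTime utcNow, TimeSpan retention)
    {
        DateTime cutoff = utcNow - retention;
        var toArchive = new List<StoredMessage>();
        var toKeep = new List<StoredMessage>();

        foreach (var message in messages)
        {
            // Unprocessed messages are never archived, however old they are.
            if (message.ProcessedOnUtc is DateTime processed && processed < cutoff)
            {
                toArchive.Add(message);
            }
            else
            {
                toKeep.Add(message);
            }
        }

        return (toArchive, toKeep);
    }
}
```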

Scaling Outbox Processing

As your system grows, you may find that a single Outbox processor can't keep up with the volume of messages. This can lead to increased latency between when an event occurs and when it's processed by consumers.

One straightforward approach is to increase the frequency of the Outbox processor job. You should consider running it every few seconds. This can significantly reduce the delay in message processing.
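With Quartz, the interval lives in the job's trigger. If you would rather not depend on a scheduler, one lightweight alternative (a swapped-in technique, not what this article uses) is .NET's `PeriodicTimer`, available since .NET 6. In this sketch, the hypothetical `processBatch` delegate stands in for the query-and-publish logic of `OutboxProcessorJob`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class OutboxPollingLoop
{
    // Invokes processBatch once per tick until the token is cancelled.
    // The first run happens after one interval has elapsed.
    public static async Task RunAsync(
        Func<CancellationToken, Task> processBatch,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        using var timer = new PeriodicTimer(interval);
        try
        {
            // Each batch is awaited before waiting for the next tick,
            // so runs never overlap (similar to [DisallowConcurrentExecution]).
            while (await timer.WaitForNextTickAsync(cancellationToken))
            {
                await processBatch(cancellationToken);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation is the normal way to stop the loop.
        }
    }
}
```

If a batch takes longer than the interval, `PeriodicTimer` does not queue up a backlog of ticks, so a slow run delays the next one instead of triggering a burst of catch-up runs.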

Another effective strategy is to increase the batch size when fetching unprocessed messages. By processing more messages in each run, you can improve throughput. However, be cautious not to make the batches so large that they cause long-running transactions.

For high-volume systems, processing the Outbox in parallel can be very effective. Implement a locking mechanism to claim batches of messages, allowing multiple processors to work simultaneously without conflict. You can use `SELECT ... FOR UPDATE SKIP LOCKED` to claim a batch of messages. This approach can dramatically increase your processing capacity.
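Here is one way the claim query could look for the `outbox_messages` table, kept as a C# raw string for use with Dapper as in the processor job. Next to it is an in-memory simulation of the outcome `SKIP LOCKED` gives you: concurrent workers receive disjoint batches instead of blocking on, or double-processing, the same rows. The simulation uses a `lock` as a stand-in for row locks and removes claimed items permanently, so it illustrates the result, not PostgreSQL's mechanism (real row locks are released if a worker's transaction rolls back). `OutboxClaiming` and `InMemoryOutboxQueue` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class OutboxClaiming
{
    // Claims up to @BatchSize unprocessed rows; rows locked by another worker's
    // open transaction are skipped instead of waited on. Run it inside the
    // transaction that later marks the claimed rows as processed.
    public const string ClaimBatchSql = """
        SELECT id, type, content
        FROM outbox_messages
        WHERE processed_on_utc IS NULL
        ORDER BY occurred_on_utc
        LIMIT @BatchSize
        FOR UPDATE SKIP LOCKED
        """;
}

// In-memory stand-in: the lock plays the role of row locks,
// so concurrent workers always receive disjoint batches.
public sealed class InMemoryOutboxQueue
{
    private readonly object _gate = new();
    private readonly Queue<Guid> _pending;

    public InMemoryOutboxQueue(IEnumerable<Guid> ids) => _pending = new Queue<Guid>(ids);

    public List<Guid> ClaimBatch(int batchSize)
    {
        lock (_gate)
        {
            var batch = new List<Guid>(batchSize);
            while (batch.Count < batchSize && _pending.Count > 0)
            {
                batch.Add(_pending.Dequeue());
            }
            return batch;
        }
    }
}
```

Because each worker's claimed rows stay locked until its transaction commits, keeping batches small also keeps lock durations short, which ties back to the advice on batch size above.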

Wrapping Up

The Outbox pattern is a powerful tool for maintaining data consistency in distributed systems. By decoupling database operations from message publishing, the Outbox pattern ensures that your system remains reliable even in the face of failures.

Remember to keep your consumers idempotent, implement proper scaling strategies, and manage your Outbox table growth.

While it adds some complexity, the benefits of guaranteed (at-least-once) message delivery make it a valuable pattern in many scenarios.

If you're looking to implement the Outbox pattern in a robust, production-ready way, you can check out Pragmatic Clean Architecture. It includes an entire section on implementing the Outbox pattern, along with other essential patterns for building maintainable and scalable .NET applications.

That's all for today. Stay awesome, and I'll see you next week.


P.S. Whenever you're ready, there are 3 ways I can help you:

  1. Pragmatic Clean Architecture: Join 3,050+ students in this comprehensive course that will teach you the system I use to ship production-ready applications using Clean Architecture. Learn how to apply the best practices of modern software architecture.

  2. Modular Monolith Architecture: Join 950+ engineers in this in-depth course that will transform the way you build modern systems. You will learn the best practices for applying the Modular Monolith architecture in a real-world scenario.

  3. Patreon Community: Join a community of 1,050+ engineers and software architects. You will also unlock access to the source code I use in my YouTube videos, early access to future videos, and exclusive discounts for my courses.


I'm a seasoned software architect and Microsoft MVP for Developer Technologies. I talk about all things .NET and post new YouTube videos every week.