Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

Internal application architecture via command and event messages

License

NotificationsYou must be signed in to change notification settings

devlooped/Merq

Repository files navigation

Icon Merq

VersionDownloadsLicense

Mercury: messenger of the Roman gods

Mercury >Merq-ry >Merq

Merq brings theMessage Bus pattern together withacommand-oriented interface for anextensible and decoupled in-process application architecture.

These patterns are well established in microservices and service orientedarchitectures, but their benefits can be applied to apps too, especiallyextensible ones where multiple teams can contribute extensions whichare composed at run-time.

The resulting improved decoupling between components makes it easier to evolvethem independently, while improving discoverability of available commands andevents. You can see this approach applied in the real world inVSCode commandsand various events such aswindow events.Clearly, in the case of VSCode, everything is in-process, but the benefits ofa clean and predictable API are pretty obvious.

Merq provides the same capabilities for .NET apps.

Events

Events can be any type, there is no restriction or interfaces you must implement.Nowadays,C# record typesare a perfect fit for event data types. An example event could be a one-liner such as:

publicrecordItemShipped(stringId,DateTimeOffsetDate);

The events-based API surface on the message bus is simple enough:

publicinterfaceIMessageBus{voidNotify<TEvent>(TEvente);IObservable<TEvent>Observe<TEvent>();}

By relying onIObservable<TEvent>,Merq integrates seamlessly withmore powerful event-driven handling viaSystem.Reactiveor the more lightweightRxFree.Subscribing to events with either of those packages is trivial:

IDisposablesubscription;// constructor may use DI to get the dependencypublicCustomerViewModel(IMessageBus bus){subscription=bus.Observe<ItemShipped>().Subscribe(OnItemShipped);}voidOnItemShipped(ItemShippede)=>// Refresh item statuspublicvoid Dispose()=>subscription.Dispose();

In addition to event producers just invokingNotify, they can also beimplemented asIObservable<TEvent> directly, which is useful when theproducer is itself an observable sequence.

Both features integrate seamlessly and leverage all the power ofReactive Extensions.

Commands

Commands can also be any type, and C# records make for concise definitions:

recordCancelOrder(stringOrderId):IAsyncCommand;

Unlike events, command messages need to signal the invocation style they requirefor execution:

ScenarioInterfaceInvocation
void synchronous commandICommandIMessageBus.Execute(command)
value-returning synchronous commandICommand<TResult>var result = await IMessageBus.Execute(command)
void asynchronous commandIAsyncCommandawait IMessageBus.ExecuteAsync(command)
value-returning asynchronous commandIAsyncCommand<TResult>var result = await IMessageBus.ExecuteAsync(command)
async stream commandIStreamCommand<TResult>await foreach(var item in IMessageBus.ExecuteStream(command))

The sample command shown before can be executed using the following code:

// perhaps a method invoked when a user// clicks/taps a Cancel button next to an orderasyncTaskOnCancel(stringorderId){awaitbus.ExecuteAsync(newCancelOrder(orderId),CancellationToken.None);// refresh UI for new state.}

An example of a synchronous command could be:

// Command declarationrecordSignOut():ICommand;// Command invocationvoidOnSignOut()=>bus.Execute(newSignOut());// or alternatively, for void commands that have no additional data:voidOnSignOut()=>bus.Execute<SignOut>();

The marker interfaces on the command messages drive the compiler to only allowthe right invocation style on the message bus, as defined by the command author:

publicinterfaceIMessageBus{// sync voidvoidExecute(ICommandcommand);// sync value-returningTResultExecute<TResult>(ICommand<TResult>command);// async voidTaskExecuteAsync(IAsyncCommandcommand,CancellationTokencancellation);// async value-returningTask<TResult>ExecuteAsync<TResult>(IAsyncCommand<TResult>command,CancellationTokencancellation);// async streamIAsyncEnumerable<TResult>ExecuteStream<TResult>(IStreamCommand<TResult>command,CancellationTokencancellation);}

For example, to create a value-returning async command that retrieves somevalue, you would have:

recordFindDocuments(stringFilter):IAsyncCommand<IEnumerable<string>>;classFindDocumentsHandler:IAsyncCommandHandler<FindDocument,IEnumerable<string>>{publicboolCanExecute(FindDocumentcommand)=>!string.IsNullOrEmpty(command.Filter);publicTask<IEnumerable<string>>ExecuteAsync(FindDocumentcommand,CancellationTokencancellation)=>// evaluate command.Filter across all documents and return matches}

In order to execute such command, the only execute method the compiler will allowis:

IEnumerable<string>files=awaitbus.ExecuteAsync(newFindDocuments("*.json"));

If the consumer tries to useExecute, the compiler will complain that thecommand does not implementICommand<TResult>, which is the synchronous versionof the marker interface.

While these marker interfaces on the command messages might seem unnecessary,they are actually quite important. They solve a key problem that executionabstractions face: whether a command execution is synchronous or asynchronous(as well as void or value-returning) shouldnot be abstracted away sinceotherwise you can end up in two common anti-patterns (i.e.async guidelines for ASP.NET),known assync over async andasync over sync.

Likewise, mistakes cannot be made when implementing the handler, since thehandler interfaces define constraints on what the commands must implement:

// syncpublicinterfaceICommandHandler<inTCommand>: ...where TCommand:ICommand;publicinterfaceICommandHandler<in TCommand,out TResult>: ...where TCommand:ICommand<TResult>;// asyncpublicinterfaceIAsyncCommandHandler<in TCommand>: ...where TCommand:IAsyncCommand;publicinterfaceIAsyncCommandHandler<in TCommand,TResult>: ...where TCommand:IAsyncCommand<TResult>// async streampublicinterfaceIStreamCommandHandler<in TCommand,out TResult>: ...where TCommand:IStreamCommand<TResult>

This design choice also makes it impossible to end up executing a commandimplementation improperly.

In addition to execution, theIMessageBus also provides a mechanism to determineif a command has a registered handler at all via theCanHandle<T> method as wellas a validation mechanism viaCanExecute<T>, as shown above in theFindDocumentsHandler example.

Commands can notify new events, and event observers/subscribers can in turnexecute commands.

Async Streams

For .NET6+ apps,Merq also supportsasync streamsas a command invocation style. This is useful for scenarios where the commandexecution produces a potentially large number of results, and the consumerwants to process them as they are produced, rather than waiting for the entiresequence to be produced.

For example, the filter documents command above could be implemented as anasync stream command instead:

recordFindDocuments(stringFilter):IStreamCommand<string>;classFindDocumentsHandler:IStreamCommandHandler<FindDocument,string>{publicboolCanExecute(FindDocumentcommand)=>!string.IsNullOrEmpty(command.Filter);publicasyncIAsyncEnumerable<string>ExecuteAsync(FindDocumentcommand,[EnumeratorCancellation]CancellationTokencancellation){awaitforeach(varfileinFindFilesAsync(command.Filter,cancellation))yieldreturnfile;}}

In order to execute such command, the only execute method the compiler will allowis:

awaitforeach(varfileinbus.ExecuteStream(newFindDocuments("*.json")))Console.WriteLine(file);

Analyzers and Code Fixes

Beyond the compiler complaining,Merq also provides a set of analyzers andcode fixes to learn the patterns and avoid common mistakes. For example, if youcreated a simple record to use as a command, such as:

publicrecordEcho(stringMessage);

And then tried to implement a command handler for it:

publicclassEchoHandler:ICommandHandler<Echo>{}

the compiler would immediately complain about various contraints and interfacesthat aren't satisfied due to the requirements on theEcho type itself. Fora seasonedMerq developer, this is a no-brainer, but for new developers,it can be a bit puzzling:

compiler warnings screenshot

A code fix is provided to automatically implement the required interfacesin this case:

code fix to implement ICommand screenshot

Likewise, if a consumer attempted to invoke the aboveEcho command asynchronously(known as theasync over sync anti-pattern),they would get a somewhat unintuitive compiler error:

error executing sync command as async

But the second error is more helpful, since it points to the actual problem,and a code fix can be applied to resolve it:

code fix for executing sync command as async

The same analyzers and code fixes are provided for the opposite anti-pattern,known assync over async,where a synchronous command is executed asynchronously.

Message Bus

The default implementation lives in a separate packageMerq.Coreso that application components can take a dependency on just the interfaces.

VersionDownloads

The default implementation of the message bus interfaceIMessageBus hasno external dependencies and can be instantiated via theMessageBus constructordirectly.

The bus locates command handlers and event producers via the passed-inIServiceProvider instance in the constructor:

varbus=newMessageBus(serviceProvider);// execute a commandbus.Execute(newMyCommand());// observe an event from the busbus.Observe<MyEvent>().Subscribe(e=>Console.WriteLine(e.Message));

When usingdependency injection for .NET,theMerq.DependencyInjection packageprovides a simple mechanism for registering the message bus:

varbuilder=WebApplication.CreateBuilder(args);...builder.Services.AddMessageBus();

All command handlers and event producers need to be registered with theservices collection as usual, using the main interface for the component,such asICommandHandler<T> andIObservable<TEvent>.

NOTE:Merq makes no assumptions about the lifetime of the registeredcomponents, so it's up to the consumer to register them with the desiredlifetime.

To drastically simplify registration of handlers and producers, werecommend theDevlooped.Extensions.DependencyInjection.Attributed.package, which provides a simple attribute-based mechanism for automaticallyemitting at compile-time the required service registrations for all typesmarked with the provided[Service] attribute, which also allows setting thecomponent lifetime, such as[Service(ServiceLifetime.Transient)] (defaultlifetime isServiceLifetime.Singleton for this source generator-basedpackage).

This allows to simply mark all command handlers and event producers as[Service] and then register them all with a single line of code:

builder.Services.AddServices();

Telemetry and Monitoring

The core implementation of theIMessageBus is instrumented withActivitySource andMetric, providing out of the box support forOpen Telemetry-based monitoring, as wellas viadotnet traceanddotnet counters.

To export telemetry usingOpen Telemetry,for example:

usingvartracer=Sdk.CreateTracerProviderBuilder().SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("ConsoleApp")).AddSource(source.Name).AddSource("Merq").AddConsoleExporter().AddZipkinExporter().AddAzureMonitorTraceExporter(o=>o.ConnectionString=config["AppInsights"]).Build();

Collecting traces viadotnet-trace:

dotnet trace collect --name [PROCESS_NAME] --providers="Microsoft-Diagnostics-DiagnosticSource:::FilterAndPayloadSpecs=[AS]Merq,System.Diagnostics.Metrics:::Metrics=Merq"

Monitoring metrics viadotnet-counters:

dotnet counters monitor --process-id [PROCESS_ID] --counters Merq

Example rendering from the included sample console app:

dotnet-counters screenshot

Duck Typing Support

Being able to loosely couple both events (and their consumers) and command execution (from theircommand handler implementations) is a key feature of Merq. To take this decoupling to the extreme,Merq allows a similar capability as allowed by the TypeScript/JavaScript in VSCode: you can justcopy/paste an event/command definition assource into your assembly, and perform the regularoperations with it (likeObserve an event andExecute a command), in a "duck typing" manner.

As long as the types' full name match, the conversion will happen automatically. Since thisfunctionality isn't required in many scenarios, and since there are a myriad ways to implementsuch an object mapping functionality, theMerq.Core package only provides the hooks to enablethis, but does not provide any built-in implementation for it. In other words, no duck typingis performed by default.

TheMerq.AutoMapper package provides one suchimplementation, based on the excelentAutoMapper library. It can beregistered with the DI container as follows:

builder.Services.AddMessageBus<AutoMapperMessageBus>();// register all services, including handlers and producersbuilder.Services.AddServices();

Dogfooding

CI VersionBuild

We also produce CI packages from branches and pull requests so you can dogfood builds as quickly as they are produced.

The CI feed ishttps://pkg.kzu.dev/index.json.

The versioning scheme for packages is:

  • PR builds:42.42.42-pr[NUMBER]
  • Branch builds:42.42.42-[BRANCH].[COMMITS]

Sponsors

Clarius OrgMFB Technologies, Inc.TorutekDRIVE.NET, Inc.Keith PickfordThomas BolonKori FrancisToni WenzelUno PlatformDan SiegelReuben SwartzJacob FosheeEric JohnsonIx Technologies B.V.David JENNIJonathan Charley WuJakob Tikjøb AndersenTino HagerKen BonnySimon Croppagileworks-eusorahexZheyu ShenVezelChilliCream4OTCVincent LimoJordan S. JonesdomischellJoseph Kingry

Sponsor this project 

Learn more about GitHub Sponsors

About

Internal application architecture via command and event messages

Topics

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Sponsor this project

 

Languages


[8]ページ先頭

©2009-2025 Movatter.jp