Effortlessly send messages anywhere on the network using Reactive Extensions (RX). Transport protocol is ZeroMQ.


NetMQ/NetMQ.ReactiveExtensions


Effortlessly send messages anywhere on the network using Reactive Extensions (RX). Uses NetMQ as the transport layer.

Fast! Runs at >120,000 messages per second on localhost (by comparison, Tibco runs at 100,000 on the same machine).

Sample Code

The API is a drop-in replacement for Subject<T> from Reactive Extensions (RX).

As a refresher, to use Subject<T> in Reactive Extensions (RX):

    var subject = new Subject<int>();
    subject.Subscribe(message =>
    {
        // If we get an error "Cannot convert lambda ... not a delegate type", install Reactive Extensions from NuGet.
        Console.Write(message); // Prints "42".
    });
    subject.OnNext(42);

The new API starts with a drop-in replacement for Subject<T>:

    var subject = new SubjectNetMQ<int>("tcp://127.0.0.1:56001");
    subject.Subscribe(message =>
    {
        Console.Write(message); // Prints "42".
    });
    subject.OnNext(42); // Sends 42.

This is great for a demo, but is not recommended for any real-life application.

For those of us familiar with Reactive Extensions (RX), Subject<T> is a combination of a publisher and a subscriber. In a real-life application, we should separate the publisher from the subscriber, because this lets us create the connection earlier, which makes the transport setup more deterministic:

    var publisher = new PublisherNetMq<int>("tcp://127.0.0.1:56001");
    var subscriber = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
    subscriber.Subscribe(message =>
    {
        Console.Write(message); // Prints "42".
    });
    publisher.OnNext(42); // Sends 42.

If we want to run in separate applications:

    // Application 1 (subscriber)
    var subscriber1 = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
    subscriber1.Subscribe(message =>
    {
        Console.Write(message); // Prints "42".
    });

    // Application 2 (subscriber)
    var subscriber2 = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
    subscriber2.Subscribe(message =>
    {
        Console.Write(message); // Prints "42".
    });

    // Application 3 (publisher)
    var publisher = new PublisherNetMq<int>("tcp://127.0.0.1:56001");
    publisher.OnNext(42); // Sends 42.

Currently, serialization is performed using ProtoBuf. It will handle simple types such as int without annotation, but if we want to send more complex classes, we have to annotate them like this:

    // For Protobuf support, include NuGet package protobuf-net from Marc Gravell.
    [ProtoContract]
    public struct MyMessage
    {
        [ProtoMember(1)]
        public int Num { get; set; }

        [ProtoMember(2)]
        public string Name { get; set; }
    }

    var publisher = new PublisherNetMq<MyMessage>("tcp://127.0.0.1:56001");
    var subscriber = new SubscriberNetMq<MyMessage>("tcp://127.0.0.1:56001");
    subscriber.Subscribe(message =>
    {
        Console.Write(message.Num);  // Prints "42".
        Console.Write(message.Name); // Prints "Bill".
    });
    // The struct declares no constructor, so use an object initializer.
    publisher.OnNext(new MyMessage { Num = 42, Name = "Bill" });
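To see what the annotations are for, it can help to serialize an annotated type by hand. The following is a minimal sketch, assuming the protobuf-net NuGet package is referenced; the class name RoundTripSketch and its RoundTrip helper are hypothetical names chosen for illustration, and the code calls protobuf-net directly rather than showing how this library invokes it internally.

```csharp
using System;
using System.IO;
using ProtoBuf; // From the protobuf-net NuGet package.

[ProtoContract]
public struct MyMessage
{
    [ProtoMember(1)]
    public int Num { get; set; }

    [ProtoMember(2)]
    public string Name { get; set; }
}

// Hypothetical helper for illustration; not part of NetMQ.ReactiveExtensions.
public static class RoundTripSketch
{
    // Serializes a message to bytes in memory and reads it back,
    // roughly what has to happen on each side of the wire.
    public static MyMessage RoundTrip(MyMessage input)
    {
        using (var stream = new MemoryStream())
        {
            Serializer.Serialize(stream, input);
            stream.Position = 0; // Rewind before reading.
            return Serializer.Deserialize<MyMessage>(stream);
        }
    }

    public static void Main()
    {
        var copy = RoundTrip(new MyMessage { Num = 42, Name = "Bill" });
        // The copy is expected to carry the same field values as the original.
        Console.WriteLine(copy.Num + " " + copy.Name);
    }
}
```

The numbers passed to ProtoMember identify each field in the serialized form, so they should stay stable once publishers and subscribers are deployed separately.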

NuGet Package

See NetMQ.ReactiveExtensions.

The NuGet package 0.9.3 is designed for .NET 4. It depends on Reactive Extensions v2.2.5, which can be difficult to find; the packages can be downloaded manually from NuGet.

The NuGet package 0.9.4-rc7 is designed for .NET Core 1.1, .NET 4.5, and .NET Standard 1.6. If you want to build it for other platforms, please let me know. If you have trouble loading this version, load the Git branch for the 0.9.3 release.

.NET Core 1.1 Ready

As of v0.9.4-rc7, this package will build for:

  • .NET Core 1.1
  • .NET 4.5
  • .NET Standard 1.6

As this library supports .NET Standard 1.6 (which .NET Core 1.1 implements), it should be compatible with:

  • Windows
  • Linux
  • Mac

This library is tested on Windows and Linux. If it passes its unit tests on a given platform, it should also perform well on other platforms such as Mac.

Compiling from source

NOTE: Not compatible with .NET Core 1.0 or .NET Core 1.0.1. Install .NET Core 1.1 or above to avoid potential compile errors.

Demos

To check out the demos, see:

  • Publisher: Project NetMQ.ReactiveExtensions.SamplePublisher
  • Subscriber: Project NetMQ.ReactiveExtensions.SampleSubscriber
  • Sample unit tests: Project NetMQ.ReactiveExtensions.Tests

Performance

  • Runs at >120,000 messages per second on localhost.

100% compatible with Reactive Extensions (RX)

  • Compatible with all existing Reactive Extensions code, as it implements IObservable and IObserver from Microsoft.
  • Can use .Where(), .Select(), .Buffer(), .Throttle(), etc.
  • Supports .OnNext(), .OnError(), and .OnCompleted().
  • Properly passes exceptions across the wire.
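Because the compatibility claim rests on IObservable and IObserver, standard RX operators and error handlers can be written against those interfaces alone. The following is a minimal in-process sketch, assuming only the System.Reactive NuGet package; the class RxCompatibilitySketch and its Run method are hypothetical names, and a plain Subject<int> stands in for the networked publisher and subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper for illustration; not part of NetMQ.ReactiveExtensions.
public static class RxCompatibilitySketch
{
    // Pushes 1..4 and then an error into `input`, and records what a
    // filtered, projected subscription on `output` observes.
    public static List<string> Run(IObserver<int> input, IObservable<int> output)
    {
        var log = new List<string>();
        using (output
            .Where(n => n % 2 == 0)   // Keep even values only.
            .Select(n => n * n)       // Square each remaining value.
            .Subscribe(
                n => log.Add("next:" + n),
                ex => log.Add("error:" + ex.Message),
                () => log.Add("completed")))
        {
            for (var i = 1; i <= 4; i++)
            {
                input.OnNext(i);
            }
            input.OnError(new InvalidOperationException("boom"));
        }
        return log;
    }

    public static void Main()
    {
        // Stand-in: a plain Subject<int> plays both roles in-process.
        var subject = new Subject<int>();
        Console.WriteLine(string.Join(", ", Run(subject, subject)));
        // Prints "next:4, next:16, error:boom".
    }
}
```

With a Subject<int> delivery is synchronous, so the log is complete when Run returns. Over a network transport, messages arrive later, so a real application would wait for them rather than read the results immediately.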

Unit tests

  • Supported by a full suite of unit tests.

Projects like this one that do messaging

  • See Obvs, a fantastic RX wrapper which supports many transport layers including NetMQ, RabbitMQ and Azure, and many serialization methods including ProtoBuf and MsgPack.
  • See Obvs.NetMQ, the RX wrapper with NetMQ as the transport layer.
  • Search for all packages on NuGet that depend on RX, and pick out the ones that are related to message buses.
  • Check out Kafka. It provides many-to-many messaging, with persistence and multi-node redundancy. And it's blindingly fast.

Wiki

See the Wiki for more documentation.
