The Reactive Extensions for .NET
This repository contains four libraries which are conceptually related in that they are all concerned with LINQ over sequences of things:
- Reactive Extensions for .NET, aka Rx.NET or Rx (System.Reactive): a library for event-driven programming with a composable, declarative model
- AsyncRx.NET (experimental preview) (System.Reactive.Async): experimental implementation of Rx for `IAsyncObservable<T>` offering deeper `async`/`await` support
- Interactive Extensions for .NET, aka Ix (System.Interactive): extended LINQ operators for `IAsyncEnumerable` and `IEnumerable`
- LINQ for `IAsyncEnumerable` (System.Linq.Async): implements standard LINQ operators for `IAsyncEnumerable`
Each will be described later in this README.
Reactive programming provides clarity when our code needs to respond to events. The Rx.NET libraries were designed to enable cloud-native applications to process live data in reliable, predictable ways.
We've written a FREE book which explains the vital abstractions that underpin Rx, and shows how to exploit the powerful and extensive functionality built into the Rx.NET libraries.
Based on Lee Campbell's 2010 book (kindly donated to the project), it has been re-written to bring it up to date with Rx.NET v6.0, .NET 8.0, and modern cloud native use cases such as IoT and real-time stream data processing.
Introduction to Rx.NET is available online, on GitHub, as PDF, and EPUB.
Channel | Rx | AsyncRx | Ix | System.Linq.Async |
---|---|---|---|---|
NuGet.org | ||||
NuGet.org preview (if newer than release) | ||||
Build | Built as part of Ix | |||
Azure Artifacts | ||||
Release history | ReleaseHistory | ReleaseHistory | ReleaseHistory |
For nightly builds, configure NuGet to use this feed: https://pkgs.dev.azure.com/dotnet/Rx.NET/_packaging/RxNet/nuget/v3/index.json
Catch us in the #rxnet channel over at https://reactivex.slack.com/
In this digital age, live data streams are ubiquitous. Financial applications depend on a swift response to timely information. Computer networks have always been able to provide extensive information about their health and operation. Utility companies such as water providers have vast numbers of devices monitoring their operations. User interface and game building frameworks report user interactions in great detail. Delivery vans continuously report their progress. Aircraft provide performance telemetry to detect potential maintenance issues before they become serious problems, and cars are now starting to do the same. Many of us wear or carry devices that track our physical activity and even vital signs. And the improvements in machine learning have enriched the insights that can be derived from the ever-increasing volume and variety of live data.
But despite being so widespread, live information streams have always been something of a second class citizen. Almost all programming languages have some innate way to work with lists of data (e.g., arrays), but these mechanisms tend to presume that the relevant data is already sitting in memory, ready for us to work with it. What's missing is the liveness—the fact that an information source might produce new data at any moment, on its own schedule.
Rx elevates the support for live streams of information to the same level as we expect for things like arrays. Here's an example:
    var bigTrades =
        from trade in trades
        where trade.Volume > 1_000_000
        select trade;
This uses C#'s LINQ feature to filter `trades` down to those entities with a volume greater than one million. This query expression syntax is just a shorthand for method calls, so we could also write it this way:
    var bigTrades = trades.Where(trade => trade.Volume > 1_000_000);
The exact behaviour of these two (equivalent) code snippets depends on what type `trades` has. If it were an `IEnumerable<Trade>`, then this query would just iterate through the list, and `bigTrades` would be an enumerable sequence containing just the matching objects. If `trades` were an object representing a database table (e.g., an Entity Framework `DbSet`), this would be translated into a database query. But if we're using Rx, `trades` would be an `IObservable<Trade>`, an object reporting live events as they happen. And `bigTrades` would also be an `IObservable<Trade>`, reporting only those trades with a volume over a million. We can provide Rx with a callback to be invoked each time an observable source has something for us:
    bigTrades.Subscribe(t => Console.WriteLine($"{t.Symbol}: trade with volume {t.Volume}"));
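To see the fragments above working together, here is a minimal, self-contained sketch. It assumes the System.Reactive NuGet package is referenced. The `Trade` record and the use of a `Subject<Trade>` as a stand-in for a live feed are illustrative assumptions, not part of the README's own example.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: a Subject<Trade> stands in for a real live source of trades.
var trades = new Subject<Trade>();

// Same filter as in the text: only trades with a volume over one million.
IObservable<Trade> bigTrades = trades.Where(trade => trade.Volume > 1_000_000);

// Collect what the callback reports so the result can be inspected afterwards.
var reported = new List<string>();
IDisposable subscription = bigTrades.Subscribe(t =>
{
    string line = $"{t.Symbol}: trade with volume {t.Volume}";
    reported.Add(line);
    Console.WriteLine(line);
});

// Push some events through the source; only the second passes the filter.
trades.OnNext(new Trade("MSFT", 500));
trades.OnNext(new Trade("AAPL", 2_500_000));
trades.OnNext(new Trade("GOOG", 1_000_000));
trades.OnCompleted();

subscription.Dispose();

// Hypothetical event type used only for this illustration.
record Trade(string Symbol, long Volume);
```

Nothing is filtered until a trade actually arrives: the query describes what to do with future events, and the callback runs once per matching event.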
The two key features of Rx are:
- a clearly defined way to represent and handle live sequences of data (`IObservable<T>`)
- a set of operators (such as the `Where` operator just shown) enabling event processing logic to be expressed declaratively
Rx has been applied particularly successfully in user interfaces. (This is also true outside of .NET: RxJS is a JavaScript spin-off of Rx, and it is very popular in user interface code.) ReactiveUI (https://github.com/reactiveui/reactiveui) makes deep use of Rx to support .NET UI development.
Ian Griffiths presented a concise 60-minute overview of Reactive Extensions for .NET at the dotnetsheff meetup in 2020. More videos are available on the Rx playlist.
Although Rx is a natural way to model asynchronous processes, its original design presumed that code acting on notifications would run synchronously. This is because Rx's design predates C#'s `async`/`await` language features. So although Rx offers adapters that can convert between `IObservable<T>` and `Task<T>`, there were certain cases where `async` was not an option.
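As a rough illustration of the adapters mentioned above, the following sketch converts in both directions using the `ToObservable` and `ToTask` extension methods from the `System.Reactive.Threading.Tasks` namespace. It assumes the System.Reactive NuGet package is referenced, and the values are arbitrary.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Task<T> -> IObservable<T>: the observable yields the task's result, then completes.
IObservable<int> fromTask = Task.FromResult(42).ToObservable();

// IObservable<T> -> Task<T>: the task completes with the final value of the sequence.
int viaObservable = await fromTask.ToTask();
int last = await Observable.Range(1, 3).ToTask();

Console.WriteLine($"{viaObservable}, {last}");
```

These adapters bridge whole sequences and tasks. They do not make an observer's per-item callback asynchronous, which is the gap the text goes on to describe.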
AsyncRx.NET lifts this restriction by defining `IAsyncObservable<T>`. This enables observers to use asynchronous code. For example, if `bigTrades` were an `IAsyncObservable<Trade>` we could write this:
    bigTrades.Subscribe(async t => await bigTradeStore.LogTradeAsync(t));
AsyncRx.NET is currently in preview.
Rx defines all the standard LINQ operators available for other providers, but it also adds numerous additional operators. For example, it defines `Scan`, which performs the same basic processing as the standard `Aggregate` operator, but instead of producing a single result after processing every element, it produces a sequence containing the aggregated value after every single step. (For example, if the operation being aggregated is addition, `Aggregate` would return the sum total as a single output, whereas `Scan` would produce a running total for each input. Given a sequence `[1,2,3]`, `Aggregate((a, x) => a + x)` produces just `6`, whereas `Scan` would produce `[1,3,6]`.)
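The difference between the two operators can be checked with a small sketch like the one below. It assumes the System.Reactive NuGet package is referenced and uses a fixed array turned into an observable purely for demonstration.

```csharp
// Requires the System.Reactive NuGet package.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// A finite observable sequence: 1, 2, 3.
IObservable<int> numbers = new[] { 1, 2, 3 }.ToObservable();

// Aggregate emits a single value once the source has completed.
var totals = new List<int>();
numbers.Aggregate((a, x) => a + x).Subscribe(x => totals.Add(x));

// Scan emits the accumulated value after each element.
var runningTotals = new List<int>();
numbers.Scan((a, x) => a + x).Subscribe(x => runningTotals.Add(x));

Console.WriteLine(string.Join(", ", totals));        // 6
Console.WriteLine(string.Join(", ", runningTotals)); // 1, 3, 6
```

With a live source that never completes, `Aggregate` would never produce its result, whereas `Scan` keeps reporting the running value as events arrive.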
Some of the additional operators Rx defines are useful only when you're working with events. But some are applicable to sequences of any kind. So the Interactive Extensions (Ix for short) define implementations for `IEnumerable<T>`. Ix is effectively an extension of LINQ to Objects, adding numerous additional operators. (Its usefulness is borne out by the fact that the .NET runtime libraries have, over time, added some of the operators that used to be available only in Ix. For example, .NET 6 added `MinBy` and `MaxBy`, operators previously only defined by Ix.)
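For readers who have not met these operators, this short sketch shows the versions built into .NET 6 and later (`Enumerable.MinBy` and `Enumerable.MaxBy`). No extra packages are needed, and the sample data is made up for illustration.

```csharp
using System;
using System.Linq;

// Illustrative data: (Symbol, Volume) pairs.
var trades = new[]
{
    (Symbol: "MSFT", Volume: 500L),
    (Symbol: "AAPL", Volume: 2_500_000L),
    (Symbol: "GOOG", Volume: 1_000_000L),
};

// MinBy/MaxBy return the element whose key is smallest/largest,
// rather than the key itself (which is what Min/Max would return).
var smallest = trades.MinBy(t => t.Volume);
var largest = trades.MaxBy(t => t.Volume);

Console.WriteLine($"{smallest.Symbol} {largest.Symbol}"); // MSFT AAPL
```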
This library is called the "Interactive Extensions" because "Interactive" is in a sense the opposite of "Reactive". (The name does not refer to user interactions.)
One of the features pioneered by Ix was an asynchronous version of `IEnumerable<T>`. This is another example of a feature so useful that it was eventually added to the .NET runtime libraries: .NET Core 3.0 introduced `IAsyncEnumerable<T>`, and the associated version of C# (8.0) added intrinsic support for this interface with its `await foreach` construct.
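A minimal sketch of that language support follows: an async iterator produces an `IAsyncEnumerable<int>`, and `await foreach` consumes it. The `CountAsync` helper and its delay are hypothetical and exist only to simulate items arriving over time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Consume the sequence asynchronously: each iteration awaits the next item.
var received = new List<int>();
await foreach (int value in CountAsync(3))
{
    received.Add(value);
    Console.WriteLine(value);
}

// Hypothetical async iterator: yields 1..count with a short pause before each item.
static async IAsyncEnumerable<int> CountAsync(int count)
{
    for (int i = 1; i <= count; i++)
    {
        await Task.Delay(10); // simulate waiting for data to become available
        yield return i;
    }
}
```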
Although .NET Core 3.0 defined `IAsyncEnumerable<T>`, it did not add any corresponding LINQ implementation. Whereas `IEnumerable<T>` supports all the standard operators such as `Where`, `GroupBy`, and `SelectMany`, .NET does not have built-in implementations of any of these for `IAsyncEnumerable<T>`. However, Ix had provided LINQ operators for its prototype version of `IAsyncEnumerable<T>` from the start, so when .NET Core 3.0 shipped, it was a relatively straightforward task to update all those existing LINQ operators to work with the new, official `IAsyncEnumerable<T>`.
Thus, the System.Linq.Async NuGet package was created, providing a LINQ to Objects implementation for `IAsyncEnumerable<T>` to match the one already built into .NET for `IEnumerable<T>`.
Since all of the relevant code was already part of the Ix project (with `IAsyncEnumerable<T>` also originally having been defined by this project), the System.Linq.Async NuGet package is built as part of the Ix project.
Some of the best ways to contribute are to try things out, file bugs, and join in design conversations.
- Clone the sources:
git clone https://github.com/dotnet/reactive
- Building, testing and debugging the sources
- How to Contribute
- Pull requests: Open/Closed
Looking for something to work on? The list of up for grabs issues is a great place to start.
This project has adopted a code of conduct adapted from the Contributor Covenant to clarify expected behavior in our community. This code of conduct has been adopted by many other projects. For more information see the Code of conduct.
This project is part of the .NET Foundation along with other projects like the .NET Runtime. The .NET Foundation provides this project with DevOps infrastructure to compile, test, sign and package this complex solution which has over 100 million downloads. It also provides conservatorship enabling the project to pass from maintainer to maintainer, enabling continuity for the community.
The people currently maintaining Rx are:
- Ian Griffiths (Hove, UK)
- Howard van Rooijen (Winchester, UK)
Rx has been around for roughly a decade and a half, so we owe a great deal to its creators, and the many people who have worked on it since. See the AUTHORS.txt for a full list.
As part of .NET Conf 2023, Ian Griffiths provided an update on the efforts to modernize Rx.NET for v6.0 and the plans for v7.0.
For more information, see the following discussions:
We have set out a roadmap explaining our medium term plans for ongoing development of Rx. This diagram illustrates our view of the platforms on which Rx is used, and the planned support lifecycles for these various targets: