- Notifications
You must be signed in to change notification settings - Fork68
Rust implementation of Reactive Extensions.
License
rxRust/rxRust
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
Add this to your Cargo.toml:
[dependencies]rxrust ="1.0.0-beta.0"
use rxrust:: prelude::*;letmut numbers = observable::from_iter(0..10);// create an even stream by filterlet even = numbers.clone().filter(|v| v %2 ==0);// create an odd stream by filterlet odd = numbers.clone().filter(|v| v %2 !=0);// merge odd and even stream againeven.merge(odd).subscribe(|v|print!("{} ", v,));// "0 2 4 6 8 1 3 5 7 9" will be printed.
Inrxrust almost all extensions consume the upstream. So when you try to subscribe a stream twice, the compiler will complain.
#use rxrust::prelude::*;let o = observable::from_iter(0..10); o.subscribe(|_|println!("consume in first")); o.subscribe(|_|println!("consume in second"));
In this case, we must clone the stream.
#use rxrust::prelude::*;let o = observable::from_iter(0..10); o.clone().subscribe(|_|println!("consume in first")); o.clone().subscribe(|_|println!("consume in second"));
If you want to share the same observable, you can useSubject.
rxrust use the runtime of theFuture as the scheduler,LocalPool andThreadPool infutures::executor can be used as schedulers directly, andtokio::runtime::Runtime is also supported, but need to enable the featurefutures-scheduler. AcrossScheduler to implement customScheduler.Some Observable Ops (such asdelay, anddebounce) need the ability to delay, futures-time supports this ability when set with thetimer feature, but you can also customize it by setting the new_timer function to NEW_TIMER_FN variant and removing thetimer feature.
use rxrust::prelude::*;// `FuturesThreadPoolScheduler` is the alias of `futures::executor::ThreadPool`.let threads_scheduler =FuturesThreadPoolScheduler::new().unwrap();observable::from_iter(0..10).subscribe_on(threads_scheduler.clone()).map(|v| v*2).observe_on_threads(threads_scheduler).subscribe(|v|println!("{},", v));
Also,rxrust supports WebAssembly by enabling the featurewasm-scheduler and using the cratewasm-bindgen. A simple example ishere.
Just useobservable::from_future to convert aFuture to an observable sequence.
use rxrust::prelude::*;letmut scheduler_pool =FuturesLocalSchedulerPool::new();observable::from_future(std::future::ready(1), scheduler_pool.spawner()).subscribe(move |v|println!("subscribed with {}", v));// Wait `task` finish.scheduler_pool.run();
Afrom_future_result function is also provided to propagate errors from `Future``.
Seemissing features to know what rxRust does not have yet.
We are looking for contributors! Feel free to open issues for asking questions, suggesting features or other things!
Help and contributions can be any of the following:
- use the project and report issues to the project issues page
- documentation and README enhancement (VERY important)
- continuous improvement in a ci Pipeline
- implement any unimplemented operator, remember to create a pull request before you start your code, so other people know you are working on it.
you can enable the default timer bytimer feature, or set a timer across functionnew_timer_fn
About
Rust implementation of Reactive Extensions.
Topics
Resources
License
Contributing
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.