Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Rust implementation of Reactive Extensions.

License

NotificationsYou must be signed in to change notification settings

rxRust/rxRust

Repository files navigation

codecov

Usage

Add this to your Cargo.toml:

[dependencies]rxrust ="1.0.0-beta.0"

Example

use rxrust:: prelude::*;letmut numbers = observable::from_iter(0..10);// create an even stream by filterlet even = numbers.clone().filter(|v| v %2 ==0);// create an odd stream by filterlet odd = numbers.clone().filter(|v| v %2 !=0);// merge odd and even stream againeven.merge(odd).subscribe(|v|print!("{} ", v,));// "0 2 4 6 8 1 3 5 7 9" will be printed.

Clone Stream

Inrxrust almost all extensions consume the upstream. So when you try to subscribe a stream twice, the compiler will complain.

 #use rxrust::prelude::*;let o = observable::from_iter(0..10); o.subscribe(|_|println!("consume in first")); o.subscribe(|_|println!("consume in second"));

In this case, we must clone the stream.

 #use rxrust::prelude::*;let o = observable::from_iter(0..10); o.clone().subscribe(|_|println!("consume in first")); o.clone().subscribe(|_|println!("consume in second"));

If you want to share the same observable, you can useSubject.

Scheduler

rxrust use the runtime of theFuture as the scheduler,LocalPool andThreadPool infutures::executor can be used as schedulers directly, andtokio::runtime::Runtime is also supported, but need to enable the featurefutures-scheduler. AcrossScheduler to implement customScheduler.Some Observable Ops (such asdelay, anddebounce) need the ability to delay, futures-time supports this ability when set with thetimer feature, but you can also customize it by setting the new_timer function to NEW_TIMER_FN variant and removing thetimer feature.

use rxrust::prelude::*;// `FuturesThreadPoolScheduler` is the alias of `futures::executor::ThreadPool`.let threads_scheduler =FuturesThreadPoolScheduler::new().unwrap();observable::from_iter(0..10).subscribe_on(threads_scheduler.clone()).map(|v| v*2).observe_on_threads(threads_scheduler).subscribe(|v|println!("{},", v));

Also,rxrust supports WebAssembly by enabling the featurewasm-scheduler and using the cratewasm-bindgen. A simple example ishere.

Converts from a Future

Just useobservable::from_future to convert aFuture to an observable sequence.

use rxrust::prelude::*;letmut scheduler_pool =FuturesLocalSchedulerPool::new();observable::from_future(std::future::ready(1), scheduler_pool.spawner()).subscribe(move |v|println!("subscribed with {}", v));// Wait `task` finish.scheduler_pool.run();

Afrom_future_result function is also provided to propagate errors from `Future``.

Missing Features List

Seemissing features to know what rxRust does not have yet.

All contributions are welcome

We are looking for contributors! Feel free to open issues for asking questions, suggesting features or other things!

Help and contributions can be any of the following:

  • use the project and report issues to the project issues page
  • documentation and README enhancement (VERY important)
  • continuous improvement in a ci Pipeline
  • implement any unimplemented operator, remember to create a pull request before you start your code, so other people know you are working on it.

you can enable the default timer bytimer feature, or set a timer across functionnew_timer_fn

About

Rust implementation of Reactive Extensions.

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Contributors31

Languages


[8]ページ先頭

©2009-2025 Movatter.jp