Library for interoperation between RxJava 3 and JDK 21+ Virtual Threads
akarnokd/RxJavaFiberInterop
Library for interoperation between RxJava 3 and JDK 21's Virtual Threads (aka Fibers, Project Loom).
    dependencies {
        implementation "com.github.akarnokd:rxjava3-fiber-interop:0.0.18"
    }

Requires a JDK that has Virtual Threads as a standard feature (i.e., not preview), such as https://jdk.java.net/21/.
Currently, the Virtual Thread API does not offer public means to specify the carrier thread (pool), so it is not possible to use RxJava `Scheduler`s as carriers.
You can, however, use `Schedulers.from` to convert the Fork-Join-pool backed standard Virtual Thread Executor into an RxJava `Scheduler`:
    var vte = Executors.newVirtualThreadPerTaskExecutor();
    Scheduler vtScheduler = Schedulers.from(vte);

    // sometime later
    vte.close();
You can then use `vtScheduler` from the example with `subscribeOn` and `observeOn` to let traditional functional callbacks block virtually:
    Observable.fromCallable(() -> someBlockingNetworkCall())
        .subscribeOn(vtScheduler)
        .observeOn(vtScheduler)
        .map(v -> someOtherBlockingCall(v))
        .observeOn(uiThread)
        .subscribe(v -> label.setText(v), e -> label.setText(e.toString()));
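The claim that callbacks on such a scheduler "block virtually" rests on the executor running every task on its own virtual thread. The JDK-only sketch below checks that property directly. It assumes JDK 21 or later; the class and method names are made up for illustration and are not part of this library or of RxJava.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper class, for illustration only.
final class VirtualExecutorCheck {

    /** Runs one blocking task on a virtual-thread-per-task executor and reports the thread kind. */
    static boolean runsOnVirtualThread() {
        // ExecutorService is AutoCloseable since JDK 19; close() waits for submitted tasks.
        try (ExecutorService vte = Executors.newVirtualThreadPerTaskExecutor()) {
            return vte.submit(() -> {
                Thread.sleep(10); // a blocking call: only the virtual thread is parked
                return Thread.currentThread().isVirtual();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOnVirtualThread()); // prints "true"
    }
}
```

Wrapping the same executor with `Schedulers.from` means RxJava hands its tasks to that executor, so blocking calls inside those tasks park a virtual thread rather than a platform thread.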
ℹ️ You need the special operators below to turn RxJava's non-blocking backpressure into virtually blocked backpressure.
`FiberInterop.create` creates a `Flowable` from a generator callback that can emit via `FiberEmitter`, runs on an `ExecutorService` provided by the user, and is suspended automatically upon backpressure. The callback is executed inside the virtual thread, so you can call the usual blocking APIs and get suspensions the same way.
The created `Flowable` completes once the callback returns normally, or signals an error if the callback throws an exception.
    try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
        FiberInterop.create(emitter -> {
            for (int i = 1; i <= 5; i++) {
                // emit the loop variable so the result matches 1..5
                emitter.emit(i);
            }
        }, scope)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5);
    }
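To give a feel for what "suspended automatically upon backpressure" means, here is a JDK-only sketch in which a producer running on a virtual thread blocks on a bounded queue until the consumer is ready. This is not how the library is implemented; the queue, the `DONE` marker, and all names are assumptions made for illustration, and JDK 21 or later is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch class, not part of the library.
final class BlockingEmitSketch {

    // Hypothetical end-of-stream marker.
    private static final Object DONE = new Object();

    /** Emits 1..count from a virtual thread through a bounded queue and collects the values. */
    static List<Integer> generate(int count) {
        // Capacity 1: the producer has to wait until the consumer takes the previous item.
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
        List<Integer> received = new ArrayList<>();
        try (ExecutorService scope = Executors.newVirtualThreadPerTaskExecutor()) {
            scope.submit(() -> {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // parks the virtual thread while the queue is full
                }
                queue.put(DONE);
                return null;
            });
            // The consumer side: each take() frees a slot, which resumes the producer.
            for (Object item = queue.take(); item != DONE; item = queue.take()) {
                received.add((Integer) item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(generate(5)); // prints "[1, 2, 3, 4, 5]"
    }
}
```

The producer is written as a plain loop with a blocking call, which is the programming model the generator callback offers: the code reads sequentially while the waiting happens on a virtual thread.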
`FiberInterop.transform` transforms each upstream value via a callback that can emit zero or more values for each of those upstream values, runs on an `ExecutorService` provided by the user, and is suspended automatically upon backpressure. The callback is executed inside the virtual thread, so you can call the usual blocking APIs and get suspensions the same way.
    try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
        Flowable.range(1, 5)
        .compose(FiberInterop.transform((value, emitter) -> {
            emitter.emit(value);
            emitter.emit(value + 1);
        }, scope))
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
    }
RxJava uses `java.util.concurrent` locks and `CountDownLatch`es via its `blockingXXX` methods, which automatically work within a virtual thread. Therefore, there is no need for a separate interop operator. Just block.
    try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
        scope.submit(() -> {
            var v = Flowable.just(1)
            .delay(1, TimeUnit.SECONDS)
            .blockingLast();
            System.out.println(v);
        });
    }
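The reason plain blocking is enough can be seen without RxJava: waiting on a `CountDownLatch` inside a virtual thread parks only that virtual thread. The sketch below is a JDK-only illustration under that assumption; it does not reproduce RxJava's internals, the names are hypothetical, and JDK 21 or later is assumed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch class, not part of RxJava or this library.
final class LatchBlockingSketch {

    /** Waits on a CountDownLatch inside a virtual thread until another task supplies the value. */
    static int awaitValue(int value) {
        try (ExecutorService scope = Executors.newVirtualThreadPerTaskExecutor()) {
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<Integer> slot = new AtomicReference<>();

            // Stand-in for an asynchronous source that signals after a short delay.
            scope.submit(() -> {
                Thread.sleep(50);
                slot.set(value);
                latch.countDown();
                return null;
            });

            // The waiting side: await() parks the virtual thread until countDown() runs.
            return scope.submit(() -> {
                latch.await();
                return slot.get();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitValue(1)); // prints "1"
    }
}
```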