
akarnokd/RxJavaFiberInterop


Library for interoperation between RxJava 3 and JDK 21's Virtual Threads (aka Fibers, Project Loom).


    dependencies {
        implementation "com.github.akarnokd:rxjava3-fiber-interop:0.0.18"
    }

Requires a JDK that has Virtual Threads as a standard feature (i.e., not preview), such as https://jdk.java.net/21/.

Components

FiberInterop

Schedulers

Currently, the Virtual Thread API does not offer a public means of specifying the carrier thread (pool), so it is not possible to use RxJava Schedulers as carriers.

You can, however, use Schedulers.from to convert the Fork-Join-pool-backed standard Virtual Thread Executor into an RxJava Scheduler:

    var vte = Executors.newVirtualThreadPerTaskExecutor();
    Scheduler vtScheduler = Schedulers.from(vte);

    // sometime later
    vte.close();

You can then use vtScheduler from the example with subscribeOn and observeOn to let traditional functional callbacks block virtually:

    Observable.fromCallable(() -> someBlockingNetworkCall())
        .subscribeOn(vtScheduler)
        .observeOn(vtScheduler)
        .map(v -> someOtherBlockingCall(v))
        .observeOn(uiThread)
        .subscribe(v -> label.setText(v), e -> label.setText(e.toString()));
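As a quick sanity check of what such an executor provides, the following JDK-only sketch (no RxJava involved) submits one blocking task and reports whether it ran on a virtual thread. The class and method names are made up for illustration, and it assumes JDK 21, where the factory method is `Executors.newVirtualThreadPerTaskExecutor()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualExecutorCheck {

    /** Submits one blocking task and returns true if it ran on a virtual thread. */
    static boolean runsOnVirtualThread() {
        // close() (via try-with-resources) waits for submitted tasks to finish
        try (ExecutorService vte = Executors.newVirtualThreadPerTaskExecutor()) {
            return vte.submit(() -> {
                Thread.sleep(10); // blocking call: parks the virtual thread only
                return Thread.currentThread().isVirtual();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runsOnVirtualThread());
    }
}
```

Any callback that RxJava runs through a Scheduler wrapped around this executor would execute in the same kind of thread, which is why blocking inside such callbacks is cheap.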

ℹ️ You need the special operators below to turn RxJava's non-blocking backpressure into virtually blocked backpressure.

create

Creates a Flowable from a generator callback that can emit via FiberEmitter, is run on an ExecutorService provided by the user, and is suspended automatically upon backpressure. The callback is executed inside the virtual thread, so you can call the usual blocking APIs and get suspensions the same way.

The created Flowable will complete once the callback returns normally, or fail with an error if the callback throws an exception.

    try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
        FiberInterop.create(emitter -> {
            for (int i = 1; i <= 5; i++) {
                // emit the loop counter so the result is 1..5
                emitter.emit(i);
            }
        }, scope)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5);
    }
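To illustrate what "suspended automatically upon backpressure" can mean in practice, here is a conceptual JDK-only sketch. It is not the library's implementation. It assumes a bounded `ArrayBlockingQueue` as the hand-off between producer and consumer, and the `BlockingEmitter`, `Generator`, and `generateAndCollect` names are hypothetical. The generator blocks in `emit` whenever the queue is full, which on a virtual thread parks only that virtual thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingBackpressureSketch {

    /** Hypothetical emitter: emit() blocks while the consumer is behind. */
    interface BlockingEmitter<T> {
        void emit(T item) throws InterruptedException;
    }

    /** Hypothetical generator callback that may call blocking APIs. */
    interface Generator<T> {
        void generate(BlockingEmitter<T> emitter) throws Exception;
    }

    private static final Object DONE = new Object(); // end-of-stream marker

    static <T> List<T> generateAndCollect(Generator<T> generator,
                                          ExecutorService executor,
                                          int capacity) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        // Producer: put() blocks when the queue already holds 'capacity' items.
        Future<?> producer = executor.submit(() -> {
            try {
                generator.generate(queue::put);
            } finally {
                queue.put(DONE);
            }
            return null;
        });
        List<T> out = new ArrayList<>();
        try {
            // Consumer: drains the queue until the marker arrives.
            for (Object o = queue.take(); o != DONE; o = queue.take()) {
                @SuppressWarnings("unchecked")
                T item = (T) o;
                out.add(item);
            }
            producer.get(); // surfaces an exception thrown by the generator
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        try (ExecutorService vte = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Integer> items = BlockingBackpressureSketch.<Integer>generateAndCollect(emitter -> {
                for (int i = 1; i <= 5; i++) {
                    emitter.emit(i);
                }
            }, vte, 2);
            System.out.println(items);
        }
    }
}
```

The capacity of 2 is arbitrary. A smaller bound makes the producer block more often, and the emitted order is preserved either way.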

transform

Transforms each upstream value via a callback that can emit zero or more values for each of those upstream values, is run on an ExecutorService provided by the user, and is suspended automatically upon backpressure. The callback is executed inside the virtual thread, so you can call the usual blocking APIs and get suspensions the same way.

    try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
        Flowable.range(1, 5)
        .compose(FiberInterop.transform((value, emitter) -> {
            // two downstream values per upstream value
            emitter.emit(value);
            emitter.emit(value + 1);
        }, scope))
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
    }

blockingXXX

RxJava uses java.util.concurrent locks and CountDownLatches in its blockingXXX methods, which automatically work within a virtual thread. Therefore, there is no need for a separate interop operator. Just block.

    try (var scope = Executors.newVirtualThreadPerTaskExecutor()) {
        scope.submit(() -> {
            // blocks this virtual thread for about a second
            var v = Flowable.just(1)
                .delay(1, TimeUnit.SECONDS)
                .blockingLast();
            System.out.println(v);
        });
    }
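The point about latches can be seen without RxJava. In the JDK-only sketch below (names are illustrative, JDK 21 assumed), one virtual thread waits on a `CountDownLatch` while another supplies a value after a short delay. This is roughly the kind of waiting a blocking call performs, though RxJava's actual internals may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchBlockingSketch {

    /** Waits inside a virtual thread until another virtual thread supplies a value. */
    static String awaitValue() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> slot = new AtomicReference<>();
        try (ExecutorService vte = Executors.newVirtualThreadPerTaskExecutor()) {
            // Producer: delivers the value after a short blocking delay.
            vte.submit(() -> {
                Thread.sleep(100);
                slot.set("done");
                latch.countDown();
                return null;
            });
            // Consumer: await() parks this virtual thread until countDown().
            return vte.submit(() -> {
                if (!latch.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("timed out");
                }
                return slot.get();
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitValue());
    }
}
```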
