Prototype Java 9 library based on the asynchronous enumerable concept (where moveNext() returns a task to compose over).


akarnokd/async-enumerable


Gradle

    compile "com.github.akarnokd:async-enumerable:0.6.0"

Getting started

The main entry point is the hu.akarnokd.asyncenum.AsyncEnumerable interface with its static factory methods similar to RxJava:

    AsyncEnumerable<Integer> source = AsyncEnumerable.range(1, 10);
    AsyncEnumerable<String> strings = AsyncEnumerable.fromArray("a", "b", "c", "d");

AsyncEnumerable<T> is a deferred cold source, which can be synchronous or asynchronous. Its enumerator() method has to be called to receive another interface, hu.akarnokd.asyncenum.AsyncEnumerator, to be "iterated" over.

    AsyncEnumerator<Integer> enumerator = source.enumerator();

The AsyncEnumerator<T> defines two methods, moveNext() and current(). Calling moveNext() instructs the source to produce the next value, but instead of returning false or true immediately, the method returns a java.util.concurrent.CompletionStage<Boolean> that is completed with true if a value is ready and false if no more values are to be expected. In the true case, one can read the current value via current().
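To make the moveNext()/current() contract concrete, here is a minimal, self-contained sketch of the producer side. It does not use the library: the Enumerator interface and RangeEnumerator class below are hypothetical stand-ins that only mirror the two methods described above, and the source is assumed to be purely synchronous.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class ContractSketch {

    // Hypothetical stand-in for AsyncEnumerator<T>; not the library's source.
    interface Enumerator<T> {
        CompletionStage<Boolean> moveNext();
        T current();
    }

    // Synchronous enumerator over start, start + 1, ..., start + count - 1.
    static final class RangeEnumerator implements Enumerator<Integer> {
        private final int end;
        private int next;
        private Integer current;

        RangeEnumerator(int start, int count) {
            this.next = start;
            this.end = start + count;
        }

        @Override
        public CompletionStage<Boolean> moveNext() {
            if (next == end) {
                current = null;
                // No more values: complete with false.
                return CompletableFuture.completedFuture(false);
            }
            current = next++;
            // A value is ready: complete with true.
            return CompletableFuture.completedFuture(true);
        }

        @Override
        public Integer current() {
            return current;
        }
    }

    public static void main(String[] args) {
        RangeEnumerator e = new RangeEnumerator(1, 2);
        e.moveNext().thenAccept(has -> System.out.println(has + " " + e.current()));
        e.moveNext().thenAccept(has -> System.out.println(has + " " + e.current()));
        e.moveNext().thenAccept(has -> System.out.println(has));
    }
}
```

Because every stage here is already completed when it is returned, the callbacks run immediately on the calling thread; an asynchronous source would complete the stage later, possibly on another thread.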

(Cancelling a sequence is currently partially supported via the cancel() method on AsyncEnumerator, but it feels too much like Reactive Streams and not like the pre-existing cancellation support in other async-enumerable libraries.)

    CompletionStage<Boolean> stage = enumerator.moveNext();

    stage.whenComplete((hasValue, error) -> {
        if (error != null) {
            error.printStackTrace();
            return;
        }
        if (hasValue) {
            System.out.println(enumerator.current());
        } else {
            System.out.println("Empty source!");
        }
    });

Note that calling moveNext() or current() before the CompletionStage has terminated is undefined behavior. Calling moveNext() after the previous CompletionStage completed with false or with an exception is also undefined behavior.

Therefore, consuming multiple values via a plain for loop doesn't work; one has to call moveNext() when the previous CompletionStage completed with true, in a recursive-looking pattern. Since some AsyncEnumerable chains can be synchronous, this leads to StackOverflowError if not handled properly.

For this purpose, the forEach() instance method on AsyncEnumerable is available, but given an AsyncEnumerator, the following consumption pattern can be employed:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.BiConsumer;

    // The AtomicInteger base class serves as the work-in-progress counter.
    final class EnumeratorConsumer<T> extends AtomicInteger
            implements BiConsumer<Boolean, Throwable> {

        final AsyncEnumerator<T> enumerator;

        public EnumeratorConsumer(AsyncEnumerator<T> enumerator) {
            this.enumerator = enumerator;
        }

        @Override
        public void accept(Boolean hasNext, Throwable error) {
            if (error != null) {
                // handle error case
                return;
            }
            if (hasNext) {
                T value = enumerator.current();
                // handle current value
                moveNext();
            } else {
                // handle no more values
            }
        }

        public void moveNext() {
            // Only the caller that moves the counter from 0 to 1 runs the loop;
            // reentrant calls just bump the counter and return.
            if (getAndIncrement() == 0) {
                do {
                    enumerator.moveNext().whenComplete(this);
                } while (decrementAndGet() != 0);
            }
        }
    }

    new EnumeratorConsumer<>(source.enumerator()).moveNext();

This is practically the same queue-drain or trampolining logic used throughout RxJava. It is recommended, though, to use the combinators and operators of AsyncEnumerable instead, as working with a sequence of CompletionStage continuations is complicated, especially when multiple active sequences are involved.
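The trampolining pattern can be exercised end to end without the library. The sketch below is a self-contained illustration under stated assumptions: Enumerator and countTo are hypothetical stand-ins for AsyncEnumerator and a synchronous source, and SumConsumer is one possible way to fill in the "handle" comments by publishing the total through a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

final class TrampolineSketch {

    // Hypothetical stand-in for AsyncEnumerator<T>; not the library's source.
    interface Enumerator<T> {
        CompletionStage<Boolean> moveNext();
        T current();
    }

    // Hypothetical synchronous source that emits 1, 2, ..., count.
    static Enumerator<Integer> countTo(int count) {
        return new Enumerator<Integer>() {
            int value;

            @Override
            public CompletionStage<Boolean> moveNext() {
                return CompletableFuture.completedFuture(++value <= count);
            }

            @Override
            public Integer current() {
                return value;
            }
        };
    }

    // Sums all items; the AtomicInteger base class is the work-in-progress counter.
    static final class SumConsumer extends AtomicInteger
            implements BiConsumer<Boolean, Throwable> {

        final Enumerator<Integer> enumerator;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        long sum;

        SumConsumer(Enumerator<Integer> enumerator) {
            this.enumerator = enumerator;
        }

        @Override
        public void accept(Boolean hasNext, Throwable error) {
            if (error != null) {
                result.completeExceptionally(error);
                return;
            }
            if (hasNext) {
                sum += enumerator.current();
                moveNext(); // reentrant call: only increments the counter
            } else {
                result.complete(sum);
            }
        }

        void moveNext() {
            if (getAndIncrement() == 0) {
                do {
                    enumerator.moveNext().whenComplete(this);
                } while (decrementAndGet() != 0);
            }
        }
    }

    static long sum(int count) {
        SumConsumer consumer = new SumConsumer(countTo(count));
        consumer.moveNext();
        return consumer.result.join();
    }

    public static void main(String[] args) {
        System.out.println(sum(1_000_000)); // prints 500000500000
    }
}
```

Each synchronous completion re-enters moveNext() only to increment the counter, so the do-while loop, not the call stack, drives the iteration, and the stack depth stays constant regardless of how many items the source emits.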
