Extensions to the Kotlin Flow library.
akarnokd/kotlin-flow-extensions
```groovy
dependencies {
    implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.14"
}
```

Table of contents
- Hot Flows
- Sources
  - `range`
  - `timer`
  - `concatArrayEager`
- Intermediate Flow operators (`FlowExtensions`)
  - `Flow.concatWith`
  - `Flow.groupBy`
  - `Flow.parallel`
  - `Flow.publish`
  - `Flow.replay`
  - `Flow.startCollectOn`
  - `Flow.takeUntil`
  - `Flow.onBackpressureDrop`
  - `Flow.flatMapDrop`
  - `Flow.concatMapEager`
  - `Flow.amb`
- `ParallelFlow` operators (`FlowExtensions`)
  - `ParallelFlow.concatMap`
  - `ParallelFlow.filter`
  - `ParallelFlow.map`
  - `ParallelFlow.reduce`
  - `ParallelFlow.sequential`
  - `ParallelFlow.transform`
- `ConnectableFlow`
`PublishSubject` multicasts values to one or more flow collectors in a coordinated fashion, awaiting each collector to be ready to receive the next item or termination.
```kotlin
import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val publishSubject = PublishSubject<Int>()

    val job = launch(Dispatchers.IO) {
        publishSubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!publishSubject.hasCollectors()) {
        delay(1)
    }

    publishSubject.emit(1)
    publishSubject.complete()

    job.join()
}
```
`ReplaySubject` caches and replays some or all items to collectors. Constructors for size-bound, time-bound, and both size-and-time-bound replays are available. An additional constructor taking a `TimeUnit -> Long` function allows virtualizing the progression of time for testing purposes.
```kotlin
import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val replaySubject = ReplaySubject<Int>()

    val job = launch(Dispatchers.IO) {
        replaySubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!replaySubject.hasCollectors()) {
        delay(1)
    }

    replaySubject.emit(1)
    replaySubject.emit(2)
    replaySubject.emit(3)
    replaySubject.complete()

    job.join()

    // a late collector still receives the cached items
    replaySubject.collect {
        println(it)
    }
    println("Done 2")
}
```
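The size-bound variant mentioned above can be sketched as follows. This is a minimal sketch under an assumption: that the size-bound constructor takes the maximum number of items to retain as a single `Int` argument. Check the class's constructors before relying on it. With a bound of 2, a collector that arrives after completion should only see the most recent two items.

```kotlin
import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

fun main() = runBlocking {
    // Assumption: ReplaySubject<Int>(2) is the size-bound constructor (keep at most 2 items).
    val replaySubject = ReplaySubject<Int>(2)

    // No collector is attached yet, so the items only go into the replay buffer.
    replaySubject.emit(1)
    replaySubject.emit(2)
    replaySubject.emit(3)
    replaySubject.complete()

    // A late collector receives whatever the buffer retained, then the completion.
    val received = mutableListOf<Int>()
    replaySubject.collect { received.add(it) }

    println(received)
}
```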
`BehaviorSubject` caches the last item received and multicasts it and subsequent items (continuously) to collectors, awaiting each collector to be ready to receive the next item or termination. An initial value to be sent to fresh collectors can be set via a constructor.
```kotlin
import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val behaviorSubject = BehaviorSubject<Int>()
    behaviorSubject.emit(1)

    // OR
    // val behaviorSubject = BehaviorSubject<Int>(1)

    val job = launch(Dispatchers.IO) {
        behaviorSubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!behaviorSubject.hasCollectors()) {
        delay(1)
    }

    behaviorSubject.emit(2)
    behaviorSubject.emit(3)
    behaviorSubject.complete()

    job.join()
}
```
`Flow.flatMapDrop` maps the upstream value into a `Flow` and relays its items while ignoring further upstream items until the current inner `Flow` completes.
```kotlin
import hu.akarnokd.kotlin.flow.*

range(1, 10)
    .map {
        delay(100)
        it
    }
    .flatMapDrop {
        range(it * 100, 5)
            .map {
                delay(30)
                it
            }
    }
    .assertResult(
        100, 101, 102, 103, 104,
        300, 301, 302, 303, 304,
        500, 501, 502, 503, 504,
        700, 701, 702, 703, 704,
        900, 901, 902, 903, 904
    )
```
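Why only the odd upstream values survive in this example can be checked with a small timing model. The sketch below is not the library's code; it assumes an idealized timeline in which upstream item `k` arrives at `k * 100` ms and each inner `Flow` keeps the operator busy for `5 * 30 = 150` ms. The function `acceptedUpstreamItems` and its parameters are hypothetical names introduced for illustration.

```kotlin
// Simplified timing model (an assumption for illustration, not the library's implementation):
// an upstream item is accepted only if it arrives when no inner Flow is still running.
fun acceptedUpstreamItems(
    upstreamCount: Int,
    upstreamPeriodMs: Long,
    innerCount: Int,
    innerPeriodMs: Long
): List<Int> {
    val accepted = mutableListOf<Int>()
    var busyUntil = 0L // time at which the current inner Flow finishes
    for (k in 1..upstreamCount) {
        val arrival = k * upstreamPeriodMs
        if (arrival >= busyUntil) {
            accepted += k
            busyUntil = arrival + innerCount * innerPeriodMs
        }
        // otherwise the item arrives while an inner Flow is active and is dropped
    }
    return accepted
}

fun main() {
    val accepted = acceptedUpstreamItems(10, 100, 5, 30)
    println(accepted) // [1, 3, 5, 7, 9]

    // Expand each accepted value the same way the example's inner range(it * 100, 5) does.
    println(accepted.flatMap { k -> (0 until 5).map { k * 100 + it } })
}
```

Item 2 arrives at 200 ms while the inner `Flow` started by item 1 runs until 250 ms, so it is dropped; item 3 arrives at 300 ms and is accepted, and the pattern repeats.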
`Flow.publish` shares a single connection to the upstream source, which can be consumed by many collectors inside a `transform` function; the function then yields the resulting items for the downstream.

Effectively, one collector to the output `Flow<R>` will trigger exactly one collection of the upstream `Flow<T>`. Inside the `transformer` function, though, the presented `Flow<T>` can be collected as many times as needed; it won't trigger new collections towards the upstream but will share items with all inner collectors as they become available.

Unfortunately, the suspending nature of coroutines/`Flow` doesn't give a clear indication of when the `transformer` chain has been properly established, which can result in item loss or run-to-completion without any item being collected. If the number of inner collectors inside `transformer` is known, the `publish(expectedCollectors)` overload can be used to hold back the upstream until the expected number of collectors are ready to collect items.
```kotlin
range(1, 5)
    .publish(2) { shared ->
        merge(
            shared.filter { it % 2 == 0 },
            shared.filter { it % 2 != 0 }
        )
    }
    .assertResult(1, 2, 3, 4, 5)
```
In the example, it is known that `merge` will establish 2 collectors, thus `publish` can be instructed to await those 2. Without the argument, `range` would rush through its items as `merge` doesn't start collecting in time, causing an empty result list.
`UnicastSubject` buffers items until a single collector starts collecting items. Use `collectorCancelled` to detect when the collector no longer wants to collect items.

Note that the subject uses an unbounded inner buffer and does not suspend its input side if the collector never arrives or can't keep up.
```kotlin
// assumes the code runs inside a coroutine scope, e.g. runBlocking { ... }
val us = UnicastSubject<Int>()

launch(Dispatchers.IO) {
    for (i in 1..200) {
        println("Emitting $i")
        us.emit(i)
        delay(1)
    }
    us.complete()
}

// collector arrives late for some reason
delay(100)

us.collect { println("Collecting $it") }
```
`UnicastWorkSubject` buffers items until, and in between, a single collector is able to collect items. If the current collector cancels, the next collector will receive the subsequent items.

Note that the subject uses an unbounded inner buffer and does not suspend its input side if the collector never arrives or can't keep up.
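```kotlin
val uws = UnicastWorkSubject<Int>()

// generateInts is a helper (not shown here) that emits the integers 1..15 into the subject
generateInts(uws, 1, 15)

// prints lines 1..5
uws.take(5).collect { println(it) }

// prints lines 6..10
uws.take(5).collect { println(it) }

// prints lines 11..15
uws.take(5).collect { println(it) }
```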
`concatArrayEager` launches all sources at once and emits all items from one source before items of the next are emitted.

For example, given two sources, if the first is slow, the items of the second won't be emitted until the first has finished emitting its items. This operator allows all sources to generate items in parallel but still emits those items in the order their respective `Flow`s are listed.

Note that each source is consumed in an unbounded manner and thus, depending on the speed of the current source and the collector, the operator may retain items longer and may use more memory during its execution.
```kotlin
concatArrayEager(
    range(1, 5).onStart { delay(200) },
    range(6, 5)
)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```
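The eager-concatenation idea can be sketched with plain `kotlinx.coroutines` primitives. This is an illustration of the technique, not the library's implementation: `eagerConcatSketch` is a hypothetical helper that starts every source immediately, buffers each one in its own unbounded channel, and drains the channels in listing order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper for illustration only; not part of kotlin-flow-extensions.
fun <T> eagerConcatSketch(vararg sources: Flow<T>): Flow<T> = flow {
    coroutineScope {
        // Start collecting every source right away, each into its own unbounded buffer.
        val channels = sources.map { source ->
            val channel = Channel<T>(Channel.UNLIMITED)
            launch {
                try {
                    source.collect { channel.send(it) }
                    channel.close()
                } catch (e: Throwable) {
                    channel.close(e) // propagate the failure to the draining loop
                    throw e
                }
            }
            channel
        }
        // Drain the buffers strictly in the order the sources were listed.
        for (channel in channels) {
            for (item in channel) {
                emit(item)
            }
        }
    }
}

fun main() = runBlocking {
    val result = eagerConcatSketch(
        (1..5).asFlow().onStart { delay(200) }, // slow first source
        (6..10).asFlow()                        // fast second source, buffered meanwhile
    ).toList()
    println(result) // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```

The unbounded channels make the memory trade-off from the note above visible: the second source finishes almost immediately, and its items sit in the buffer until the first source completes.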
`Flow.concatMapEager` maps the upstream values into `Flow`s and launches them all at once, then emits all items from one source before items of the next are emitted.

For example, given two inner sources, if the first is slow, the items of the second won't be emitted until the first has finished emitting its items. This operator allows all sources to generate items in parallel but still emits those items in the order their respective `Flow`s are mapped in.

Note that the upstream and each source are consumed in an unbounded manner and thus, depending on the speed of the current source and the collector, the operator may retain items longer and may use more memory during its execution.
```kotlin
range(1, 5)
    .concatMapEager {
        range(it * 10, 5).onEach { delay(100) }
    }
    .assertResult(
        10, 11, 12, 13, 14,
        20, 21, 22, 23, 24,
        30, 31, 32, 33, 34,
        40, 41, 42, 43, 44,
        50, 51, 52, 53, 54
    )
```
`amb` starts collecting all source `Flow`s and relays the items of the first one to emit an item, cancelling the rest.
```kotlin
amb(
    range(1, 5).onStart { delay(1000) },
    range(6, 5).onStart { delay(100) }
)
.assertResult(6, 7, 8, 9, 10)
```
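The "first to emit wins" rule can be sketched with `channelFlow` from `kotlinx.coroutines`. This is an illustrative sketch, not the library's implementation: `ambSketch` is a hypothetical helper that records the index of the first source to emit in an `AtomicInteger` and cancels the other collectors.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for illustration only; not part of kotlin-flow-extensions.
fun <T> ambSketch(vararg sources: Flow<T>): Flow<T> = channelFlow {
    val winner = AtomicInteger(-1) // index of the first source to emit, -1 while undecided
    val jobs = mutableListOf<Job>()

    sources.forEachIndexed { index, source ->
        // Created lazily so the job list is complete before any collector runs.
        jobs += launch(start = CoroutineStart.LAZY) {
            source.collect { item ->
                if (winner.compareAndSet(-1, index)) {
                    // This source emitted first: cancel every other collector.
                    jobs.forEachIndexed { other, job -> if (other != index) job.cancel() }
                }
                // Only the winning source is relayed downstream.
                if (winner.get() == index) send(item)
            }
        }
    }
    jobs.forEach { it.start() }
}

fun main() = runBlocking {
    val result = ambSketch(
        (1..5).asFlow().onStart { delay(1000) },
        (6..10).asFlow().onStart { delay(100) }
    ).toList()
    println(result) // [6, 7, 8, 9, 10]
}
```

Because the slower collector is cancelled while it is still waiting in `delay(1000)`, the resulting flow completes as soon as the winning source does.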