Movatterモバイル変換


[0]ホーム

URL:


Skip to content
DEV Community
Log in Create account

DEV Community

Cover image for Flow is non-blocking but the collector is not
Mahendran
Mahendran

Posted on • Originally published atmahendranv.github.io

     

Flow is non-blocking but the collector is not

Flow is an idiomatic way in kotlin to publish stream of values. While the flow itself suspendable, the collector will block the coroutine from proceeding further. Let's see with an example.


⛲ Flows

Let's create two flows — one finite and an infinite one. Both have a suspension point where they free up the thread 2s/1s. Theflow builder creates a flow from a suspendable block andemits values with given delay.

funfiniteEmissions():Flow<Int>=flow{repeat(3){delay(2000)emit(it+1)}}valrandom=Random(7659)funinfiniteEmissions()=flow{while(true){delay(1000)emit(random.nextInt(10,100))}}
Enter fullscreen modeExit fullscreen mode

🚰 Collecting the flow

When collecting the above flows in the same coroutine, you'll notice theinfiniteEmissions collect statement blocks the next set of statements from running. Whole point of flow is every single element emitted without blocking the current thread. But we're not proceeding further in the coroutine. Why?

funmain(){runBlocking{infiniteEmissions().collect{logger.debug(it)}logger.debug("won't complete")finiteEmissions().collect{logger.debug("Finite: $it")}logger.debug("completed")}}
Enter fullscreen modeExit fullscreen mode
10:23:18.018[main] 8010:23:19.019[main] 3610:23:20.020[main] 3010:23:21.021[main] 1110:23:22.022[main] 1010:23:23.023[main] 36>>> goes on
Enter fullscreen modeExit fullscreen mode

Reason is, the collect statement itself a suspend function and it will naturally block the current coroutine from within it is called.

publicsuspendinlinefun<T>Flow<T>.collect(crossinlineaction:suspend(value:T)->Unit):Unit=collect(object:FlowCollector<T>{overridesuspendfunemit(value:T)=action(value)})
Enter fullscreen modeExit fullscreen mode

So, how do we unblock the rest of the statements or collect the flows in parallel?

As usual — thelaunch coroutine builder. Launch starts a new coroutine without blocking the current one. Putting each collect statements in separate launch builder will unblock each other and collect them simultaneuosly.

runBlocking{launch{infiniteEmissions().collect{logger.debug(it)}logger.debug("won't complete")}launch{finiteEmissions().collect{logger.debug("Finite: $it")}logger.debug("completed")}}
Enter fullscreen modeExit fullscreen mode
10:53:3.003[main] 8010:53:4.004[main] Finite: 110:53:4.004[main] 3610:53:5.005[main] 3010:53:6.006[main] Finite: 210:53:6.006[main] 1110:53:7.007[main] 1010:53:8.008[main] Finite: 310:53:8.008[main] completed10:53:8.008[main] 3610:53:9.009[main] 5510:53:10.010[main] 96>> goes on
Enter fullscreen modeExit fullscreen mode

Now both flows emits values in parallel and notice we're staying on same thread[main].


🙅 Cancelling the infinite flow

Cancelling flow means to cancel the coroutine which collects it. This is applicable for even for a coroutine which doesn't contain a flow-collect.launch builder returns a job which can be cancelled. Callingjob.cancel() will terminate the underlying coroutine. For our example, to cancel the infinite flow once the second flow complete, do like this.

valjob=launch{infiniteEmissions().collect{logger.debug(it)}logger.debug("won't complete")}launch{finiteEmissions().collect{logger.debug("Finite: $it")}logger.debug("completed")job.cancel()}
Enter fullscreen modeExit fullscreen mode
10:54:10.010[main] 8010:54:11.011[main] Finite: 110:54:11.011[main] 3610:54:12.012[main] 3010:54:13.013[main] Finite: 210:54:13.013[main] 1110:54:14.014[main] 1010:54:15.015[main] Finite: 310:54:15.015[main] completed
Enter fullscreen modeExit fullscreen mode

🍬 Wrapup

This article covered collecting two flows in parallel. Like any other coroutine,launch will start them in parallel (not necessarily in new thread) and cancelled individually. To collect the flows in serial manner, no need for any special care — just collecting them from same coroutine would do it.

Kotlin playground

Play around ithere

Top comments(0)

Subscribe
pic
Create template

Templates let you quickly answer FAQs or store snippets for re-use.

Dismiss

Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment'spermalink.

For further actions, you may consider blocking this person and/orreporting abuse

Android / Kotlin / Python / KMP
  • Location
    India
  • Work
    Senior android developer at Omnissa
  • Joined

More fromMahendran

DEV Community

We're a place where coders share, stay up-to-date and grow their careers.

Log in Create account

[8]ページ先頭

©2009-2025 Movatter.jp