Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
/zioPublic

Can I feed items into ZSink manually?#7195

Answeredbypvgoran
pvgoran asked this question inQ&A
Discussion options

I have a task combinator that repeatedly executes the task, retries if it fails, and reports the execution status to the provided ZSink. The simplified code (without retrying, for brevity):

  def repeatAndFeedStatus(    initTask : Task[Unit], repeatTask : Task[Unit], repeatDelay : Duration, retrySchedule : Schedule[Any, Throwable, Unit],    statusSink : Sink[Throwable, RunStatus, Nothing, Unit]) : Task[Nothing] =  {    val repeatSchedule : Schedule[Any, Unit, ?] = Schedule.fixed(repeatDelay)    for {      statusQueue : Queue[RunStatus] <- Queue.bounded(1)      runLoop : Task[Nothing] =        initTask        *> (repeatTask.tap { _ => statusQueue.offer(RunStatus.Running) })          .repeat(repeatSchedule)        *> ZIO.fail(IllegalStateException("Repeat isn't expected to complete"))      result <-        (statusQueue.offer(RunStatus.Starting) *> runLoop)        <& ZStream.fromQueue(statusQueue).run(statusSink)    } yield result  }

I have some troubles with the "report" part. As you can see, I use aQueue to emitRunStatus values, and this Queue is then streamed tostatusSink in parallel with the main computation. The caller definesstatusSink like this:

  val statusSink : Sink[Throwable, RunStatus, Nothing, Unit] =    ZSink.foreach(status => zio.Console.printLine("Status: " + status))

This works, but theprintLine computation above executes out-of-sequence with therepeatTask computation (the latter can begin before the former completes). This is expected due to parallelism, but I'd like sequential execution, so that theRunStatus value is first consumed bystatusSink, and thenrepeatTask is executed.

I would (probably) be able to achieve this if I could feedRunStatus values intostatusSink directly, without running the auxuliaryStream. However, I don't see a way to do this; is it possible?

Also, it's entirely possible that I'm trying to use inappropriate abstractions for the thing I'm trying to accomplish.

For example, I might have used astatusTask : Task[RunStatus] instead ofstatusSink, and just run this computation on each state change; however, I feel thatZSink better captures the intent: it's a "receiver of stream of events", which is exactly what I need, and aTask is just a generic "thing to execute", which doesn't capture the intent of repetition and sequentiality.

Also, I could try toreturn aZStream ofRunStatus values instead offeeding a providedZSink, but (1) it looks less flexible, and (2) I don't see a natural way to convert the composite task created inrepeatAndFeedStatus to a stream.

(This was alsoposted on StackOverflow. Let me know if such cross-posting is inappropriate.)

You must be logged in to vote

So apparently the answer to the question "Can I feed items into ZSink manually?" is "No".

However, thanks to@adamgfraser'ssuggestion, I was able to implement therepeatAndFeedStatus function via streams, seehere.

After some more thought, it occured to me that I actually wanted some kind of coroutine mechanics, using a sink as "something to give intermediate results to". I'll write about this separately.

Replies: 3 comments 10 replies

Comment options

If you want to run the sink before repeating you could do something likestatusQueue.offer(RunStatus.Starting) *> ZStream.fromQueue(statusQueue).run(statusSink) *> runLoop. I'm not sure how much this example is simplified from your actual use case but the sink doesn't seem to be doing a lot of work here and I wonder if you want to use it all versus a simple ZIO workflow or function returning a ZIO workflow.

You must be logged in to vote
7 replies
@pvgoran
Comment options

Signaling from the sink implementation is an unnecessary complication that it (the sink) shouldn't have to deal with. For now, I managed to serialize running of the sink and running ofrepeatTask insiderepeatAndFeedStatus with the help of anMVar and aSemaphore; it's not elegant (and possibly not performant), but it gets the job done in an encapsulated way (this implementation detail is nicely hidden insiderepeatAndFeedStatus).

I already considered using a ZIO instead of a sink (it's in the original question), but as I stated, if I do this I lose the semantics of repetition and sequentiality that streams/sinks have. So it's not an optimal solution in terms of expressiveness.

So, back to the original question: is it not possible to feed items into aZSink manually?

@adamgfraser
Comment options

You can feed items to a sink manually by offering them to a queue.ZStream.fromQueue(queue).run(sink).fork and thenqueue.offer.

@pvgoran
Comment options

You can feed items to a sink manually by offering them to a queue.ZStream.fromQueue(queue).run(sink).fork and thenqueue.offer.

Well, that's what I'm already doing, but it doesn't look manual to me because it's the stream that does the actual feeding, in its own fiber. (I suppose I should have written "directly" instead of "manually".) And it doesn't have the properties that I was hoping for. :(

Anyway, thanks for your input.

@adamgfraser
Comment options

Yeah, I think you are looking for this further property that writing to the sink doesn't return until the sink has "completed" processing that element and the most that could be guaranteed is that the sink has "started" processing the element. I feel like this problem could be more easily solved withZIO or possibly withSchedule.

@pvgoran
Comment options

Yeah, I think you are looking for this further property that writing to the sink doesn't return until the sink has "completed" processing that element and the most that could be guaranteed is that the sink has "started" processing the element.

Yes, this may be the case. I did further testing of the solution that I posted in my answer, and when I add a delay to the sink implementation, it doesnot currently work like I want it to. Maybe your other answer will lead to a better result.

Comment options

Isn't this just something like (working off your original example)?

importzio._importzio.stream._objectExample {sealedtraitRunStatusobjectRunStatus {caseobjectRunningextendsRunStatuscaseobjectStartingextendsRunStatus  }defrepeatAndFeedStatus(initTask:Task[Unit],repeatTask:Task[Unit],repeatDelay:Duration,retrySchedule:Schedule[Any,Throwable,Unit],statusSink:Sink[Throwable,RunStatus,Nothing,Unit]  ):Task[Unit]= {valrepeatSchedule:Schedule[Any,Any,Any]=Schedule.fixed(repeatDelay)valstream:ZStream[Any,Throwable,RunStatus]=ZStream.succeed(RunStatus.Starting)++ZStream.repeatZIOWithSchedule(initTask*> repeatTask*>ZIO.succeed(RunStatus.Running), repeatSchedule)++ZStream.fail(newIllegalStateException("Repeat isn't expected to complete"))    stream.run(statusSink)  }}

I think the fundamental problem is you are using a streaming data type here with a sink and that is going to work best if you use it with other streaming data types or if you don't want to do that just useZIO.

You must be logged in to vote
2 replies
@pvgoran
Comment options

So it's using aStream all the way... I wasn't sure this was possible (and I'm still not sure that I can make a version that also does error handling/retrying in addition to normal repetition) and I'm not familiar with the way streams are composed, but it may be worth a try.

@pvgoran
Comment options

I managed to implement the full functionality of the function via streams, as suggested:

defrepeatAndFeedStatus2(initTask :Task[Unit],repeatTask :Task[Unit],repeatDelay :Duration,retrySchedule :Schedule[Any,Throwable,Unit],statusSink :Sink[Throwable,RunStatus,Nothing,Nothing]):Task[Nothing]={valrepeatSchedule:Schedule[Any,Any,?]=Schedule.fixed(repeatDelay)Ref.make(false).flatMap { (failedRef :Ref[Boolean])=>valloopStream:ZStream[Any,Throwable,RunStatus]=ZStream.whenZIO(failedRef.get) {ZStream.succeed(RunStatus.Recovering) }++ZStream.fromZIO(initTask).drain++ZStream.repeatZIOWithSchedule(repeatTask*>ZIO.succeed(RunStatus.Running), repeatSchedule)++ZStream.fail(IllegalStateException("Repeat isn't expected to complete"))valloopWithRetryStream:ZStream[Any,Throwable,RunStatus]=      loopStream        .either        .flatMap[Any,Throwable,RunStatus] {caseRight(runStatus)=>ZStream.succeed(runStatus)caseLeft(ex)=>ZStream.fromZIO(failedRef.set(true)).drain++ZStream.succeed(RunStatus.Failing(ex,Instant.now()))++ZStream.fail(ex)        }        .retry(retrySchedule)    (ZStream.succeed(RunStatus.Starting)++ loopWithRetryStream).run(statusSink)  }}

This seems to be a satisfactory solution to the specific problem that I faced. It doesn't use additional state or parallelism, which is definitely a win. Error handling is unwieldy, but not too much.

Comment options

So apparently the answer to the question "Can I feed items into ZSink manually?" is "No".

However, thanks to@adamgfraser'ssuggestion, I was able to implement therepeatAndFeedStatus function via streams, seehere.

After some more thought, it occured to me that I actually wanted some kind of coroutine mechanics, using a sink as "something to give intermediate results to". I'll write about this separately.

You must be logged in to vote
1 reply
@pvgoran
Comment options

Here are my thoughts on coroutines:#7199

Answer selected bypvgoran
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment
Category
Q&A
Labels
None yet
2 participants
@pvgoran@adamgfraser

[8]ページ先頭

©2009-2025 Movatter.jp