-
I have a task combinator that repeatedly executes the task, retries if it fails, and reports the execution status to the provided ZSink. The simplified code (without retrying, for brevity):
I have some trouble with the "report" part. As you can see, I use a
This works, but the I would (probably) be able to achieve this if I could feed Also, it's entirely possible that I'm trying to use inappropriate abstractions for the thing I'm trying to accomplish. For example, I might have used a Also, I could try to return a (This was also posted on StackOverflow. Let me know if such cross-posting is inappropriate.)
So apparently the answer to the question "Can I feed items into ZSink manually?" is "No".
However, thanks to @adamgfraser's suggestion, I was able to implement the repeatAndFeedStatus function via streams, see here.
After some more thought, it occurred to me that I actually wanted some kind of coroutine mechanics, using a sink as "something to give intermediate results to". I'll write about this separately.
Replies: 3 comments 10 replies
-
If you want to run the sink before repeating you could do something like
-
Signaling from the sink implementation is an unnecessary complication that it (the sink) shouldn't have to deal with. For now, I managed to serialize running of the sink and running of I already considered using a ZIO instead of a sink (it's in the original question), but as I stated, if I do this I lose the semantics of repetition and sequentiality that streams/sinks have. So it's not an optimal solution in terms of expressiveness. So, back to the original question: is it not possible to feed items into a
-
You can feed items to a sink manually by offering them to a queue.
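To make the queue suggestion concrete, here is a minimal sketch assuming ZIO 2. The names `QueueFedSink` and `feedViaQueue` are hypothetical and not from this thread, and wrapping items in `Option` so that `None` marks the end of input is just one possible convention.

```scala
import zio._
import zio.stream._

object QueueFedSink extends ZIOAppDefault {

  // Hypothetical helper: feeds `items` to `sink` through a queue.
  // `None` is an assumed end-of-input marker, not a ZIO convention.
  def feedViaQueue[Z](
    items: List[String],
    sink: ZSink[Any, Nothing, String, Any, Z]
  ): UIO[Z] =
    for {
      queue  <- Queue.unbounded[Option[String]]
      // The stream, running on its own fiber, pulls from the queue
      // and pushes each element into the sink.
      fiber  <- ZStream.fromQueue(queue).collectWhileSome.run(sink).fork
      _      <- ZIO.foreachDiscard(items)(item => queue.offer(Some(item)))
      _      <- queue.offer(None)
      result <- fiber.join
    } yield result

  def run =
    feedViaQueue(List("Starting", "Running"), ZSink.collectAll[String])
      .flatMap(statuses => Console.printLine(statuses.mkString(", ")))
}
```

Note that `queue.offer` only enqueues the item; the sink sees it later, whenever the stream's fiber gets to it.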
-
Well, that's what I'm already doing, but it doesn't look manual to me because it's the stream that does the actual feeding, in its own fiber. (I suppose I should have written "directly" instead of "manually".) And it doesn't have the properties that I was hoping for. :( Anyway, thanks for your input.
-
Yeah, I think you are looking for a further property: that writing to the sink doesn't return until the sink has "completed" processing that element. The most that could be guaranteed is that the sink has "started" processing the element. I feel like this problem could be more easily solved with
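To illustrate the "completed" property, here is a rough sketch, assuming ZIO 2, in which each queued item carries a `Promise` that the consumer completes after processing. `Envelope`, `offerAndAwait`, and `ackingSink` are hypothetical names, and this is exactly the kind of signaling from the sink that was called a complication above, so it shows what the property would cost rather than a recommended design.

```scala
import zio._
import zio.stream._

object AcknowledgedFeed extends ZIOAppDefault {

  // Hypothetical envelope: an item plus a promise completed once it is processed.
  final case class Envelope[A](item: A, done: Promise[Nothing, Unit])

  // Offers an item and suspends until the consumer acknowledges it.
  def offerAndAwait[A](queue: Queue[Envelope[A]], item: A): UIO[Unit] =
    for {
      done <- Promise.make[Nothing, Unit]
      _    <- queue.offer(Envelope(item, done))
      _    <- done.await
    } yield ()

  // Sink that records each processed item and then acknowledges it.
  def ackingSink(log: Ref[Vector[String]]) =
    ZSink.foreach { (e: Envelope[String]) =>
      log.update(_ :+ s"processed ${e.item}") *> e.done.succeed(())
    }

  // Returns the event log so the interleaving can be inspected.
  def demo: UIO[Vector[String]] =
    for {
      log    <- Ref.make(Vector.empty[String])
      queue  <- Queue.unbounded[Envelope[String]]
      fiber  <- ZStream.fromQueue(queue).run(ackingSink(log)).fork
      _      <- offerAndAwait(queue, "Starting") *> log.update(_ :+ "offer of Starting returned")
      _      <- offerAndAwait(queue, "Running") *> log.update(_ :+ "offer of Running returned")
      _      <- fiber.interrupt
      events <- log.get
    } yield events

  def run = demo.flatMap(events => ZIO.foreachDiscard(events)(e => Console.printLine(e)))
}
```

Because the producer waits on the promise, each "offer ... returned" entry can only be logged after the matching "processed ..." entry.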
-
Yes, this may be the case. I did further testing of the solution that I posted in my answer, and when I add a delay to the sink implementation, it does not currently work like I want it to. Maybe your other answer will lead to a better result.
-
Isn't this just something like (working off your original example)?

    import zio._
    import zio.stream._

    object Example {

      sealed trait RunStatus
      object RunStatus {
        case object Running extends RunStatus
        case object Starting extends RunStatus
      }

      def repeatAndFeedStatus(
        initTask: Task[Unit],
        repeatTask: Task[Unit],
        repeatDelay: Duration,
        retrySchedule: Schedule[Any, Throwable, Unit],
        statusSink: Sink[Throwable, RunStatus, Nothing, Unit]
      ): Task[Unit] = {
        val repeatSchedule: Schedule[Any, Any, Any] = Schedule.fixed(repeatDelay)
        val stream: ZStream[Any, Throwable, RunStatus] =
          ZStream.succeed(RunStatus.Starting) ++
            ZStream.repeatZIOWithSchedule(initTask *> repeatTask *> ZIO.succeed(RunStatus.Running), repeatSchedule) ++
            ZStream.fail(new IllegalStateException("Repeat isn't expected to complete"))
        stream.run(statusSink)
      }
    }

I think the fundamental problem is you are using a streaming data type here with a sink and that is going to work best if you use it with other streaming data types or if you don't want to do that just use
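For a quick way to see what a sink receives from such a stream, here is a small sketch assuming ZIO 2. Plain strings stand in for `RunStatus`, `ZIO.succeed("Running")` stands in for the real task, and `BoundedStatusStream` is a hypothetical name; `take(3)` is only there so that the otherwise endless stream finishes.

```scala
import zio._
import zio.stream._

object BoundedStatusStream extends ZIOAppDefault {

  // "Starting" once, then "Running" after every (stand-in) task execution.
  val statuses: ZStream[Any, Nothing, String] =
    ZStream.succeed("Starting") ++
      ZStream.repeatZIOWithSchedule(ZIO.succeed("Running"), Schedule.spaced(10.millis))

  // take(3) bounds the endless stream so that the collecting sink can complete.
  val firstThree: UIO[Chunk[String]] =
    statuses.take(3).run(ZSink.collectAll[String])

  def run = firstThree.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```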
-
So it's using a
-
I managed to implement the full functionality of the function via streams, as suggested:

    def repeatAndFeedStatus2(
      initTask: Task[Unit],
      repeatTask: Task[Unit],
      repeatDelay: Duration,
      retrySchedule: Schedule[Any, Throwable, Unit],
      statusSink: Sink[Throwable, RunStatus, Nothing, Nothing]
    ): Task[Nothing] = {
      val repeatSchedule: Schedule[Any, Any, ?] = Schedule.fixed(repeatDelay)
      Ref.make(false).flatMap { (failedRef: Ref[Boolean]) =>
        val loopStream: ZStream[Any, Throwable, RunStatus] =
          ZStream.whenZIO(failedRef.get) {
            ZStream.succeed(RunStatus.Recovering)
          } ++
            ZStream.fromZIO(initTask).drain ++
            ZStream.repeatZIOWithSchedule(repeatTask *> ZIO.succeed(RunStatus.Running), repeatSchedule) ++
            ZStream.fail(IllegalStateException("Repeat isn't expected to complete"))

        val loopWithRetryStream: ZStream[Any, Throwable, RunStatus] =
          loopStream
            .either
            .flatMap[Any, Throwable, RunStatus] {
              case Right(runStatus) => ZStream.succeed(runStatus)
              case Left(ex) =>
                ZStream.fromZIO(failedRef.set(true)).drain ++
                  ZStream.succeed(RunStatus.Failing(ex, Instant.now())) ++
                  ZStream.fail(ex)
            }
            .retry(retrySchedule)

        (ZStream.succeed(RunStatus.Starting) ++ loopWithRetryStream).run(statusSink)
      }
    }

This seems to be a satisfactory solution to the specific problem that I faced. It doesn't use additional state or parallelism, which is definitely a win. Error handling is unwieldy, but not too much.
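The `either` / `flatMap` / `retry` combination can be tried in isolation with a smaller sketch, assuming ZIO 2. Here plain strings stand in for `RunStatus`, and `flakyTask`, `statusStream`, and `RetryWithStatus` are hypothetical names; the task is rigged to fail only on its first attempt so that the run is finite.

```scala
import zio._
import zio.stream._

object RetryWithStatus extends ZIOAppDefault {

  // Hypothetical task: fails on its first attempt, succeeds afterwards.
  def flakyTask(attempts: Ref[Int]): Task[Unit] =
    attempts.updateAndGet(_ + 1).flatMap { n =>
      if (n == 1) ZIO.fail(new RuntimeException("boom")) else ZIO.unit
    }

  def statusStream(attempts: Ref[Int], failed: Ref[Boolean]): ZStream[Any, Throwable, String] = {
    // One attempt: report recovery if a previous attempt failed, then run the task.
    val loop: ZStream[Any, Throwable, String] =
      ZStream.whenZIO(failed.get)(ZStream.succeed("Recovering")) ++
        ZStream.fromZIO(flakyTask(attempts)).as("Running")

    // Turn a failure into a status element, then fail again so that retry kicks in.
    val withRetry: ZStream[Any, Throwable, String] =
      loop.either
        .flatMap[Any, Throwable, String] {
          case Right(status) => ZStream.succeed(status)
          case Left(ex) =>
            ZStream.fromZIO(failed.set(true)).drain ++
              ZStream.succeed(s"Failing: ${ex.getMessage}") ++
              ZStream.fail(ex)
        }
        .retry(Schedule.recurs(3))

    ZStream.succeed("Starting") ++ withRetry
  }

  def statuses: Task[Chunk[String]] =
    for {
      attempts <- Ref.make(0)
      failed   <- Ref.make(false)
      result   <- statusStream(attempts, failed).run(ZSink.collectAll[String])
    } yield result

  def run = statuses.flatMap(chunk => Console.printLine(chunk.mkString(", ")))
}
```

The failure status is emitted before the stream fails, so the sink still sees it even though the failed attempt is then discarded and restarted by `retry`.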
-
Here are my thoughts on coroutines: #7199