@@ -3,6 +3,7 @@ package pubsub
3
3
import (
4
4
"bytes"
5
5
"context"
6
+ "errors"
6
7
"fmt"
7
8
"sync/atomic"
8
9
"time"
@@ -23,8 +24,7 @@ type LatencyMeasurer struct {
23
24
// background measurement members
24
25
collections atomic.Int64
25
26
last atomic.Value
26
- asyncTick * time.Ticker
27
- stop chan struct {}
27
+ asyncCancel context.CancelCauseFunc
28
28
}
29
29
30
30
type LatencyMeasurement struct {
@@ -39,7 +39,6 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
39
39
return & LatencyMeasurer {
40
40
channel :uuid .New (),
41
41
logger :logger ,
42
- stop :make (chan struct {},1 ),
43
42
}
44
43
}
45
44
@@ -86,8 +85,11 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
86
85
// MeasureAsync runs latency measurements asynchronously on a given interval.
87
86
// This function is expected to be run in a goroutine and will exit when the context is canceled.
88
87
func (lm * LatencyMeasurer )MeasureAsync (ctx context.Context ,p Pubsub ,interval time.Duration ) {
89
- lm .asyncTick = time .NewTicker (interval )
90
- defer lm .asyncTick .Stop ()
88
+ tick := time .NewTicker (interval )
89
+ defer tick .Stop ()
90
+
91
+ ctx ,cancel := context .WithCancelCause (ctx )
92
+ lm .asyncCancel = cancel
91
93
92
94
for {
93
95
// run immediately on first call, then sleep a tick before each invocation
@@ -101,14 +103,12 @@ func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval
101
103
lm .last .Store (& measure )
102
104
103
105
select {
104
- case <- lm . asyncTick .C :
106
+ case <- tick .C :
105
107
continue
106
108
107
109
// bail out if signaled
108
- case <- lm .stop :
109
- return
110
110
case <- ctx .Done ():
111
- lm .logger .Debug (ctx ,"async measurementcontext canceled " ,slog .Error (ctx .Err ()))
111
+ lm .logger .Debug (ctx ,"async measurementcancelled " ,slog .Error (ctx .Err ()))
112
112
return
113
113
}
114
114
}
@@ -130,11 +130,9 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
130
130
131
131
// Stop stops any background measurements.
132
132
func (lm * LatencyMeasurer )Stop () {
133
- if lm .asyncTick = =nil {
134
- return
133
+ if lm .asyncCancel ! =nil {
134
+ lm . asyncCancel ( errors . New ( "stopped" ))
135
135
}
136
- lm .asyncTick .Stop ()
137
- lm .stop <- struct {}{}
138
136
}
139
137
140
138
func (lm * LatencyMeasurer )latencyChannelName ()string {