@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
20
20
channel uuid.UUID
21
21
logger slog.Logger
22
22
23
+ // background measurement members
23
24
collections atomic.Int64
24
25
last atomic.Value
26
+ asyncTick * time.Ticker
27
+ stop chan struct {}
25
28
}
26
29
27
30
type LatencyMeasurement struct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
36
39
return & LatencyMeasurer {
37
40
channel :uuid .New (),
38
41
logger :logger ,
42
+ stop :make (chan struct {},1 ),
39
43
}
40
44
}
41
45
@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
47
51
)
48
52
49
53
msg := []byte (uuid .New ().String ())
54
+ lm .logger .Debug (ctx ,"performing measurement" ,slog .F ("msg" ,msg ))
50
55
51
56
cancel ,err := p .Subscribe (lm .latencyChannelName (),func (ctx context.Context ,in []byte ) {
52
57
if ! bytes .Equal (in ,msg ) {
@@ -81,23 +86,32 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
81
86
// MeasureAsync runs latency measurements asynchronously on a given interval.
82
87
// This function is expected to be run in a goroutine and will exit when the context is canceled.
83
88
func (lm * LatencyMeasurer )MeasureAsync (ctx context.Context ,p Pubsub ,interval time.Duration ) {
84
- tick := time .NewTicker (interval )
85
- defer tick .Stop ()
86
-
87
- for ;true ;<- tick .C {// tick immediately
88
- select {
89
- case <- ctx .Done ():
89
+ lm .asyncTick = time .NewTicker (interval )
90
+ defer lm .asyncTick .Stop ()
91
+
92
+ loop:
93
+ for {
94
+ // run immediately on first call, then sleep a tick before each invocation
95
+ if p == nil {
96
+ lm .logger .Error (ctx ,"given pubsub is nil" )
90
97
return
91
- default :
92
- if p == nil {
93
- lm .logger .Error (ctx ,"given pubsub is nil" )
94
- return
95
- }
96
98
}
97
99
98
100
lm .collections .Add (1 )
99
101
measure := lm .Measure (ctx ,p )
100
102
lm .last .Store (& measure )
103
+
104
+ select {
105
+ case <- lm .asyncTick .C :
106
+ continue
107
+
108
+ // bail out if signaled
109
+ case <- lm .stop :
110
+ break loop
111
+ case <- ctx .Done ():
112
+ lm .logger .Debug (ctx ,"async measurement context canceled" ,slog .Error (ctx .Err ()))
113
+ break loop
114
+ }
101
115
}
102
116
}
103
117
@@ -115,6 +129,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115
129
return lm .collections .Load ()
116
130
}
117
131
132
+ // Stop stops any background measurements.
133
+ func (lm * LatencyMeasurer )Stop () {
134
+ if lm .asyncTick == nil {
135
+ return
136
+ }
137
+ lm .asyncTick .Stop ()
138
+ lm .stop <- struct {}{}
139
+ }
140
+
118
141
func (lm * LatencyMeasurer )latencyChannelName ()string {
119
142
return fmt .Sprintf ("latency-measure:%s" ,lm .channel )
120
143
}