@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
20
20
channel uuid.UUID
21
21
logger slog.Logger
22
22
23
+ // background measurement members
23
24
collections atomic.Int64
24
25
last atomic.Value
26
+ asyncTick * time.Ticker
27
+ stop chan struct {}
25
28
}
26
29
27
30
type LatencyMeasurement struct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
36
39
return & LatencyMeasurer {
37
40
channel :uuid .New (),
38
41
logger :logger ,
42
+ stop :make (chan struct {},1 ),
39
43
}
40
44
}
41
45
@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
47
51
)
48
52
49
53
msg := []byte (uuid .New ().String ())
54
+ lm .logger .Debug (ctx ,"performing measurement" ,slog .F ("msg" ,msg ))
50
55
51
56
cancel ,err := p .Subscribe (lm .latencyChannelName (),func (ctx context.Context ,in []byte ) {
52
57
if ! bytes .Equal (in ,msg ) {
@@ -81,23 +86,31 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
81
86
// MeasureAsync runs latency measurements asynchronously on a given interval.
82
87
// This function is expected to be run in a goroutine and will exit when the context is canceled.
83
88
func (lm * LatencyMeasurer )MeasureAsync (ctx context.Context ,p Pubsub ,interval time.Duration ) {
84
- tick : =time .NewTicker (interval )
85
- defer tick .Stop ()
89
+ lm . asyncTick = time .NewTicker (interval )
90
+ defer lm . asyncTick .Stop ()
86
91
87
- for ;true ;<- tick .C {// tick immediately
88
- select {
89
- case <- ctx .Done ():
92
+ for {
93
+ // run immediately on first call, then sleep a tick before each invocation
94
+ if p == nil {
95
+ lm .logger .Error (ctx ,"given pubsub is nil" )
90
96
return
91
- default :
92
- if p == nil {
93
- lm .logger .Error (ctx ,"given pubsub is nil" )
94
- return
95
- }
96
97
}
97
98
98
99
lm .collections .Add (1 )
99
100
measure := lm .Measure (ctx ,p )
100
101
lm .last .Store (& measure )
102
+
103
+ select {
104
+ case <- lm .asyncTick .C :
105
+ continue
106
+
107
+ // bail out if signaled
108
+ case <- lm .stop :
109
+ return
110
+ case <- ctx .Done ():
111
+ lm .logger .Debug (ctx ,"async measurement context canceled" ,slog .Error (ctx .Err ()))
112
+ return
113
+ }
101
114
}
102
115
}
103
116
@@ -115,6 +128,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115
128
return lm .collections .Load ()
116
129
}
117
130
131
+ // Stop stops any background measurements.
132
+ func (lm * LatencyMeasurer )Stop () {
133
+ if lm .asyncTick == nil {
134
+ return
135
+ }
136
+ lm .asyncTick .Stop ()
137
+ lm .stop <- struct {}{}
138
+ }
139
+
118
140
func (lm * LatencyMeasurer )latencyChannelName ()string {
119
141
return fmt .Sprintf ("latency-measure:%s" ,lm .channel )
120
142
}