Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbf89a2d

Browse files
committed
Stop async measurements on pubsub close
Refactor async measurement for immediate exit upon signalSigned-off-by: Danny Kopping <danny@coder.com>
1 parent722a233 commitbf89a2d

File tree

2 files changed

+36
-11
lines changed

2 files changed

+36
-11
lines changed

‎coderd/database/pubsub/latency.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
2020
channel uuid.UUID
2121
logger slog.Logger
2222

23+
// background measurement members
2324
collections atomic.Int64
2425
last atomic.Value
26+
asyncTick*time.Ticker
27+
stopchanstruct{}
2528
}
2629

2730
typeLatencyMeasurementstruct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3639
return&LatencyMeasurer{
3740
channel:uuid.New(),
3841
logger:logger,
42+
stop:make(chanstruct{},1),
3943
}
4044
}
4145

@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
4751
)
4852

4953
msg:= []byte(uuid.New().String())
54+
lm.logger.Debug(ctx,"performing measurement",slog.F("msg",msg))
5055

5156
cancel,err:=p.Subscribe(lm.latencyChannelName(),func(ctx context.Context,in []byte) {
5257
if!bytes.Equal(in,msg) {
@@ -81,23 +86,32 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
8186
// MeasureAsync runs latency measurements asynchronously on a given interval.
8287
// This function is expected to be run in a goroutine and will exit when the context is canceled.
8388
func (lm*LatencyMeasurer)MeasureAsync(ctx context.Context,pPubsub,interval time.Duration) {
84-
tick:=time.NewTicker(interval)
85-
defertick.Stop()
86-
87-
for ;true;<-tick.C {// tick immediately
88-
select {
89-
case<-ctx.Done():
89+
lm.asyncTick=time.NewTicker(interval)
90+
deferlm.asyncTick.Stop()
91+
92+
loop:
93+
for {
94+
// run immediately on first call, then sleep a tick before each invocation
95+
ifp==nil {
96+
lm.logger.Error(ctx,"given pubsub is nil")
9097
return
91-
default:
92-
ifp==nil {
93-
lm.logger.Error(ctx,"given pubsub is nil")
94-
return
95-
}
9698
}
9799

98100
lm.collections.Add(1)
99101
measure:=lm.Measure(ctx,p)
100102
lm.last.Store(&measure)
103+
104+
select {
105+
case<-lm.asyncTick.C:
106+
continue
107+
108+
// bail out if signaled
109+
case<-lm.stop:
110+
break loop
111+
case<-ctx.Done():
112+
lm.logger.Debug(ctx,"async measurement context canceled",slog.Error(ctx.Err()))
113+
break loop
114+
}
101115
}
102116
}
103117

@@ -115,6 +129,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115129
returnlm.collections.Load()
116130
}
117131

132+
// Stop stops any background measurements.
133+
func (lm*LatencyMeasurer)Stop() {
134+
iflm.asyncTick==nil {
135+
return
136+
}
137+
lm.asyncTick.Stop()
138+
lm.stop<-struct{}{}
139+
}
140+
118141
func (lm*LatencyMeasurer)latencyChannelName()string {
119142
returnfmt.Sprintf("latency-measure:%s",lm.channel)
120143
}

‎coderd/database/pubsub/pubsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ func (p *PGPubsub) Close() error {
311311
err:=p.closeListener()
312312
<-p.listenDone
313313
p.logger.Debug(context.Background(),"pubsub closed")
314+
p.latencyMeasurer.Stop()
315+
p.logger.Debug(context.Background(),"background latency measurement has stopped")
314316
returnerr
315317
}
316318

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp