Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitff73789

Browse files
committed
Stop async measurements on pubsub close
Refactor async measurement for immediate exit upon signalSigned-off-by: Danny Kopping <danny@coder.com>
1 parent722a233 commitff73789

File tree

2 files changed

+34
-10
lines changed

2 files changed

+34
-10
lines changed

‎coderd/database/pubsub/latency.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ type LatencyMeasurer struct {
2020
channel uuid.UUID
2121
logger slog.Logger
2222

23+
// background measurement members
2324
collections atomic.Int64
2425
last atomic.Value
26+
asyncTick*time.Ticker
27+
stopchanstruct{}
2528
}
2629

2730
typeLatencyMeasurementstruct {
@@ -36,6 +39,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3639
return&LatencyMeasurer{
3740
channel:uuid.New(),
3841
logger:logger,
42+
stop:make(chanstruct{},1),
3943
}
4044
}
4145

@@ -47,6 +51,7 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
4751
)
4852

4953
msg:= []byte(uuid.New().String())
54+
lm.logger.Debug(ctx,"performing measurement",slog.F("msg",msg))
5055

5156
cancel,err:=p.Subscribe(lm.latencyChannelName(),func(ctx context.Context,in []byte) {
5257
if!bytes.Equal(in,msg) {
@@ -81,23 +86,31 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
8186
// MeasureAsync runs latency measurements asynchronously on a given interval.
8287
// This function is expected to be run in a goroutine and will exit when the context is canceled.
8388
func (lm*LatencyMeasurer)MeasureAsync(ctx context.Context,pPubsub,interval time.Duration) {
84-
tick:=time.NewTicker(interval)
85-
defertick.Stop()
89+
lm.asyncTick=time.NewTicker(interval)
90+
deferlm.asyncTick.Stop()
8691

87-
for ;true;<-tick.C {// tick immediately
88-
select {
89-
case<-ctx.Done():
92+
for {
93+
// run immediately on first call, then sleep a tick before each invocation
94+
ifp==nil {
95+
lm.logger.Error(ctx,"given pubsub is nil")
9096
return
91-
default:
92-
ifp==nil {
93-
lm.logger.Error(ctx,"given pubsub is nil")
94-
return
95-
}
9697
}
9798

9899
lm.collections.Add(1)
99100
measure:=lm.Measure(ctx,p)
100101
lm.last.Store(&measure)
102+
103+
select {
104+
case<-lm.asyncTick.C:
105+
continue
106+
107+
// bail out if signaled
108+
case<-lm.stop:
109+
return
110+
case<-ctx.Done():
111+
lm.logger.Debug(ctx,"async measurement context canceled",slog.Error(ctx.Err()))
112+
return
113+
}
101114
}
102115
}
103116

@@ -115,6 +128,15 @@ func (lm *LatencyMeasurer) MeasurementCount() int64 {
115128
returnlm.collections.Load()
116129
}
117130

131+
// Stop stops any background measurements.
132+
func (lm*LatencyMeasurer)Stop() {
133+
iflm.asyncTick==nil {
134+
return
135+
}
136+
lm.asyncTick.Stop()
137+
lm.stop<-struct{}{}
138+
}
139+
118140
func (lm*LatencyMeasurer)latencyChannelName()string {
119141
returnfmt.Sprintf("latency-measure:%s",lm.channel)
120142
}

‎coderd/database/pubsub/pubsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,8 @@ func (p *PGPubsub) Close() error {
311311
err:=p.closeListener()
312312
<-p.listenDone
313313
p.logger.Debug(context.Background(),"pubsub closed")
314+
p.latencyMeasurer.Stop()
315+
p.logger.Debug(context.Background(),"background latency measurement has stopped")
314316
returnerr
315317
}
316318

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp