Commit a7c042f

Measure latency in the background

Signed-off-by: Danny Kopping <danny@coder.com>

1 parent 68412c8 · commit a7c042f

File tree: 6 files changed, +132 -64 lines changed


‎cli/server.go

Lines changed: 1 addition & 1 deletion
@@ -678,7 +678,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
 	}()
 
 	options.Database = database.New(sqlDB)
-	ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
+	ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL, pubsub.LatencyMeasureInterval)
 	if err != nil {
 		return xerrors.Errorf("create pubsub: %w", err)
 	}

‎coderd/database/dbtestutil/db.go

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
 	}
 	db = database.New(sqlDB)
 
-	ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
+	ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL, pubsub.LatencyMeasureInterval)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		_ = ps.Close()
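
Note: both call sites above pass the package default, pubsub.LatencyMeasureInterval (defined in pubsub.go below), but the new final parameter accepts any time.Duration. A minimal sketch of a caller choosing its own cadence (the ctx, logger, sqlDB, and dbURL values are assumed to come from the caller's existing setup):

	// Hypothetical caller; only the final interval argument is new in this commit.
	ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL, 30*time.Second)
	if err != nil {
		return xerrors.Errorf("create pubsub: %w", err)
	}
	defer ps.Close()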

‎coderd/database/pubsub/latency.go

Lines changed: 55 additions & 9 deletions
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
@@ -18,6 +19,14 @@ type LatencyMeasurer struct {
 	// Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
 	channel uuid.UUID
 	logger  slog.Logger
+
+	collections atomic.Int64
+	last        atomic.Value
+}
+
+type LatencyMeasurement struct {
+	Send, Recv time.Duration
+	Err        error
 }
 
 // LatencyMessageLength is the length of a UUIDv4 encoded to hex.
@@ -31,10 +40,10 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
 	}
 }
 
 // Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
-func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64, recv float64, err error) {
+func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasurement {
 	var (
 		start time.Time
-		res   = make(chan float64, 1)
+		res   = make(chan time.Duration, 1)
 	)
 
 	msg := []byte(uuid.New().String())
@@ -45,28 +54,65 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64,
 			return
 		}
 
-		res <- time.Since(start).Seconds()
+		res <- time.Since(start)
 	})
 	if err != nil {
-		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
+		return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to subscribe: %w", err)}
 	}
 	defer cancel()
 
 	start = time.Now()
 	err = p.Publish(lm.latencyChannelName(), msg)
 	if err != nil {
-		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
+		return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to publish: %w", err)}
 	}
 
-	send = time.Since(start).Seconds()
+	send := time.Since(start)
 
 	select {
 	case <-ctx.Done():
 		lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
-		return send, -1, ctx.Err()
-	case val := <-res:
-		return send, val, nil
+		return LatencyMeasurement{Send: send, Recv: -1, Err: ctx.Err()}
+	case recv := <-res:
+		return LatencyMeasurement{Send: send, Recv: recv}
+	}
+}
+
+// MeasureAsync runs latency measurements asynchronously on a given interval.
+// This function is expected to be run in a goroutine and will exit when the context is canceled.
+func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval time.Duration) {
+	tick := time.NewTicker(interval)
+	defer tick.Stop()
+
+	for ; true; <-tick.C { // tick immediately
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if p == nil {
+				lm.logger.Error(ctx, "given pubsub is nil")
+				return
+			}
+		}
+
+		lm.collections.Add(1)
+		measure := lm.Measure(ctx, p)
+		lm.last.Store(&measure)
+	}
+}
+
+func (lm *LatencyMeasurer) LastMeasurement() *LatencyMeasurement {
+	val := lm.last.Load()
+	if val == nil {
+		return nil
 	}
+
+	// nolint:forcetypeassert // Unnecessary type check.
+	return val.(*LatencyMeasurement)
+}
+
+func (lm *LatencyMeasurer) MeasurementCount() int64 {
+	return lm.collections.Load()
 }
 
 func (lm *LatencyMeasurer) latencyChannelName() string {
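
The for ; true; <-tick.C idiom in MeasureAsync runs the loop body once immediately and then once per tick, and atomic.Value gives lock-free publication of the most recent measurement to concurrent readers. A self-contained sketch of the same two patterns outside this package (all names below are illustrative, not part of the commit):

	package main

	import (
		"context"
		"fmt"
		"sync/atomic"
		"time"
	)

	// measurement stands in for pubsub.LatencyMeasurement.
	type measurement struct{ rtt time.Duration }

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		var last atomic.Value // holds *measurement, mirroring LatencyMeasurer.last

		go func() {
			tick := time.NewTicker(500 * time.Millisecond)
			defer tick.Stop()
			for ; true; <-tick.C { // run immediately, then on every tick
				select {
				case <-ctx.Done():
					return
				default:
				}
				m := measurement{rtt: 42 * time.Millisecond} // stand-in for a real Measure call
				last.Store(&m)
			}
		}()

		time.Sleep(100 * time.Millisecond)
		if v := last.Load(); v != nil {
			fmt.Println("last rtt:", v.(*measurement).rtt) // readers never block the writer
		}
	}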

‎coderd/database/pubsub/pubsub.go

Lines changed: 26 additions & 11 deletions
@@ -9,10 +9,11 @@ import (
 	"sync"
 	"time"
 
+	"sync/atomic"
+
 	"github.com/google/uuid"
 	"github.com/lib/pq"
 	"github.com/prometheus/client_golang/prometheus"
-	"go.uber.org/atomic"
 	"golang.org/x/xerrors"
 
 	"cdr.dev/slog"
@@ -29,6 +30,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
 // might have been dropped.
 var ErrDroppedMessages = xerrors.New("dropped messages")
 
+// LatencyMeasureInterval defines how often to trigger a new background latency measurement.
+const LatencyMeasureInterval = time.Second * 10
+
 // Pubsub is a generic interface for broadcasting and receiving messages.
 // Implementors should assume high-availability with the backing implementation.
 type Pubsub interface {
@@ -208,7 +212,7 @@ type PGPubsub struct {
 	connected prometheus.Gauge
 
 	latencyMeasurer   *LatencyMeasurer
-	latencyErrCounter atomic.Float64
+	latencyErrCounter atomic.Int64
 }
 
 // BufferSize is the maximum number of unhandled messages we will buffer
@@ -494,6 +498,11 @@ var (
 		"The time taken to receive a message from a pubsub event channel",
 		nil, nil,
 	)
+	pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
+		"coder_pubsub_latency_measures_total",
+		"The number of pubsub latency measurements",
+		nil, nil,
+	)
 	pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
 		"coder_pubsub_latency_measure_errs_total",
 		"The number of pubsub latency measurement failures",
@@ -531,6 +540,7 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
 	// additional metrics
 	descs <- pubsubSendLatencyDesc
 	descs <- pubsubRecvLatencyDesc
+	descs <- pubsubLatencyMeasureCountDesc
 	descs <- pubsubLatencyMeasureErrDesc
 }
 
@@ -558,27 +568,32 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
 	metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
 
 	// additional metrics
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
-	defer cancel()
+	latency := p.latencyMeasurer.LastMeasurement()
+	if latency == nil {
+		p.logger.Debug(context.Background(), "latency measurement not completed yet")
+		return
+	}
 
-	send, recv, err := p.latencyMeasurer.Measure(ctx, p)
-	if err != nil {
-		p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
-		metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, p.latencyErrCounter.Add(1))
+	metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasurer.MeasurementCount()))
+	if latency.Err != nil {
+		p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(latency.Err))
+		metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
 		return
 	}
-	metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send)
-	metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv)
+	metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, latency.Send.Seconds())
+	metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, latency.Recv.Seconds())
 }
 
 // New creates a new Pubsub implementation using a PostgreSQL connection.
-func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
+func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string, latencyMeasureInterval time.Duration) (*PGPubsub, error) {
 	p := newWithoutListener(logger, database)
 	if err := p.startListener(startCtx, connectURL); err != nil {
 		return nil, err
 	}
 	go p.listen()
 	logger.Info(startCtx, "pubsub has started")
+	go p.latencyMeasurer.MeasureAsync(startCtx, p, latencyMeasureInterval)
+	logger.Debug(startCtx, "background latency measurement has started")
 	return p, nil
 }
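
Collect now reads the cached background result instead of running a blocking measurement with a 10-second timeout on every scrape, so a slow pubsub round trip can no longer stall the Prometheus scrape path. A minimal sketch of that cached-collector shape (the type and metric below are illustrative, not the actual PGPubsub code):

	package main

	import (
		"sync/atomic"

		"github.com/prometheus/client_golang/prometheus"
	)

	var sendLatencyDesc = prometheus.NewDesc(
		"example_pubsub_send_latency_seconds",
		"Most recent send latency recorded by the background measurer",
		nil, nil,
	)

	// cachedCollector reports whatever the background loop last stored, so a
	// scrape reads an atomic instead of waiting on a live round trip.
	type cachedCollector struct {
		lastSendSeconds atomic.Value // holds float64, written by the measurement loop
	}

	func (c *cachedCollector) Describe(descs chan<- *prometheus.Desc) {
		descs <- sendLatencyDesc
	}

	func (c *cachedCollector) Collect(metrics chan<- prometheus.Metric) {
		v := c.lastSendSeconds.Load()
		if v == nil {
			return // no measurement completed yet; emit nothing, as the diff does
		}
		metrics <- prometheus.MustNewConstMetric(sendLatencyDesc, prometheus.GaugeValue, v.(float64))
	}

	func main() {
		c := &cachedCollector{}
		c.lastSendSeconds.Store(0.012) // stand-in for a background measurement
		prometheus.MustRegister(c)
	}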

‎coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 23 additions & 23 deletions
@@ -46,7 +46,7 @@ func TestPubsub(t *testing.T) {
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
 		defer db.Close()
-		pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
+		pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
 		require.NoError(t, err)
 		defer pubsub.Close()
 		event := "test"
@@ -75,7 +75,7 @@ func TestPubsub(t *testing.T) {
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
 		defer db.Close()
-		pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
+		pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
 		require.NoError(t, err)
 		defer pubsub.Close()
 		cancelFunc()
@@ -91,7 +91,7 @@ func TestPubsub(t *testing.T) {
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
 		defer db.Close()
-		pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
+		pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
 		require.NoError(t, err)
 		defer pubsub.Close()
 
@@ -128,7 +128,7 @@ func TestPubsub_ordering(t *testing.T) {
 	db, err := sql.Open("postgres", connectionURL)
 	require.NoError(t, err)
 	defer db.Close()
-	ps, err := pubsub.New(ctx, logger, db, connectionURL)
+	ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
 	require.NoError(t, err)
 	defer ps.Close()
 	event := "test"
@@ -177,7 +177,7 @@ func TestPubsub_Disconnect(t *testing.T) {
 	ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
 	defer cancelFunc()
 	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
-	ps, err := pubsub.New(ctx, logger, db, connectionURL)
+	ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
 	require.NoError(t, err)
 	defer ps.Close()
 	event := "test"
@@ -309,7 +309,7 @@ func TestMeasureLatency(t *testing.T) {
 		require.NoError(t, err)
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
-		ps, err := pubsub.New(ctx, logger, db, connectionURL)
+		ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
 		require.NoError(t, err)
 
 		return ps, func() {
@@ -330,10 +330,10 @@ func TestMeasureLatency(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
 		defer cancel()
 
-		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
-		require.NoError(t, err)
-		require.Greater(t, send, 0.0)
-		require.Greater(t, recv, 0.0)
+		l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.NoError(t, l.Err)
+		require.Greater(t, l.Send.Seconds(), 0.0)
+		require.Greater(t, l.Recv.Seconds(), 0.0)
 	})
 
 	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
@@ -343,13 +343,13 @@ func TestMeasureLatency(t *testing.T) {
 		ps, done := newPubsub()
 		defer done()
 
-		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Minute))
 		defer cancel()
 
-		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
-		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
-		require.Greater(t, send, 0.0)
-		require.EqualValues(t, recv, -1)
+		l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.ErrorContains(t, l.Err, context.DeadlineExceeded.Error())
+		require.Greater(t, l.Send.Seconds(), 0.0)
+		require.EqualValues(t, l.Recv, time.Duration(-1))
 	})
 
 	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
@@ -378,10 +378,10 @@ func TestMeasureLatency(t *testing.T) {
 			defer wg.Done()
 
 			hold <- struct{}{}
-			send, recv, err := lm.Measure(ctx, slow)
-			assert.NoError(t, err)
-			assert.Greater(t, send, 0.0)
-			assert.Greater(t, recv, 0.0)
+			l := lm.Measure(ctx, slow)
+			assert.NoError(t, l.Err)
+			assert.Greater(t, l.Send.Seconds(), 0.0)
+			assert.Greater(t, l.Recv.Seconds(), 0.0)
 
 			// The slow subscriber will complete first and receive the fast publisher's message first.
 			logger.Sync()
@@ -396,10 +396,10 @@ func TestMeasureLatency(t *testing.T) {
 			// Force fast publisher to start after the slow one to avoid flakiness.
 			<-hold
 			time.Sleep(time.Millisecond * 50)
-			send, recv, err := lm.Measure(ctx, fast)
-			assert.NoError(t, err)
-			assert.Greater(t, send, 0.0)
-			assert.Greater(t, recv, 0.0)
+			l := lm.Measure(ctx, fast)
+			assert.NoError(t, l.Err)
+			assert.Greater(t, l.Send.Seconds(), 0.0)
+			assert.Greater(t, l.Recv.Seconds(), 0.0)
 		}()
 
 		wg.Wait()
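
The MeasureLatencyRecvTimeout subtest forces the receive path to time out by creating a context whose deadline is already in the past (moved from -time.Second to -time.Minute in this commit, presumably for extra margin). A tiny standalone sketch of the idiom:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	func main() {
		// A deadline in the past yields a context that is already expired,
		// so <-ctx.Done() fires immediately and Err() reports DeadlineExceeded.
		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Minute))
		defer cancel()

		<-ctx.Done()
		fmt.Println(ctx.Err()) // context.DeadlineExceeded
	}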
