
Commit 4671ebb

feat: measure pubsub latencies and expose metrics (#13126)

1 parent e14f8fb · commit 4671ebb

5 files changed: +326 -38 lines changed

‎coderd/database/pubsub/latency.go

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+package pubsub
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/xerrors"
+
+	"cdr.dev/slog"
+)
+
+// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
+// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
+type LatencyMeasurer struct {
+	// Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
+	channel uuid.UUID
+	logger  slog.Logger
+}
+
+// LatencyMessageLength is the length of a UUIDv4 encoded to hex.
+const LatencyMessageLength = 36
+
+func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
+	return &LatencyMeasurer{
+		channel: uuid.New(),
+		logger:  logger,
+	}
+}
+
+// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
+func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
+	var (
+		start time.Time
+		res   = make(chan time.Duration, 1)
+	)
+
+	msg := []byte(uuid.New().String())
+	lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))
+
+	cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
+		if !bytes.Equal(in, msg) {
+			lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg))
+			return
+		}
+
+		res <- time.Since(start)
+	})
+	if err != nil {
+		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
+	}
+	defer cancel()
+
+	start = time.Now()
+	err = p.Publish(lm.latencyChannelName(), msg)
+	if err != nil {
+		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
+	}
+
+	send = time.Since(start)
+	select {
+	case <-ctx.Done():
+		lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
+		return send, -1, ctx.Err()
+	case recv = <-res:
+		return send, recv, nil
+	}
+}
+
+func (lm *LatencyMeasurer) latencyChannelName() string {
+	return fmt.Sprintf("latency-measure:%s", lm.channel)
+}
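
Note: the measurer is not tied to Postgres; any Pubsub implementation can be probed. Below is a minimal usage sketch. It assumes the package's in-memory Pubsub (NewInMemory) and standard cdr.dev/slog wiring; neither is part of this diff, only the Measure call is.

// Sketch only: one-off latency measurement against an assumed in-memory Pubsub.
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/sloghuman"

	"github.com/coder/coder/v2/coderd/database/pubsub"
)

func main() {
	logger := slog.Make(sloghuman.Sink(os.Stderr)).Leveled(slog.LevelDebug)
	ps := pubsub.NewInMemory() // assumption: in-memory implementation from this package

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Publishes a unique message on a replica-specific channel and times the round trip.
	send, recv, err := pubsub.NewLatencyMeasurer(logger.Named("latency")).Measure(ctx, ps)
	if err != nil {
		logger.Error(ctx, "latency measurement failed", slog.Error(err))
		return
	}
	fmt.Printf("send=%s recv=%s\n", send, recv)
}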

‎coderd/database/pubsub/pubsub.go

Lines changed: 57 additions & 4 deletions

@@ -7,6 +7,7 @@ import (
 	"io"
 	"net"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/google/uuid"
@@ -28,6 +29,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
 // might have been dropped.
 var ErrDroppedMessages = xerrors.New("dropped messages")

+// LatencyMeasureTimeout defines how often to trigger a new background latency measurement.
+const LatencyMeasureTimeout = time.Second * 10
+
 // Pubsub is a generic interface for broadcasting and receiving messages.
 // Implementors should assume high-availability with the backing implementation.
 type Pubsub interface {
@@ -205,6 +209,10 @@ type PGPubsub struct {
 	receivedBytesTotal  prometheus.Counter
 	disconnectionsTotal prometheus.Counter
 	connected           prometheus.Gauge
+
+	latencyMeasurer       *LatencyMeasurer
+	latencyMeasureCounter atomic.Int64
+	latencyErrCounter     atomic.Int64
 }

 // BufferSize is the maximum number of unhandled messages we will buffer
@@ -478,6 +486,30 @@ var (
 	)
 )

+// additional metrics collected out-of-band
+var (
+	pubsubSendLatencyDesc = prometheus.NewDesc(
+		"coder_pubsub_send_latency_seconds",
+		"The time taken to send a message into a pubsub event channel",
+		nil, nil,
+	)
+	pubsubRecvLatencyDesc = prometheus.NewDesc(
+		"coder_pubsub_receive_latency_seconds",
+		"The time taken to receive a message from a pubsub event channel",
+		nil, nil,
+	)
+	pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
+		"coder_pubsub_latency_measures_total",
+		"The number of pubsub latency measurements",
+		nil, nil,
+	)
+	pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
+		"coder_pubsub_latency_measure_errs_total",
+		"The number of pubsub latency measurement failures",
+		nil, nil,
+	)
+)
+
 // We'll track messages as size "normal" and "colossal", where the
 // latter are messages larger than 7600 bytes, or 95% of the postgres
 // notify limit. If we see a lot of colossal packets that's an indication that
@@ -504,6 +536,12 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
 	// implicit metrics
 	descs <- currentSubscribersDesc
 	descs <- currentEventsDesc
+
+	// additional metrics
+	descs <- pubsubSendLatencyDesc
+	descs <- pubsubRecvLatencyDesc
+	descs <- pubsubLatencyMeasureCountDesc
+	descs <- pubsubLatencyMeasureErrDesc
 }

 // Collect implements, along with Describe, the prometheus.Collector interface
@@ -528,6 +566,20 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
 	p.qMu.Unlock()
 	metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
 	metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
+
+	// additional metrics
+	ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureTimeout)
+	defer cancel()
+	send, recv, err := p.latencyMeasurer.Measure(ctx, p)
+
+	metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
+	if err != nil {
+		p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
+		metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
+		return
+	}
+	metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
+	metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
 }

 // New creates a new Pubsub implementation using a PostgreSQL connection.
@@ -544,10 +596,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
 // newWithoutListener creates a new PGPubsub without creating the pqListener.
 func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
 	return &PGPubsub{
-		logger:     logger,
-		listenDone: make(chan struct{}),
-		db:         database,
-		queues:     make(map[string]map[uuid.UUID]*msgQueue),
+		logger:          logger,
+		listenDone:      make(chan struct{}),
+		db:              database,
+		queues:          make(map[string]map[uuid.UUID]*msgQueue),
+		latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),

 		publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Namespace: "coder",
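
Note: because PGPubsub implements prometheus.Collector (the Describe/Collect methods above), exposing the new metrics only requires registering the pubsub with a registry; every Gather, i.e. every scrape, then drives one measurement bounded by LatencyMeasureTimeout. A small sketch of that flow follows; the package and function names are illustrative assumptions, not part of this diff.

// Sketch only: gather a registered pubsub once and list the metric families,
// which include the new latency gauges and measurement counters.
package pubsubmetrics

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// ScrapeOnce registers the collector (e.g. the *PGPubsub) with a fresh registry
// and gathers it a single time. The Gather call invokes Collect, which performs
// one latency measurement before reporting.
func ScrapeOnce(ps prometheus.Collector) error {
	registry := prometheus.NewRegistry()
	if err := registry.Register(ps); err != nil {
		return fmt.Errorf("register pubsub collector: %w", err)
	}
	families, err := registry.Gather()
	if err != nil {
		return fmt.Errorf("gather: %w", err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName()) // e.g. coder_pubsub_send_latency_seconds
	}
	return nil
}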

‎coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 111 additions & 0 deletions

@@ -3,6 +3,7 @@
 package pubsub_test

 import (
+	"bytes"
 	"context"
 	"database/sql"
 	"fmt"
@@ -15,6 +16,8 @@ import (
 	"github.com/stretchr/testify/require"
 	"golang.org/x/xerrors"

+	"cdr.dev/slog/sloggers/sloghuman"
+
 	"cdr.dev/slog"
 	"cdr.dev/slog/sloggers/slogtest"
 	"github.com/coder/coder/v2/coderd/database/dbtestutil"
@@ -294,3 +297,111 @@ func TestPubsub_Disconnect(t *testing.T) {
 	}
 	require.True(t, gotDroppedErr)
 }
+
+func TestMeasureLatency(t *testing.T) {
+	t.Parallel()
+
+	newPubsub := func() (pubsub.Pubsub, func()) {
+		ctx, cancel := context.WithCancel(context.Background())
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		connectionURL, closePg, err := dbtestutil.Open()
+		require.NoError(t, err)
+		db, err := sql.Open("postgres", connectionURL)
+		require.NoError(t, err)
+		ps, err := pubsub.New(ctx, logger, db, connectionURL)
+		require.NoError(t, err)
+
+		return ps, func() {
+			_ = ps.Close()
+			_ = db.Close()
+			closePg()
+			cancel()
+		}
+	}
+
+	t.Run("MeasureLatency", func(t *testing.T) {
+		t.Parallel()
+
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		ps, done := newPubsub()
+		defer done()
+
+		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
+		defer cancel()
+
+		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.NoError(t, err)
+		require.Greater(t, send.Seconds(), 0.0)
+		require.Greater(t, recv.Seconds(), 0.0)
+	})
+
+	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
+		t.Parallel()
+
+		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+		ps, done := newPubsub()
+		defer done()
+
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
+		defer cancel()

+		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
+		require.Greater(t, send.Seconds(), 0.0)
+		require.EqualValues(t, recv, time.Duration(-1))
+	})
+
+	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
+		t.Parallel()
+
+		var buf bytes.Buffer
+		logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
+		logger = logger.AppendSinks(sloghuman.Sink(&buf))
+
+		lm := pubsub.NewLatencyMeasurer(logger)
+		ps, done := newPubsub()
+		defer done()
+
+		racy := newRacyPubsub(ps)
+		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
+		defer cancel()
+
+		send, recv, err := lm.Measure(ctx, racy)
+		assert.NoError(t, err)
+		assert.Greater(t, send.Seconds(), 0.0)
+		assert.Greater(t, recv.Seconds(), 0.0)
+
+		logger.Sync()
+		assert.Contains(t, buf.String(), "received unexpected message")
+	})
+}
+
+// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not).
+// This is used to verify that a subscriber will only listen for the message it explicitly expects.
+type racyPubsub struct {
+	pubsub.Pubsub
+}
+
+func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub {
+	return &racyPubsub{ps}
+}
+
+func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
+	return s.Pubsub.Subscribe(event, listener)
+}
+
+func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
+	return s.Pubsub.SubscribeWithErr(event, listener)
+}
+
+func (s *racyPubsub) Publish(event string, message []byte) error {
+	err := s.Pubsub.Publish(event, []byte("nonsense"))
+	if err != nil {
+		return xerrors.Errorf("failed to send simulated race: %w", err)
+	}
+	return s.Pubsub.Publish(event, message)
+}
+
+func (s *racyPubsub) Close() error {
+	return s.Pubsub.Close()
+}

‎coderd/database/pubsub/pubsub_test.go

Lines changed: 28 additions & 11 deletions

@@ -39,7 +39,11 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	err = registry.Register(uut)
 	require.NoError(t, err)

+	// each Gather measures pubsub latency by publishing a message & subscribing to it
+	var gatherCount float64
+
 	metrics, err := registry.Gather()
+	gatherCount++
 	require.NoError(t, err)
 	require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
 	require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
@@ -59,19 +63,26 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	_ = testutil.RequireRecvCtx(ctx, t, messageChannel)

 	require.Eventually(t, func() bool {
+		latencyBytes := gatherCount * pubsub.LatencyMessageLength
 		metrics, err = registry.Gather()
+		gatherCount++
 		assert.NoError(t, err)
 		return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_publishes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_subscribes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
-			testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_received_bytes_total") &&
-			testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_published_bytes_total")
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
+			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
+			!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
 	}, testutil.WaitShort, testutil.IntervalFast)

-	colossalData := make([]byte, 7600)
+	colossalSize := 7600
+	colossalData := make([]byte, colossalSize)
 	for i := range colossalData {
 		colossalData[i] = 'q'
 	}
@@ -89,16 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	_ = testutil.RequireRecvCtx(ctx, t, messageChannel)

 	require.Eventually(t, func() bool {
+		latencyBytes := gatherCount * pubsub.LatencyMessageLength
 		metrics, err = registry.Gather()
+		gatherCount++
 		assert.NoError(t, err)
 		return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
 			testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
-			testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_publishes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_subscribes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
+			testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
 			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
-			testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_received_bytes_total") &&
-			testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_published_bytes_total")
+			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
+			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
+			!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
 	}, testutil.WaitShort, testutil.IntervalFast)
 }
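
Note: the byte-counter expectations above follow because each Gather publishes (and receives) one latency probe of LatencyMessageLength (36) bytes on top of the test payloads. A small sketch of that arithmetic; the package and function names are illustrative only.

// Sketch only: the value the published/received byte counters should report
// after a given number of prior gathers.
package pubsubmetrics

// latencyMessageLength mirrors pubsub.LatencyMessageLength: the size of each probe message.
const latencyMessageLength = 36

// expectedBytes is the payload bytes sent by the test plus one probe message per prior gather.
func expectedBytes(payloadBytes int, priorGathers float64) float64 {
	return float64(payloadBytes) + priorGathers*latencyMessageLength
}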
