Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5a359d5

Browse files
authored
feat: add metrics to PGPubsub (#11971)
Adds prometheus metrics to PGPubsub for monitoring its health and performance in production.Related to#11950 --- additional diagnostics to help figure out what's happening
1 parente748312 commit5a359d5

File tree

4 files changed

+615
-327
lines changed

4 files changed

+615
-327
lines changed

‎cli/server.go‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,10 +673,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
673673
}()
674674

675675
options.Database=database.New(sqlDB)
676-
options.Pubsub,err=pubsub.New(ctx,logger.Named("pubsub"),sqlDB,dbURL)
676+
ps,err:=pubsub.New(ctx,logger.Named("pubsub"),sqlDB,dbURL)
677677
iferr!=nil {
678678
returnxerrors.Errorf("create pubsub: %w",err)
679679
}
680+
options.Pubsub=ps
681+
ifoptions.DeploymentValues.Prometheus.Enable {
682+
options.PrometheusRegistry.MustRegister(ps)
683+
}
680684
deferoptions.Pubsub.Close()
681685
}
682686

‎coderd/database/pubsub/pubsub.go‎

Lines changed: 166 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/google/uuid"
1111
"github.com/lib/pq"
12+
"github.com/prometheus/client_golang/prometheus"
1213
"golang.org/x/xerrors"
1314

1415
"cdr.dev/slog"
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
162163
q.cond.Broadcast()
163164
}
164165

165-
//Pubsub implementation using PostgreSQL.
166-
typepgPubsubstruct {
166+
//PGPubsub is a pubsub implementation using PostgreSQL.
167+
typePGPubsubstruct {
167168
ctx context.Context
168169
cancel context.CancelFunc
169170
logger slog.Logger
@@ -174,29 +175,40 @@ type pgPubsub struct {
174175
queuesmap[string]map[uuid.UUID]*msgQueue
175176
closedListenerbool
176177
closeListenerErrerror
178+
179+
publishesTotal*prometheus.CounterVec
180+
subscribesTotal*prometheus.CounterVec
181+
messagesTotal*prometheus.CounterVec
182+
publishedBytesTotal prometheus.Counter
183+
receivedBytesTotal prometheus.Counter
184+
disconnectionsTotal prometheus.Counter
185+
connected prometheus.Gauge
177186
}
178187

179188
// BufferSize is the maximum number of unhandled messages we will buffer
180189
// for a subscriber before dropping messages.
181190
constBufferSize=2048
182191

183192
// Subscribe calls the listener when an event matching the name is received.
184-
func (p*pgPubsub)Subscribe(eventstring,listenerListener) (cancelfunc(),errerror) {
193+
func (p*PGPubsub)Subscribe(eventstring,listenerListener) (cancelfunc(),errerror) {
185194
returnp.subscribeQueue(event,newMsgQueue(p.ctx,listener,nil))
186195
}
187196

188-
func (p*pgPubsub)SubscribeWithErr(eventstring,listenerListenerWithErr) (cancelfunc(),errerror) {
197+
func (p*PGPubsub)SubscribeWithErr(eventstring,listenerListenerWithErr) (cancelfunc(),errerror) {
189198
returnp.subscribeQueue(event,newMsgQueue(p.ctx,nil,listener))
190199
}
191200

192-
func (p*pgPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {
201+
func (p*PGPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {
193202
p.mut.Lock()
194203
deferp.mut.Unlock()
195204
deferfunc() {
196205
iferr!=nil {
197206
// if we hit an error, we need to close the queue so we don't
198207
// leak its goroutine.
199208
newQ.close()
209+
p.subscribesTotal.WithLabelValues("false").Inc()
210+
}else {
211+
p.subscribesTotal.WithLabelValues("true").Inc()
200212
}
201213
}()
202214

@@ -239,20 +251,23 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
239251
},nil
240252
}
241253

242-
func (p*pgPubsub)Publish(eventstring,message []byte)error {
254+
func (p*PGPubsub)Publish(eventstring,message []byte)error {
243255
p.logger.Debug(p.ctx,"publish",slog.F("event",event),slog.F("message_len",len(message)))
244256
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
245257
// support the first parameter being a prepared statement.
246258
//nolint:gosec
247259
_,err:=p.db.ExecContext(p.ctx,`select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`,message)
248260
iferr!=nil {
261+
p.publishesTotal.WithLabelValues("false").Inc()
249262
returnxerrors.Errorf("exec pg_notify: %w",err)
250263
}
264+
p.publishesTotal.WithLabelValues("true").Inc()
265+
p.publishedBytesTotal.Add(float64(len(message)))
251266
returnnil
252267
}
253268

254269
// Close closes the pubsub instance.
255-
func (p*pgPubsub)Close()error {
270+
func (p*PGPubsub)Close()error {
256271
p.logger.Info(p.ctx,"pubsub is closing")
257272
p.cancel()
258273
err:=p.closeListener()
@@ -262,7 +277,7 @@ func (p *pgPubsub) Close() error {
262277
}
263278

264279
// closeListener closes the pgListener, unless it has already been closed.
265-
func (p*pgPubsub)closeListener()error {
280+
func (p*PGPubsub)closeListener()error {
266281
p.mut.Lock()
267282
deferp.mut.Unlock()
268283
ifp.closedListener {
@@ -274,7 +289,7 @@ func (p *pgPubsub) closeListener() error {
274289
}
275290

276291
// listen begins receiving messages on the pq listener.
277-
func (p*pgPubsub)listen() {
292+
func (p*PGPubsub)listen() {
278293
deferfunc() {
279294
p.logger.Info(p.ctx,"pubsub listen stopped receiving notify")
280295
cErr:=p.closeListener()
@@ -307,7 +322,14 @@ func (p *pgPubsub) listen() {
307322
}
308323
}
309324

310-
func (p*pgPubsub)listenReceive(notif*pq.Notification) {
325+
func (p*PGPubsub)listenReceive(notif*pq.Notification) {
326+
sizeLabel:=messageSizeNormal
327+
iflen(notif.Extra)>=colossalThreshold {
328+
sizeLabel=messageSizeColossal
329+
}
330+
p.messagesTotal.WithLabelValues(sizeLabel).Inc()
331+
p.receivedBytesTotal.Add(float64(len(notif.Extra)))
332+
311333
p.mut.Lock()
312334
deferp.mut.Unlock()
313335
queues,ok:=p.queues[notif.Channel]
@@ -320,7 +342,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
320342
}
321343
}
322344

323-
func (p*pgPubsub)recordReconnect() {
345+
func (p*PGPubsub)recordReconnect() {
324346
p.mut.Lock()
325347
deferp.mut.Unlock()
326348
for_,listeners:=rangep.queues {
@@ -330,20 +352,23 @@ func (p *pgPubsub) recordReconnect() {
330352
}
331353
}
332354

333-
// New creates a new Pubsub implementation using a PostgreSQL connection.
334-
funcNew(ctx context.Context,logger slog.Logger,database*sql.DB,connectURLstring) (Pubsub,error) {
355+
func (p*PGPubsub)startListener(ctx context.Context,connectURLstring)error {
356+
p.connected.Set(0)
335357
// Creates a new listener using pq.
336358
errCh:=make(chanerror)
337-
listener:=pq.NewListener(connectURL,time.Second,time.Minute,func(t pq.ListenerEventType,errerror) {
359+
p.pgListener=pq.NewListener(connectURL,time.Second,time.Minute,func(t pq.ListenerEventType,errerror) {
338360
switcht {
339361
casepq.ListenerEventConnected:
340-
logger.Info(ctx,"pubsub connected to postgres")
362+
p.logger.Info(ctx,"pubsub connected to postgres")
363+
p.connected.Set(1.0)
341364
casepq.ListenerEventDisconnected:
342-
logger.Error(ctx,"pubsub disconnected from postgres",slog.Error(err))
365+
p.logger.Error(ctx,"pubsub disconnected from postgres",slog.Error(err))
366+
p.connected.Set(0)
343367
casepq.ListenerEventReconnected:
344-
logger.Info(ctx,"pubsub reconnected to postgres")
368+
p.logger.Info(ctx,"pubsub reconnected to postgres")
369+
p.connected.Set(1)
345370
casepq.ListenerEventConnectionAttemptFailed:
346-
logger.Error(ctx,"pubsub failed to connect to postgres",slog.Error(err))
371+
p.logger.Error(ctx,"pubsub failed to connect to postgres",slog.Error(err))
347372
}
348373
// This callback gets events whenever the connection state changes.
349374
// Don't send if the errChannel has already been closed.
@@ -358,26 +383,141 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
358383
select {
359384
caseerr:=<-errCh:
360385
iferr!=nil {
361-
_=listener.Close()
362-
returnnil,xerrors.Errorf("create pq listener: %w",err)
386+
_=p.pgListener.Close()
387+
returnxerrors.Errorf("create pq listener: %w",err)
363388
}
364389
case<-ctx.Done():
365-
_=listener.Close()
366-
returnnil,ctx.Err()
390+
_=p.pgListener.Close()
391+
returnctx.Err()
367392
}
393+
returnnil
394+
}
368395

396+
// these are the metrics we compute implicitly from our existing data structures
397+
var (
398+
currentSubscribersDesc=prometheus.NewDesc(
399+
"coder_pubsub_current_subscribers",
400+
"The current number of active pubsub subscribers",
401+
nil,nil,
402+
)
403+
currentEventsDesc=prometheus.NewDesc(
404+
"coder_pubsub_current_events",
405+
"The current number of pubsub event channels listened for",
406+
nil,nil,
407+
)
408+
)
409+
410+
// We'll track messages as size "normal" and "colossal", where the
411+
// latter are messages larger than 7600 bytes, or 95% of the postgres
412+
// notify limit. If we see a lot of colossal packets that's an indication that
413+
// we might be trying to send too much data over the pubsub and are in danger of
414+
// failing to publish.
415+
const (
416+
colossalThreshold=7600
417+
messageSizeNormal="normal"
418+
messageSizeColossal="colossal"
419+
)
420+
421+
// Describe implements, along with Collect, the prometheus.Collector interface
422+
// for metrics.
423+
func (p*PGPubsub)Describe(descschan<-*prometheus.Desc) {
424+
// explicit metrics
425+
p.publishesTotal.Describe(descs)
426+
p.subscribesTotal.Describe(descs)
427+
p.messagesTotal.Describe(descs)
428+
p.publishedBytesTotal.Describe(descs)
429+
p.receivedBytesTotal.Describe(descs)
430+
p.disconnectionsTotal.Describe(descs)
431+
p.connected.Describe(descs)
432+
433+
// implicit metrics
434+
descs<-currentSubscribersDesc
435+
descs<-currentEventsDesc
436+
}
437+
438+
// Collect implements, along with Describe, the prometheus.Collector interface
439+
// for metrics
440+
func (p*PGPubsub)Collect(metricschan<- prometheus.Metric) {
441+
// explicit metrics
442+
p.publishesTotal.Collect(metrics)
443+
p.subscribesTotal.Collect(metrics)
444+
p.messagesTotal.Collect(metrics)
445+
p.publishedBytesTotal.Collect(metrics)
446+
p.receivedBytesTotal.Collect(metrics)
447+
p.disconnectionsTotal.Collect(metrics)
448+
p.connected.Collect(metrics)
449+
450+
// implicit metrics
451+
p.mut.Lock()
452+
events:=len(p.queues)
453+
subs:=0
454+
for_,subscriberMap:=rangep.queues {
455+
subs+=len(subscriberMap)
456+
}
457+
p.mut.Unlock()
458+
metrics<-prometheus.MustNewConstMetric(currentSubscribersDesc,prometheus.GaugeValue,float64(subs))
459+
metrics<-prometheus.MustNewConstMetric(currentEventsDesc,prometheus.GaugeValue,float64(events))
460+
}
461+
462+
// New creates a new Pubsub implementation using a PostgreSQL connection.
463+
funcNew(startCtx context.Context,logger slog.Logger,database*sql.DB,connectURLstring) (*PGPubsub,error) {
369464
// Start a new context that will be canceled when the pubsub is closed.
370465
ctx,cancel:=context.WithCancel(context.Background())
371-
pgPubsub:=&pgPubsub{
466+
p:=&PGPubsub{
372467
ctx:ctx,
373468
cancel:cancel,
374469
logger:logger,
375470
listenDone:make(chanstruct{}),
376471
db:database,
377-
pgListener:listener,
378472
queues:make(map[string]map[uuid.UUID]*msgQueue),
473+
474+
publishesTotal:prometheus.NewCounterVec(prometheus.CounterOpts{
475+
Namespace:"coder",
476+
Subsystem:"pubsub",
477+
Name:"publishes_total",
478+
Help:"Total number of calls to Publish",
479+
}, []string{"success"}),
480+
subscribesTotal:prometheus.NewCounterVec(prometheus.CounterOpts{
481+
Namespace:"coder",
482+
Subsystem:"pubsub",
483+
Name:"subscribes_total",
484+
Help:"Total number of calls to Subscribe/SubscribeWithErr",
485+
}, []string{"success"}),
486+
messagesTotal:prometheus.NewCounterVec(prometheus.CounterOpts{
487+
Namespace:"coder",
488+
Subsystem:"pubsub",
489+
Name:"messages_total",
490+
Help:"Total number of messages received from postgres",
491+
}, []string{"size"}),
492+
publishedBytesTotal:prometheus.NewCounter(prometheus.CounterOpts{
493+
Namespace:"coder",
494+
Subsystem:"pubsub",
495+
Name:"published_bytes_total",
496+
Help:"Total number of bytes successfully published across all publishes",
497+
}),
498+
receivedBytesTotal:prometheus.NewCounter(prometheus.CounterOpts{
499+
Namespace:"coder",
500+
Subsystem:"pubsub",
501+
Name:"received_bytes_total",
502+
Help:"Total number of bytes received across all messages",
503+
}),
504+
disconnectionsTotal:prometheus.NewCounter(prometheus.CounterOpts{
505+
Namespace:"coder",
506+
Subsystem:"pubsub",
507+
Name:"disconnections_total",
508+
Help:"Total number of times we disconnected unexpectedly from postgres",
509+
}),
510+
connected:prometheus.NewGauge(prometheus.GaugeOpts{
511+
Namespace:"coder",
512+
Subsystem:"pubsub",
513+
Name:"connected",
514+
Help:"Whether we are connected (1) or not connected (0) to postgres",
515+
}),
516+
}
517+
iferr:=p.startListener(startCtx,connectURL);err!=nil {
518+
returnnil,err
379519
}
380-
gopgPubsub.listen()
520+
gop.listen()
381521
logger.Info(ctx,"pubsub has started")
382-
returnpgPubsub,nil
522+
returnp,nil
383523
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp