Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit bbf6e1b

Browse files
committed
feat: add metrics to PGPubsub
1 parent cc0dc10 commit bbf6e1b

File tree

4 files changed

+586
-327
lines changed

4 files changed

+586
-327
lines changed

‎cli/server.go‎

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -673,10 +673,14 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
673673
}()
674674

675675
options.Database=database.New(sqlDB)
676-
options.Pubsub,err=pubsub.New(ctx,logger.Named("pubsub"),sqlDB,dbURL)
676+
ps,err:=pubsub.New(ctx,logger.Named("pubsub"),sqlDB,dbURL)
677677
iferr!=nil {
678678
returnxerrors.Errorf("create pubsub: %w",err)
679679
}
680+
options.Pubsub=ps
681+
ifoptions.DeploymentValues.Prometheus.Enable {
682+
options.PrometheusRegistry.MustRegister(ps)
683+
}
680684
deferoptions.Pubsub.Close()
681685
}
682686

‎coderd/database/pubsub/pubsub.go‎

Lines changed: 141 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/google/uuid"
1111
"github.com/lib/pq"
12+
"github.com/prometheus/client_golang/prometheus"
1213
"golang.org/x/xerrors"
1314

1415
"cdr.dev/slog"
@@ -162,8 +163,8 @@ func (q *msgQueue) dropped() {
162163
q.cond.Broadcast()
163164
}
164165

165-
//Pubsub implementation using PostgreSQL.
166-
typepgPubsubstruct {
166+
//PGPubsub is a pubsub implementation using PostgreSQL.
167+
typePGPubsubstruct {
167168
ctx context.Context
168169
cancel context.CancelFunc
169170
logger slog.Logger
@@ -174,29 +175,38 @@ type pgPubsub struct {
174175
queuesmap[string]map[uuid.UUID]*msgQueue
175176
closedListenerbool
176177
closeListenerErrerror
178+
179+
publishesTotal*prometheus.CounterVec
180+
subscribesTotal*prometheus.CounterVec
181+
messagesTotal*prometheus.CounterVec
182+
disconnectionsTotal prometheus.Counter
183+
connected prometheus.Gauge
177184
}
178185

179186
// BufferSize is the maximum number of unhandled messages we will buffer
180187
// for a subscriber before dropping messages.
181188
constBufferSize=2048
182189

183190
// Subscribe calls the listener when an event matching the name is received.
184-
func (p*pgPubsub)Subscribe(eventstring,listenerListener) (cancelfunc(),errerror) {
191+
func (p*PGPubsub)Subscribe(eventstring,listenerListener) (cancelfunc(),errerror) {
185192
returnp.subscribeQueue(event,newMsgQueue(p.ctx,listener,nil))
186193
}
187194

188-
func (p*pgPubsub)SubscribeWithErr(eventstring,listenerListenerWithErr) (cancelfunc(),errerror) {
195+
func (p*PGPubsub)SubscribeWithErr(eventstring,listenerListenerWithErr) (cancelfunc(),errerror) {
189196
returnp.subscribeQueue(event,newMsgQueue(p.ctx,nil,listener))
190197
}
191198

192-
func (p*pgPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {
199+
func (p*PGPubsub)subscribeQueue(eventstring,newQ*msgQueue) (cancelfunc(),errerror) {
193200
p.mut.Lock()
194201
deferp.mut.Unlock()
195202
deferfunc() {
196203
iferr!=nil {
197204
// if we hit an error, we need to close the queue so we don't
198205
// leak its goroutine.
199206
newQ.close()
207+
p.subscribesTotal.WithLabelValues("false").Inc()
208+
}else {
209+
p.subscribesTotal.WithLabelValues("true").Inc()
200210
}
201211
}()
202212

@@ -239,20 +249,22 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
239249
},nil
240250
}
241251

242-
func (p*pgPubsub)Publish(eventstring,message []byte)error {
252+
func (p*PGPubsub)Publish(eventstring,message []byte)error {
243253
p.logger.Debug(p.ctx,"publish",slog.F("event",event),slog.F("message_len",len(message)))
244254
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
245255
// support the first parameter being a prepared statement.
246256
//nolint:gosec
247257
_,err:=p.db.ExecContext(p.ctx,`select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`,message)
248258
iferr!=nil {
259+
p.publishesTotal.WithLabelValues("false").Inc()
249260
returnxerrors.Errorf("exec pg_notify: %w",err)
250261
}
262+
p.publishesTotal.WithLabelValues("true").Inc()
251263
returnnil
252264
}
253265

254266
// Close closes the pubsub instance.
255-
func (p*pgPubsub)Close()error {
267+
func (p*PGPubsub)Close()error {
256268
p.logger.Info(p.ctx,"pubsub is closing")
257269
p.cancel()
258270
err:=p.closeListener()
@@ -262,7 +274,7 @@ func (p *pgPubsub) Close() error {
262274
}
263275

264276
// closeListener closes the pgListener, unless it has already been closed.
265-
func (p*pgPubsub)closeListener()error {
277+
func (p*PGPubsub)closeListener()error {
266278
p.mut.Lock()
267279
deferp.mut.Unlock()
268280
ifp.closedListener {
@@ -274,7 +286,7 @@ func (p *pgPubsub) closeListener() error {
274286
}
275287

276288
// listen begins receiving messages on the pq listener.
277-
func (p*pgPubsub)listen() {
289+
func (p*PGPubsub)listen() {
278290
deferfunc() {
279291
p.logger.Info(p.ctx,"pubsub listen stopped receiving notify")
280292
cErr:=p.closeListener()
@@ -307,7 +319,13 @@ func (p *pgPubsub) listen() {
307319
}
308320
}
309321

310-
func (p*pgPubsub)listenReceive(notif*pq.Notification) {
322+
func (p*PGPubsub)listenReceive(notif*pq.Notification) {
323+
sizeLabel:=messageSizeNormal
324+
iflen(notif.Extra)>=colossalThreshold {
325+
sizeLabel=messageSizeColossal
326+
}
327+
p.messagesTotal.WithLabelValues(sizeLabel).Inc()
328+
311329
p.mut.Lock()
312330
deferp.mut.Unlock()
313331
queues,ok:=p.queues[notif.Channel]
@@ -320,7 +338,7 @@ func (p *pgPubsub) listenReceive(notif *pq.Notification) {
320338
}
321339
}
322340

323-
func (p*pgPubsub)recordReconnect() {
341+
func (p*PGPubsub)recordReconnect() {
324342
p.mut.Lock()
325343
deferp.mut.Unlock()
326344
for_,listeners:=rangep.queues {
@@ -330,20 +348,23 @@ func (p *pgPubsub) recordReconnect() {
330348
}
331349
}
332350

333-
// New creates a new Pubsub implementation using a PostgreSQL connection.
334-
funcNew(ctx context.Context,logger slog.Logger,database*sql.DB,connectURLstring) (Pubsub,error) {
351+
func (p*PGPubsub)startListener(ctx context.Context,connectURLstring)error {
352+
p.connected.Set(0)
335353
// Creates a new listener using pq.
336354
errCh:=make(chanerror)
337-
listener:=pq.NewListener(connectURL,time.Second,time.Minute,func(t pq.ListenerEventType,errerror) {
355+
p.pgListener=pq.NewListener(connectURL,time.Second,time.Minute,func(t pq.ListenerEventType,errerror) {
338356
switcht {
339357
casepq.ListenerEventConnected:
340-
logger.Info(ctx,"pubsub connected to postgres")
358+
p.logger.Info(ctx,"pubsub connected to postgres")
359+
p.connected.Set(1.0)
341360
casepq.ListenerEventDisconnected:
342-
logger.Error(ctx,"pubsub disconnected from postgres",slog.Error(err))
361+
p.logger.Error(ctx,"pubsub disconnected from postgres",slog.Error(err))
362+
p.connected.Set(0)
343363
casepq.ListenerEventReconnected:
344-
logger.Info(ctx,"pubsub reconnected to postgres")
364+
p.logger.Info(ctx,"pubsub reconnected to postgres")
365+
p.connected.Set(1)
345366
casepq.ListenerEventConnectionAttemptFailed:
346-
logger.Error(ctx,"pubsub failed to connect to postgres",slog.Error(err))
367+
p.logger.Error(ctx,"pubsub failed to connect to postgres",slog.Error(err))
347368
}
348369
// This callback gets events whenever the connection state changes.
349370
// Don't send if the errChannel has already been closed.
@@ -358,26 +379,120 @@ func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL s
358379
select {
359380
caseerr:=<-errCh:
360381
iferr!=nil {
361-
_=listener.Close()
362-
returnnil,xerrors.Errorf("create pq listener: %w",err)
382+
_=p.pgListener.Close()
383+
returnxerrors.Errorf("create pq listener: %w",err)
363384
}
364385
case<-ctx.Done():
365-
_=listener.Close()
366-
returnnil,ctx.Err()
386+
_=p.pgListener.Close()
387+
returnctx.Err()
367388
}
389+
returnnil
390+
}
368391

392+
// these are the metrics we compute explicitly
393+
var (
394+
currentSubscribersDesc=prometheus.NewDesc(
395+
"coder_pubsub_current_subscribers",
396+
"The current number of active pubsub subscribers",
397+
nil,nil,
398+
)
399+
currentEventsDesc=prometheus.NewDesc(
400+
"coder_pubsub_current_events",
401+
"The current number of pubsub event channels listened for",
402+
nil,nil,
403+
)
404+
)
405+
406+
// We'll track messages as size "normal" and "colossal", where the
407+
// latter are messages larger than 7600 bytes, or 95% of the postgres
408+
// notify limit. If we see a lot of colossal packets that's an indication that
409+
// we might be trying to send too much data over the pubsub and are in danger of
410+
// failing to publish.
411+
const (
412+
colossalThreshold=7600
413+
messageSizeNormal="normal"
414+
messageSizeColossal="colossal"
415+
)
416+
417+
// Describe implements, along with Collect, the prometheus.Collector interface
418+
// for metrics.
419+
func (p*PGPubsub)Describe(descschan<-*prometheus.Desc) {
420+
// explicit metrics
421+
p.publishesTotal.Describe(descs)
422+
p.subscribesTotal.Describe(descs)
423+
p.messagesTotal.Describe(descs)
424+
p.disconnectionsTotal.Describe(descs)
425+
p.connected.Describe(descs)
426+
427+
// implicit metrics
428+
descs<-currentSubscribersDesc
429+
descs<-currentEventsDesc
430+
}
431+
432+
// Collect implements, along with Describe, the prometheus.Collector interface
433+
// for metrics
434+
func (p*PGPubsub)Collect(metricschan<- prometheus.Metric) {
435+
// explicit metrics
436+
p.publishesTotal.Collect(metrics)
437+
p.subscribesTotal.Collect(metrics)
438+
p.messagesTotal.Collect(metrics)
439+
p.disconnectionsTotal.Collect(metrics)
440+
p.connected.Collect(metrics)
441+
442+
// implicit metrics
443+
p.mut.Lock()
444+
events:=len(p.queues)
445+
subs:=0
446+
for_,subscriberMap:=rangep.queues {
447+
subs+=len(subscriberMap)
448+
}
449+
p.mut.Unlock()
450+
metrics<-prometheus.MustNewConstMetric(currentSubscribersDesc,prometheus.GaugeValue,float64(subs))
451+
metrics<-prometheus.MustNewConstMetric(currentEventsDesc,prometheus.GaugeValue,float64(events))
452+
}
453+
454+
// New creates a new Pubsub implementation using a PostgreSQL connection.
455+
funcNew(startCtx context.Context,logger slog.Logger,database*sql.DB,connectURLstring) (*PGPubsub,error) {
369456
// Start a new context that will be canceled when the pubsub is closed.
370457
ctx,cancel:=context.WithCancel(context.Background())
371-
pgPubsub:=&pgPubsub{
458+
p:=&PGPubsub{
372459
ctx:ctx,
373460
cancel:cancel,
374461
logger:logger,
375462
listenDone:make(chanstruct{}),
376463
db:database,
377-
pgListener:listener,
378464
queues:make(map[string]map[uuid.UUID]*msgQueue),
465+
466+
publishesTotal:prometheus.NewCounterVec(prometheus.CounterOpts{
467+
Namespace:"coder",
468+
Subsystem:"pubsub",
469+
Name:"publishes_total",
470+
}, []string{"success"}),
471+
subscribesTotal:prometheus.NewCounterVec(prometheus.CounterOpts{
472+
Namespace:"coder",
473+
Subsystem:"pubsub",
474+
Name:"subscribes_total",
475+
}, []string{"success"}),
476+
messagesTotal:prometheus.NewCounterVec(prometheus.CounterOpts{
477+
Namespace:"coder",
478+
Subsystem:"pubsub",
479+
Name:"messages_total",
480+
}, []string{"size"}),
481+
disconnectionsTotal:prometheus.NewCounter(prometheus.CounterOpts{
482+
Namespace:"coder",
483+
Subsystem:"pubsub",
484+
Name:"disconnections_total",
485+
}),
486+
connected:prometheus.NewGauge(prometheus.GaugeOpts{
487+
Namespace:"coder",
488+
Subsystem:"pubsub",
489+
Name:"connected",
490+
}),
491+
}
492+
iferr:=p.startListener(startCtx,connectURL);err!=nil {
493+
returnnil,err
379494
}
380-
gopgPubsub.listen()
495+
gop.listen()
381496
logger.Info(ctx,"pubsub has started")
382-
returnpgPubsub,nil
497+
returnp,nil
383498
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp