Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: measure pubsub latencies and expose metrics#13126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
dannykopping merged 17 commits into coder:main from dannykopping:dk/pubsub-latency
May 10, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
17 commits
Select commitHold shift + click to select a range
c249b70
Measuring pubsub latency and exposing as Prometheus metric
dannykoppingMay 1, 2024
f845bda
Track errors
dannykoppingMay 2, 2024
4f62b40
Explicitly checking latencies are >0
dannykoppingMay 2, 2024
3d9e3dd
Enhancements
dannykoppingMay 2, 2024
34083d0
Fix TestPGPubsub_Metrics test
dannykoppingMay 2, 2024
28a96de
Refactor to avoid global state
dannykoppingMay 2, 2024
65f57b1
Protect against NOTIFY races a slow receiver
dannykoppingMay 2, 2024
49d2002
Reliably cause notify races by delaying publishes
dannykoppingMay 3, 2024
68412c8
Merge branch 'main' of https://github.com/coder/coder into dk/pubsub-…
dannykoppingMay 6, 2024
a7c042f
Measure latency in the background
dannykoppingMay 6, 2024
633365d
Fake pubsub to simulate race
dannykoppingMay 6, 2024
722a233
make fmt
dannykoppingMay 6, 2024
ff73789
Stop async measurements on pubsub close
dannykoppingMay 7, 2024
7d055d2
Cancel goroutine immediately on stop
dannykoppingMay 8, 2024
361538c
Merge branch 'main' of https://github.com/coder/coder into dk/pubsub-…
dannykoppingMay 10, 2024
33a2a1d
Revert to only synchronous collection; background collection is not w…
dannykoppingMay 10, 2024
bad502a
Wording fixup
dannykoppingMay 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletionscoderd/database/pubsub/latency.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
package pubsub

import (
"bytes"
"context"
"fmt"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog"
)

// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
type LatencyMeasurer struct {
// channel is a unique identifier that is embedded in the pubsub channel name, so that multiple coderd replicas
// do not clash when performing latency measurements.
channel uuid.UUID
// logger receives the debug, warning and error output produced while measuring.
logger slog.Logger
}

// LatencyMessageLength is the length, in bytes, of the message published for each latency measurement: a UUID in its
// canonical string form (32 hexadecimal digits plus 4 hyphens).
const LatencyMessageLength = 36

// NewLatencyMeasurer returns a LatencyMeasurer that logs to the given logger and performs its measurements on a
// channel derived from a freshly generated UUID.
func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
	lm := new(LatencyMeasurer)
	lm.logger = logger
	lm.channel = uuid.New()
	return lm
}

// Measure subscribes to this measurer's latency channel on the given Pubsub, publishes a unique message to it, and
// waits for that message to be delivered back.
//
// It returns:
//   - send: the time taken by the Publish call.
//   - recv: the time between starting the publish and the listener receiving the expected message.
//   - err:  a wrapped error if subscribing or publishing fails (send and recv are both -1), or ctx.Err() if ctx is
//     done before the message is received (send is valid, recv is -1).
//
// Messages on the channel which do not match the published payload are logged and ignored.
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
	// receivedAt carries the arrival time of the expected message from the listener back to this goroutine.
	// Passing the timestamp over the channel (instead of having the listener read a start time written by this
	// goroutine) means the listener touches no state shared with Measure other than the channel itself.
	receivedAt := make(chan time.Time, 1)

	msg := []byte(uuid.New().String())
	lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))

	channel := lm.latencyChannelName()
	cancel, err := p.Subscribe(channel, func(ctx context.Context, in []byte) {
		if !bytes.Equal(in, msg) {
			lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg))
			return
		}

		// Never block the pubsub's delivery goroutine: if the expected message is delivered more than once, or
		// after Measure has already returned, there is no reader left to drain the channel.
		select {
		case receivedAt <- time.Now():
		default:
		}
	})
	if err != nil {
		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
	}
	defer cancel()

	start := time.Now()
	if err = p.Publish(channel, msg); err != nil {
		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
	}
	send = time.Since(start)

	select {
	case <-ctx.Done():
		lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
		return send, -1, ctx.Err()
	case at := <-receivedAt:
		return send, at.Sub(start), nil
	}
}

// latencyChannelName returns the name of the pubsub channel used for measurements: a fixed "latency-measure:"
// prefix followed by this measurer's channel identifier.
func (lm *LatencyMeasurer) latencyChannelName() string {
	name := fmt.Sprintf("latency-measure:%s", lm.channel)
	return name
}
61 changes: 57 additions & 4 deletionscoderd/database/pubsub/pubsub.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,6 +7,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand All@@ -28,6 +29,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
// might have been dropped.
var ErrDroppedMessages = xerrors.New("dropped messages")

// LatencyMeasureTimeout defines how long a single latency measurement is allowed to take; it is used as the timeout
// of the context passed to LatencyMeasurer.Measure when metrics are collected.
const LatencyMeasureTimeout = time.Second * 10

// Pubsub is a generic interface for broadcasting and receiving messages.
// Implementors should assume high-availability with the backing implementation.
type Pubsub interface {
Expand DownExpand Up@@ -205,6 +209,10 @@ type PGPubsub struct {
receivedBytesTotal prometheus.Counter
disconnectionsTotal prometheus.Counter
connected prometheus.Gauge

latencyMeasurer *LatencyMeasurer
latencyMeasureCounter atomic.Int64
latencyErrCounter atomic.Int64
}

// BufferSize is the maximum number of unhandled messages we will buffer
Expand DownExpand Up@@ -478,6 +486,30 @@ var (
)
)

// additional metrics collected out-of-band
// None of these descriptors declare variable or constant labels (both label arguments are nil).
var (
// pubsubSendLatencyDesc describes the send latency of a latency measurement, in seconds.
pubsubSendLatencyDesc = prometheus.NewDesc(
"coder_pubsub_send_latency_seconds",
"The time taken to send a message into a pubsub event channel",
nil, nil,
)
// pubsubRecvLatencyDesc describes the receive latency of a latency measurement, in seconds.
pubsubRecvLatencyDesc = prometheus.NewDesc(
"coder_pubsub_receive_latency_seconds",
"The time taken to receive a message from a pubsub event channel",
nil, nil,
)
// pubsubLatencyMeasureCountDesc describes the running count of latency measurements.
pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
"coder_pubsub_latency_measures_total",
"The number of pubsub latency measurements",
nil, nil,
)
// pubsubLatencyMeasureErrDesc describes the running count of latency measurements that returned an error.
pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
"coder_pubsub_latency_measure_errs_total",
"The number of pubsub latency measurement failures",
nil, nil,
)
)

// We'll track messages as size "normal" and "colossal", where the
// latter are messages larger than 7600 bytes, or 95% of the postgres
// notify limit. If we see a lot of colossal packets that's an indication that
Expand All@@ -504,6 +536,12 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
// implicit metrics
descs <- currentSubscribersDesc
descs <- currentEventsDesc

// additional metrics
descs <- pubsubSendLatencyDesc
descs <- pubsubRecvLatencyDesc
descs <- pubsubLatencyMeasureCountDesc
descs <- pubsubLatencyMeasureErrDesc
}

// Collect implements, along with Describe, the prometheus.Collector interface
Expand All@@ -528,6 +566,20 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
p.qMu.Unlock()
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))

// additional metrics
ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureTimeout)
defer cancel()
send, recv, err := p.latencyMeasurer.Measure(ctx, p)

metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
if err != nil {
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
return
}
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
}

// New creates a new Pubsub implementation using a PostgreSQL connection.
Expand All@@ -544,10 +596,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
// newWithoutListener creates a new PGPubsub without creating the pqListener.
func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
return &PGPubsub{
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),

publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",
Expand Down
111 changes: 111 additions & 0 deletionscoderd/database/pubsub/pubsub_linux_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,6 +3,7 @@
package pubsub_test

import (
"bytes"
"context"
"database/sql"
"fmt"
Expand All@@ -15,6 +16,8 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"cdr.dev/slog/sloggers/sloghuman"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
Expand DownExpand Up@@ -294,3 +297,111 @@ func TestPubsub_Disconnect(t *testing.T) {
}
require.True(t, gotDroppedErr)
}

// TestMeasureLatency exercises LatencyMeasurer.Measure against a PostgreSQL-backed pubsub: the happy path, a
// context that has already expired, and a channel that also carries an unexpected message.
func TestMeasureLatency(t *testing.T) {
	t.Parallel()

	// newPubsub builds a pubsub and returns it together with a function that releases all of its resources.
	// It takes the *testing.T of the calling subtest: the subtests run in parallel on their own goroutines, so
	// failed requirements and logger output must be reported against (and abort) the subtest that is actually
	// running, not the parent test.
	newPubsub := func(t *testing.T) (pubsub.Pubsub, func()) {
		t.Helper()

		ctx, cancel := context.WithCancel(context.Background())
		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		connectionURL, closePg, err := dbtestutil.Open()
		require.NoError(t, err)
		db, err := sql.Open("postgres", connectionURL)
		require.NoError(t, err)
		ps, err := pubsub.New(ctx, logger, db, connectionURL)
		require.NoError(t, err)

		return ps, func() {
			_ = ps.Close()
			_ = db.Close()
			closePg()
			cancel()
		}
	}

	t.Run("MeasureLatency", func(t *testing.T) {
		t.Parallel()

		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ps, done := newPubsub(t)
		defer done()

		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
		defer cancel()

		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
		require.NoError(t, err)
		require.Greater(t, send.Seconds(), 0.0)
		require.Greater(t, recv.Seconds(), 0.0)
	})

	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
		t.Parallel()

		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ps, done := newPubsub(t)
		defer done()

		// A deadline in the past: the publish still happens, but the receive must be abandoned.
		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
		defer cancel()

		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
		require.Greater(t, send.Seconds(), 0.0)
		require.EqualValues(t, recv, time.Duration(-1))
	})

	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
		t.Parallel()

		// Capture log output so the warning about the unexpected message can be asserted on.
		var buf bytes.Buffer
		logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
		logger = logger.AppendSinks(sloghuman.Sink(&buf))

		lm := pubsub.NewLatencyMeasurer(logger)
		ps, done := newPubsub(t)
		defer done()

		racy := newRacyPubsub(ps)
		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
		defer cancel()

		send, recv, err := lm.Measure(ctx, racy)
		assert.NoError(t, err)
		assert.Greater(t, send.Seconds(), 0.0)
		assert.Greater(t, recv.Seconds(), 0.0)

		logger.Sync()
		assert.Contains(t, buf.String(), "received unexpected message")
	})
}

// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not).
// This is used to verify that a subscriber will only listen for the message it explicitly expects.
type racyPubsub struct {
// Pubsub is the wrapped implementation that every call is forwarded to.
pubsub.Pubsub
}

// newRacyPubsub wraps ps in a racyPubsub.
func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub {
	racy := new(racyPubsub)
	racy.Pubsub = ps
	return racy
}

// Subscribe forwards the subscription unchanged to the wrapped Pubsub.
func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
	cancel, err = s.Pubsub.Subscribe(event, listener)
	return cancel, err
}

// SubscribeWithErr forwards the subscription unchanged to the wrapped Pubsub.
func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
	cancel, err = s.Pubsub.SubscribeWithErr(event, listener)
	return cancel, err
}

// Publish first publishes an unrelated payload on the same event, then the real message, so that subscribers see an
// unexpected message ahead of the one they are waiting for.
func (s *racyPubsub) Publish(event string, message []byte) error {
	if err := s.Pubsub.Publish(event, []byte("nonsense")); err != nil {
		return xerrors.Errorf("failed to send simulated race: %w", err)
	}

	return s.Pubsub.Publish(event, message)
}

// Close closes the wrapped Pubsub and returns its error.
func (s *racyPubsub) Close() error {
	err := s.Pubsub.Close()
	return err
}
39 changes: 28 additions & 11 deletionscoderd/database/pubsub/pubsub_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -39,7 +39,11 @@ func TestPGPubsub_Metrics(t *testing.T) {
err = registry.Register(uut)
require.NoError(t, err)

// each Gather measures pubsub latency by publishing a message & subscribing to it
var gatherCount float64

metrics, err := registry.Gather()
gatherCount++
require.NoError(t, err)
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
Expand All@@ -59,19 +63,26 @@ func TestPGPubsub_Metrics(t *testing.T) {
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)

require.Eventually(t, func() bool {
latencyBytes := gatherCount * pubsub.LatencyMessageLength
metrics, err = registry.Gather()
gatherCount++
assert.NoError(t, err)
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_published_bytes_total")
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)

colossalData := make([]byte, 7600)
colossalSize := 7600
colossalData := make([]byte, colossalSize)
for i := range colossalData {
colossalData[i] = 'q'
}
Expand All@@ -89,16 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) {
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)

require.Eventually(t, func() bool {
latencyBytes := gatherCount * pubsub.LatencyMessageLength
metrics, err = registry.Gather()
gatherCount++
assert.NoError(t, err)
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
testutil.PromCounterHasValue(t, metrics,2, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics,2, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics,1, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics,1+gatherCount, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics,1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics,gatherCount, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_published_bytes_total")
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp