Commit 33a2a1d

Revert to only synchronous collection; background collection is not worth the complexity

Signed-off-by: Danny Kopping <danny@coder.com>

1 parent 361538c · commit 33a2a1d

7 files changed: +62 -135 lines changed
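
In practice the revert shrinks the public surface in two ways, visible in the hunks below: pubsub.New no longer takes a measurement interval, and LatencyMeasurer.Measure returns the send/receive durations and an error directly instead of a LatencyMeasurement struct. A minimal sketch of a caller after this change, condensed from the updated call sites (the helper signature and surrounding wiring are illustrative, imports elided):

// Sketch only: condensed from the call sites changed in this commit.
// newPubsub is a hypothetical helper; ctx, logger, sqlDB and dbURL come from the caller.
func newPubsub(ctx context.Context, logger slog.Logger, sqlDB *sql.DB, dbURL string) (pubsub.Pubsub, error) {
	ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL) // interval parameter removed
	if err != nil {
		return nil, xerrors.Errorf("create pubsub: %w", err)
	}

	// Latency is now measured synchronously, on demand, rather than by a background goroutine.
	send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
	if err != nil {
		logger.Warn(ctx, "failed to measure latency", slog.Error(err))
	} else {
		logger.Debug(ctx, "pubsub latency", slog.F("send", send), slog.F("recv", recv))
	}
	return ps, nil
}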

‎cli/server.go

Lines changed: 1 addition & 1 deletion
@@ -678,7 +678,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
 	}()
 
 	options.Database = database.New(sqlDB)
-	ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL, pubsub.LatencyMeasureInterval)
+	ps, err := pubsub.New(ctx, logger.Named("pubsub"), sqlDB, dbURL)
 	if err != nil {
 		return xerrors.Errorf("create pubsub: %w", err)
 	}

‎coderd/database/dbtestutil/db.go

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ func NewDB(t testing.TB, opts ...Option) (database.Store, pubsub.Pubsub) {
 	}
 	db = database.New(sqlDB)
 
-	ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL, pubsub.LatencyMeasureInterval)
+	ps, err = pubsub.New(context.Background(), o.logger, sqlDB, connectionURL)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		_ = ps.Close()

‎coderd/database/pubsub/latency.go

Lines changed: 7 additions & 72 deletions
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
@@ -19,16 +18,6 @@ type LatencyMeasurer struct {
 	// Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
 	channel uuid.UUID
 	logger  slog.Logger
-
-	// background measurement members
-	collections atomic.Int64
-	last        atomic.Value
-	asyncCancel context.CancelCauseFunc
-}
-
-type LatencyMeasurement struct {
-	Send, Recv time.Duration
-	Err        error
 }
 
 // LatencyMessageLength is the length of a UUIDv4 encoded to hex.
@@ -42,7 +31,7 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
 }
 
 // Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
-func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasurement {
+func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
 	var (
 		start time.Time
 		res   = make(chan time.Duration, 1)
@@ -60,77 +49,23 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) LatencyMeasure
 		res <- time.Since(start)
 	})
 	if err != nil {
-		return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to subscribe: %w", err)}
+		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
 	}
 	defer cancel()
 
 	start = time.Now()
 	err = p.Publish(lm.latencyChannelName(), msg)
 	if err != nil {
-		return LatencyMeasurement{Send: -1, Recv: -1, Err: xerrors.Errorf("failed to publish: %w", err)}
+		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
 	}
 
-	send := time.Since(start)
-
+	send = time.Since(start)
 	select {
 	case <-ctx.Done():
 		lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
-		return LatencyMeasurement{Send: send, Recv: -1, Err: ctx.Err()}
-	case recv := <-res:
-		return LatencyMeasurement{Send: send, Recv: recv}
-	}
-}
-
-// MeasureAsync runs latency measurements asynchronously on a given interval.
-// This function is expected to be run in a goroutine and will exit when the context is canceled.
-func (lm *LatencyMeasurer) MeasureAsync(ctx context.Context, p Pubsub, interval time.Duration) {
-	tick := time.NewTicker(interval)
-	defer tick.Stop()
-
-	ctx, cancel := context.WithCancelCause(ctx)
-	lm.asyncCancel = cancel
-
-	for {
-		// run immediately on first call, then sleep a tick before each invocation
-		if p == nil {
-			lm.logger.Error(ctx, "given pubsub is nil")
-			return
-		}
-
-		lm.collections.Add(1)
-		measure := lm.Measure(ctx, p)
-		lm.last.Store(&measure)
-
-		select {
-		case <-tick.C:
-			continue
-
-		// bail out if signaled
-		case <-ctx.Done():
-			lm.logger.Debug(ctx, "async measurement canceled", slog.Error(ctx.Err()))
-			return
-		}
-	}
-}
-
-func (lm *LatencyMeasurer) LastMeasurement() *LatencyMeasurement {
-	val := lm.last.Load()
-	if val == nil {
-		return nil
-	}
-
-	// nolint:forcetypeassert // Unnecessary type check.
-	return val.(*LatencyMeasurement)
-}
-
-func (lm *LatencyMeasurer) MeasurementCount() int64 {
-	return lm.collections.Load()
-}
-
-// Stop stops any background measurements.
-func (lm *LatencyMeasurer) Stop() {
-	if lm.asyncCancel != nil {
-		lm.asyncCancel(xerrors.New("stopped"))
+		return send, -1, ctx.Err()
+	case recv = <-res:
+		return send, recv, nil
 	}
 }

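The new Measure signature keeps the old -1 sentinels: both durations are -1 when the subscribe or publish fails, and on a context timeout the send duration is valid while recv is -1 and the context error is returned. A hedged sketch of how a caller can distinguish those cases (logMeasurement is a hypothetical helper; the case analysis follows the hunk above and the tests later in this commit):

// Sketch only: interpreting Measure's results under the new signature.
func logMeasurement(ctx context.Context, logger slog.Logger, lm *pubsub.LatencyMeasurer, ps pubsub.Pubsub) {
	send, recv, err := lm.Measure(ctx, ps)
	switch {
	case err != nil && send < 0:
		// Subscribe or publish failed; neither duration is meaningful (-1 sentinels).
		logger.Warn(ctx, "latency measurement failed", slog.Error(err))
	case err != nil:
		// Published, but the echo never arrived before the context ended; recv is -1.
		logger.Warn(ctx, "latency echo not received", slog.Error(err), slog.F("send", send))
	default:
		logger.Debug(ctx, "pubsub latency", slog.F("send", send), slog.F("recv", recv))
	}
}
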
‎coderd/database/pubsub/pubsub.go

Lines changed: 12 additions & 17 deletions
@@ -210,8 +210,9 @@ type PGPubsub struct {
 	disconnectionsTotal prometheus.Counter
 	connected           prometheus.Gauge
 
-	latencyMeasurer   *LatencyMeasurer
-	latencyErrCounter atomic.Int64
+	latencyMeasurer       *LatencyMeasurer
+	latencyMeasureCounter atomic.Int64
+	latencyErrCounter     atomic.Int64
 }
 
 // BufferSize is the maximum number of unhandled messages we will buffer
@@ -311,8 +312,6 @@ func (p *PGPubsub) Close() error {
 	err := p.closeListener()
 	<-p.listenDone
 	p.logger.Debug(context.Background(), "pubsub closed")
-	p.latencyMeasurer.Stop()
-	p.logger.Debug(context.Background(), "background latency measurement has stopped")
 	return err
 }
 
@@ -569,32 +568,28 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
 	metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
 
 	// additional metrics
-	latency := p.latencyMeasurer.LastMeasurement()
-	if latency == nil {
-		p.logger.Debug(context.Background(), "latency measurement not completed yet")
-		return
-	}
+	ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureInterval)
+	defer cancel()
+	send, recv, err := p.latencyMeasurer.Measure(ctx, p)
 
-	metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasurer.MeasurementCount()))
-	if latency.Err != nil {
-		p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(latency.Err))
+	metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
+	if err != nil {
+		p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
 		metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
 		return
 	}
-	metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, latency.Send.Seconds())
-	metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, latency.Recv.Seconds())
+	metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
+	metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
 }
 
 // New creates a new Pubsub implementation using a PostgreSQL connection.
-func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string, latencyMeasureInterval time.Duration) (*PGPubsub, error) {
+func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (*PGPubsub, error) {
 	p := newWithoutListener(logger, database)
 	if err := p.startListener(startCtx, connectURL); err != nil {
 		return nil, err
 	}
 	go p.listen()
 	logger.Info(startCtx, "pubsub has started")
-	go p.latencyMeasurer.MeasureAsync(startCtx, p, latencyMeasureInterval)
-	logger.Debug(startCtx, "background latency measurement has started")
 	return p, nil
 }

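With the background loop gone, each Prometheus scrape now pays for one synchronous publish/subscribe round trip, and LatencyMeasureInterval acts as a per-measurement timeout rather than a collection interval. The scrape path reduces to the following (condensed from the Collect hunk above, not new behaviour):

// Inside (*PGPubsub).Collect, per the hunk above: one bounded, synchronous measurement per scrape.
ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureInterval)
defer cancel()

send, recv, err := p.latencyMeasurer.Measure(ctx, p)
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
if err != nil {
	// Count the failure and skip the latency gauges for this scrape.
	metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
	return
}
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
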
‎coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 18 additions & 18 deletions
@@ -45,7 +45,7 @@ func TestPubsub(t *testing.T) {
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
 		defer db.Close()
-		pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
+		pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
 		require.NoError(t, err)
 		defer pubsub.Close()
 		event := "test"
@@ -74,7 +74,7 @@ func TestPubsub(t *testing.T) {
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
 		defer db.Close()
-		pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
+		pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
 		require.NoError(t, err)
 		defer pubsub.Close()
 		cancelFunc()
@@ -90,7 +90,7 @@ func TestPubsub(t *testing.T) {
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
 		defer db.Close()
-		pubsub, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
+		pubsub, err := pubsub.New(ctx, logger, db, connectionURL)
 		require.NoError(t, err)
 		defer pubsub.Close()
 
@@ -127,7 +127,7 @@ func TestPubsub_ordering(t *testing.T) {
 	db, err := sql.Open("postgres", connectionURL)
 	require.NoError(t, err)
 	defer db.Close()
-	ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
+	ps, err := pubsub.New(ctx, logger, db, connectionURL)
 	require.NoError(t, err)
 	defer ps.Close()
 	event := "test"
@@ -176,7 +176,7 @@ func TestPubsub_Disconnect(t *testing.T) {
 	ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong)
 	defer cancelFunc()
 	logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
-	ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
+	ps, err := pubsub.New(ctx, logger, db, connectionURL)
 	require.NoError(t, err)
 	defer ps.Close()
 	event := "test"
@@ -308,7 +308,7 @@ func TestMeasureLatency(t *testing.T) {
 		require.NoError(t, err)
 		db, err := sql.Open("postgres", connectionURL)
 		require.NoError(t, err)
-		ps, err := pubsub.New(ctx, logger, db, connectionURL, pubsub.LatencyMeasureInterval)
+		ps, err := pubsub.New(ctx, logger, db, connectionURL)
 		require.NoError(t, err)
 
 		return ps, func() {
@@ -329,10 +329,10 @@ func TestMeasureLatency(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
 		defer cancel()
 
-		l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
-		require.NoError(t, l.Err)
-		require.Greater(t, l.Send.Seconds(), 0.0)
-		require.Greater(t, l.Recv.Seconds(), 0.0)
+		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.NoError(t, err)
+		require.Greater(t, send.Seconds(), 0.0)
+		require.Greater(t, recv.Seconds(), 0.0)
 	})
 
 	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
@@ -345,10 +345,10 @@ func TestMeasureLatency(t *testing.T) {
 		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
 		defer cancel()
 
-		l := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
-		require.ErrorContains(t, l.Err, context.DeadlineExceeded.Error())
-		require.Greater(t, l.Send.Seconds(), 0.0)
-		require.EqualValues(t, l.Recv, time.Duration(-1))
+		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
+		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
+		require.Greater(t, send.Seconds(), 0.0)
+		require.EqualValues(t, recv, time.Duration(-1))
 	})
 
 	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
@@ -366,10 +366,10 @@ func TestMeasureLatency(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
 		defer cancel()
 
-		l := lm.Measure(ctx, racy)
-		assert.NoError(t, l.Err)
-		assert.Greater(t, l.Send.Seconds(), 0.0)
-		assert.Greater(t, l.Recv.Seconds(), 0.0)
+		send, recv, err := lm.Measure(ctx, racy)
+		assert.NoError(t, err)
+		assert.Greater(t, send.Seconds(), 0.0)
+		assert.Greater(t, recv.Seconds(), 0.0)
 
 		logger.Sync()
 		assert.Contains(t, buf.String(), "received unexpected message")

‎coderd/database/pubsub/pubsub_test.go

Lines changed: 21 additions & 26 deletions
@@ -4,7 +4,6 @@ import (
 	"context"
 	"database/sql"
 	"testing"
-	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
@@ -33,16 +32,18 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	registry := prometheus.NewRegistry()
 	ctx := testutil.Context(t, testutil.WaitLong)
 
-	latencyMeasureInterval := time.Second
-	start := time.Now()
-	uut, err := pubsub.New(ctx, logger, db, connectionURL, latencyMeasureInterval)
+	uut, err := pubsub.New(ctx, logger, db, connectionURL)
 	require.NoError(t, err)
 	defer uut.Close()
 
 	err = registry.Register(uut)
 	require.NoError(t, err)
 
+	// each Gather measures pubsub latency by publishing a message & subscribing to it
+	var gatherCount float64
+
 	metrics, err := registry.Gather()
+	gatherCount++
 	require.NoError(t, err)
 	require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
 	require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
@@ -62,24 +63,21 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
 
 	require.Eventually(t, func() bool {
+		latencyBytes := gatherCount * pubsub.LatencyMessageLength
 		metrics, err = registry.Gather()
+		gatherCount++
 		assert.NoError(t, err)
-
-		latencyMeasurements := (time.Since(start).Truncate(latencyMeasureInterval).Seconds() / latencyMeasureInterval.Seconds()) + 1
-		t.Log(latencyMeasurements)
-		latencyMsgSize := pubsub.LatencyMessageLength * latencyMeasurements
-
 		return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
-			testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_publishes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_subscribes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_messages_total", "normal") &&
-			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyMsgSize, "coder_pubsub_received_bytes_total") &&
-			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyMsgSize, "coder_pubsub_published_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
+			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
 			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
 			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
-			testutil.PromCounterHasValue(t, metrics, latencyMeasurements, "coder_pubsub_latency_measures_total") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
 			!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
 	}, testutil.WaitShort, testutil.IntervalFast)
 
@@ -102,25 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) {
 	_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
 
 	require.Eventually(t, func() bool {
+		latencyBytes := gatherCount * pubsub.LatencyMessageLength
 		metrics, err = registry.Gather()
+		gatherCount++
 		assert.NoError(t, err)
-
-		latencyMeasurements := (time.Since(start).Truncate(latencyMeasureInterval).Seconds() / latencyMeasureInterval.Seconds()) + 1
-		latencyMsgSize := pubsub.LatencyMessageLength * latencyMeasurements
-		t.Log(latencyMeasurements)
-
 		return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
 			testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
 			testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
-			testutil.PromCounterHasValue(t, metrics, 2+latencyMeasurements, "coder_pubsub_publishes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 2+latencyMeasurements, "coder_pubsub_subscribes_total", "true") &&
-			testutil.PromCounterHasValue(t, metrics, 1+latencyMeasurements, "coder_pubsub_messages_total", "normal") &&
+			testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
 			testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
-			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyMsgSize, "coder_pubsub_received_bytes_total") &&
-			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyMsgSize, "coder_pubsub_published_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
+			testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
 			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
 			testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
-			testutil.PromCounterHasValue(t, metrics, latencyMeasurements, "coder_pubsub_latency_measures_total") &&
+			testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
			!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
 	}, testutil.WaitShort, testutil.IntervalFast)
 }

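Because every registry.Gather() now triggers exactly one latency measurement, the test replaces the time-based latencyMeasurements estimate with a plain gatherCount and derives the expected byte totals from it. Per the test's own ordering, latencyBytes is computed before each Gather, so the byte counters observed by a Gather reflect only the probes from previous gathers. An illustrative restatement of that accounting (gatherCount and data come from the test above):

// Illustrative only: expected values before a Gather, mirroring the assertions above.
latencyBytes := gatherCount * pubsub.LatencyMessageLength // probe traffic from previous Gather calls
wantPublishedBytes := float64(len(data)) + latencyBytes   // application payload plus latency probes
wantLatencyMeasures := gatherCount + 1                    // the Gather being asserted adds one measurement
_, _ = wantPublishedBytes, wantLatencyMeasures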