Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit49d2002

Browse files
committed
Reliably cause notify races by delaying publishes
Signed-off-by: Danny Kopping <danny@coder.com>
1 parent65f57b1 commit49d2002

File tree

2 files changed

+47
-52
lines changed

2 files changed

+47
-52
lines changed

‎coderd/database/pubsub/latency.go

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,24 @@ func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
3333
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
3434
func (lm*LatencyMeasurer)Measure(ctx context.Context,pPubsub) (sendfloat64,recvfloat64,errerror) {
3535
var (
36-
start time.Time
37-
res=make(chanfloat64,1)
38-
subscribeErr=make(chanerror,1)
36+
start time.Time
37+
res=make(chanfloat64,1)
3938
)
4039

4140
msg:= []byte(uuid.New().String())
42-
log:=lm.logger.With(slog.F("msg",msg))
4341

44-
gofunc() {
45-
_,err=p.Subscribe(lm.latencyChannelName(),func(ctx context.Context,in []byte) {
46-
p:=p
47-
_=p
48-
49-
if!bytes.Equal(in,msg) {
50-
log.Warn(ctx,"received unexpected message!",slog.F("in",in))
51-
return
52-
}
53-
54-
res<-time.Since(start).Seconds()
55-
})
56-
iferr!=nil {
57-
subscribeErr<-xerrors.Errorf("failed to subscribe: %w",err)
42+
cancel,err:=p.Subscribe(lm.latencyChannelName(),func(ctx context.Context,in []byte) {
43+
if!bytes.Equal(in,msg) {
44+
lm.logger.Warn(ctx,"received unexpected message",slog.F("got",in),slog.F("expected",msg))
45+
return
5846
}
59-
}()
47+
48+
res<-time.Since(start).Seconds()
49+
})
50+
iferr!=nil {
51+
return-1,-1,xerrors.Errorf("failed to subscribe: %w",err)
52+
}
53+
defercancel()
6054

6155
start=time.Now()
6256
err=p.Publish(lm.latencyChannelName(),msg)
@@ -68,12 +62,10 @@ func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send float64,
6862

6963
select {
7064
case<-ctx.Done():
71-
log.Error(ctx,"context canceled before message could be received",slog.Error(ctx.Err()))
65+
lm.logger.Error(ctx,"context canceled before message could be received",slog.Error(ctx.Err()),slog.F("msg",msg))
7266
returnsend,-1,ctx.Err()
7367
caseval:=<-res:
7468
returnsend,val,nil
75-
caseerr=<-subscribeErr:
76-
returnsend,-1,err
7769
}
7870
}
7971

‎coderd/database/pubsub/pubsub_linux_test.go

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ import (
1313
"testing"
1414
"time"
1515

16-
"cdr.dev/slog/sloggers/sloghuman"
1716
"github.com/stretchr/testify/assert"
1817
"github.com/stretchr/testify/require"
1918
"golang.org/x/xerrors"
2019

20+
"cdr.dev/slog/sloggers/sloghuman"
21+
2122
"cdr.dev/slog"
2223
"cdr.dev/slog/sloggers/slogtest"
2324
"github.com/coder/coder/v2/coderd/database/dbtestutil"
@@ -342,8 +343,7 @@ func TestMeasureLatency(t *testing.T) {
342343
ps,done:=newPubsub()
343344
deferdone()
344345

345-
// nolint:gocritic // need a very short timeout here to trigger error
346-
ctx,cancel:=context.WithTimeout(context.Background(),time.Nanosecond)
346+
ctx,cancel:=context.WithDeadline(context.Background(),time.Now().Add(-time.Second))
347347
defercancel()
348348

349349
send,recv,err:=pubsub.NewLatencyMeasurer(logger).Measure(ctx,ps)
@@ -356,75 +356,78 @@ func TestMeasureLatency(t *testing.T) {
356356
t.Parallel()
357357

358358
varbuf bytes.Buffer
359-
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
359+
logger:=slogtest.Make(t,&slogtest.Options{IgnoreErrors:true}).Leveled(slog.LevelDebug)
360360
logger=logger.AppendSinks(sloghuman.Sink(&buf))
361361

362362
lm:=pubsub.NewLatencyMeasurer(logger)
363363
ps,done:=newPubsub()
364364
deferdone()
365365

366-
slow:=newDelayedListener(ps,time.Second)
367-
fast:=newDelayedListener(ps,time.Nanosecond)
366+
slow:=newDelayedPublisher(ps,time.Second)
367+
fast:=newDelayedPublisher(ps,time.Nanosecond)
368+
hold:=make(chanstruct{},1)
368369

370+
// Start two goroutines in which two subscribers are registered but the messages are received out-of-order because
371+
// the first Pubsub will publish its message slowly and the second will publish it quickly. Both will ultimately
372+
// receive their desired messages, but the slow publisher will receive an unexpected message first.
369373
varwg sync.WaitGroup
370374
wg.Add(2)
371-
372-
// Publish message concurrently to a slow receiver.
373375
gofunc() {
374376
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort)
375377
defercancel()
376378
deferwg.Done()
377379

378-
// Slow receiver will not receive its latency message because the fast one receives it first.
379-
_,_,err:=lm.Measure(ctx,slow)
380-
require.ErrorContains(t,err,context.DeadlineExceeded.Error())
380+
hold<-struct{}{}
381+
send,recv,err:=lm.Measure(ctx,slow)
382+
assert.NoError(t,err)
383+
assert.Greater(t,send,0.0)
384+
assert.Greater(t,recv,0.0)
385+
386+
// The slow subscriber will complete first and receive the fast publisher's message first.
387+
logger.Sync()
388+
assert.Contains(t,buf.String(),"received unexpected message")
381389
}()
382390

383-
// Publish message concurrently to a fast receiver who will receive both its own and the slow receiver's messages.
384-
// It should ignore the unexpected message and consume its own, leaving the slow receiver to timeout since it
385-
// will never receive their own message.
386391
gofunc() {
387392
ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort)
388393
defercancel()
389394
deferwg.Done()
390395

396+
// Force fast publisher to start after the slow one to avoid flakiness.
397+
<-hold
398+
time.Sleep(time.Millisecond*50)
391399
send,recv,err:=lm.Measure(ctx,fast)
392-
require.NoError(t,err)
393-
require.Greater(t,send,0.0)
394-
require.Greater(t,recv,0.0)
400+
assert.NoError(t,err)
401+
assert.Greater(t,send,0.0)
402+
assert.Greater(t,recv,0.0)
395403
}()
396404

397405
wg.Wait()
398-
399-
// Flush the contents of the logger to its buffer.
400-
logger.Sync()
401-
require.Contains(t,buf.String(),"received unexpected message!")
402406
})
403407
}
404408

405-
typedelayedListenerstruct {
409+
typedelayedPublisherstruct {
406410
pubsub.Pubsub
407411
delay time.Duration
408412
}
409413

410-
funcnewDelayedListener(ps pubsub.Pubsub,delay time.Duration)*delayedListener {
411-
return&delayedListener{Pubsub:ps,delay:delay}
414+
funcnewDelayedPublisher(ps pubsub.Pubsub,delay time.Duration)*delayedPublisher {
415+
return&delayedPublisher{Pubsub:ps,delay:delay}
412416
}
413417

414-
func (s*delayedListener)Subscribe(eventstring,listener pubsub.Listener) (cancelfunc(),errerror) {
415-
time.Sleep(s.delay)
418+
func (s*delayedPublisher)Subscribe(eventstring,listener pubsub.Listener) (cancelfunc(),errerror) {
416419
returns.Pubsub.Subscribe(event,listener)
417420
}
418421

419-
func (s*delayedListener)SubscribeWithErr(eventstring,listener pubsub.ListenerWithErr) (cancelfunc(),errerror) {
420-
time.Sleep(s.delay)
422+
func (s*delayedPublisher)SubscribeWithErr(eventstring,listener pubsub.ListenerWithErr) (cancelfunc(),errerror) {
421423
returns.Pubsub.SubscribeWithErr(event,listener)
422424
}
423425

424-
func (s*delayedListener)Publish(eventstring,message []byte)error {
426+
func (s*delayedPublisher)Publish(eventstring,message []byte)error {
427+
time.Sleep(s.delay)
425428
returns.Pubsub.Publish(event,message)
426429
}
427430

428-
func (s*delayedListener)Close()error {
431+
func (s*delayedPublisher)Close()error {
429432
returns.Pubsub.Close()
430433
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp