@@ -13,11 +13,12 @@ import (
13
13
"testing"
14
14
"time"
15
15
16
- "cdr.dev/slog/sloggers/sloghuman"
17
16
"github.com/stretchr/testify/assert"
18
17
"github.com/stretchr/testify/require"
19
18
"golang.org/x/xerrors"
20
19
20
+ "cdr.dev/slog/sloggers/sloghuman"
21
+
21
22
"cdr.dev/slog"
22
23
"cdr.dev/slog/sloggers/slogtest"
23
24
"github.com/coder/coder/v2/coderd/database/dbtestutil"
@@ -342,8 +343,7 @@ func TestMeasureLatency(t *testing.T) {
342
343
ps ,done := newPubsub ()
343
344
defer done ()
344
345
345
- // nolint:gocritic // need a very short timeout here to trigger error
346
- ctx ,cancel := context .WithTimeout (context .Background (),time .Nanosecond )
346
+ ctx ,cancel := context .WithDeadline (context .Background (),time .Now ().Add (- time .Second ))
347
347
defer cancel ()
348
348
349
349
send ,recv ,err := pubsub .NewLatencyMeasurer (logger ).Measure (ctx ,ps )
@@ -356,75 +356,78 @@ func TestMeasureLatency(t *testing.T) {
356
356
t .Parallel ()
357
357
358
358
var buf bytes.Buffer
359
- logger := slogtest .Make (t ,nil ).Leveled (slog .LevelDebug )
359
+ logger := slogtest .Make (t ,& slogtest. Options { IgnoreErrors : true } ).Leveled (slog .LevelDebug )
360
360
logger = logger .AppendSinks (sloghuman .Sink (& buf ))
361
361
362
362
lm := pubsub .NewLatencyMeasurer (logger )
363
363
ps ,done := newPubsub ()
364
364
defer done ()
365
365
366
- slow := newDelayedListener (ps ,time .Second )
367
- fast := newDelayedListener (ps ,time .Nanosecond )
366
+ slow := newDelayedPublisher (ps ,time .Second )
367
+ fast := newDelayedPublisher (ps ,time .Nanosecond )
368
+ hold := make (chan struct {},1 )
368
369
370
+ // Start two goroutines in which two subscribers are registered but the messages are received out-of-order because
371
+ // the first Pubsub will publish its message slowly and the second will publish it quickly. Both will ultimately
372
+ // receive their desired messages, but the slow publisher will receive an unexpected message first.
369
373
var wg sync.WaitGroup
370
374
wg .Add (2 )
371
-
372
- // Publish message concurrently to a slow receiver.
373
375
go func () {
374
376
ctx ,cancel := context .WithTimeout (context .Background (),testutil .WaitShort )
375
377
defer cancel ()
376
378
defer wg .Done ()
377
379
378
- // Slow receiver will not receive its latency message because the fast one receives it first.
379
- _ ,_ ,err := lm .Measure (ctx ,slow )
380
- require .ErrorContains (t ,err ,context .DeadlineExceeded .Error ())
380
+ hold <- struct {}{}
381
+ send ,recv ,err := lm .Measure (ctx ,slow )
382
+ assert .NoError (t ,err )
383
+ assert .Greater (t ,send ,0.0 )
384
+ assert .Greater (t ,recv ,0.0 )
385
+
386
+ // The slow subscriber will complete first and receive the fast publisher's message first.
387
+ logger .Sync ()
388
+ assert .Contains (t ,buf .String (),"received unexpected message" )
381
389
}()
382
390
383
- // Publish message concurrently to a fast receiver who will receive both its own and the slow receiver's messages.
384
- // It should ignore the unexpected message and consume its own, leaving the slow receiver to timeout since it
385
- // will never receive their own message.
386
391
go func () {
387
392
ctx ,cancel := context .WithTimeout (context .Background (),testutil .WaitShort )
388
393
defer cancel ()
389
394
defer wg .Done ()
390
395
396
+ // Force fast publisher to start after the slow one to avoid flakiness.
397
+ <- hold
398
+ time .Sleep (time .Millisecond * 50 )
391
399
send ,recv ,err := lm .Measure (ctx ,fast )
392
- require .NoError (t ,err )
393
- require .Greater (t ,send ,0.0 )
394
- require .Greater (t ,recv ,0.0 )
400
+ assert .NoError (t ,err )
401
+ assert .Greater (t ,send ,0.0 )
402
+ assert .Greater (t ,recv ,0.0 )
395
403
}()
396
404
397
405
wg .Wait ()
398
-
399
- // Flush the contents of the logger to its buffer.
400
- logger .Sync ()
401
- require .Contains (t ,buf .String (),"received unexpected message!" )
402
406
})
403
407
}
404
408
405
- type delayedListener struct {
409
+ type delayedPublisher struct {
406
410
pubsub.Pubsub
407
411
delay time.Duration
408
412
}
409
413
410
- func newDelayedListener (ps pubsub.Pubsub ,delay time.Duration )* delayedListener {
411
- return & delayedListener {Pubsub :ps ,delay :delay }
414
+ func newDelayedPublisher (ps pubsub.Pubsub ,delay time.Duration )* delayedPublisher {
415
+ return & delayedPublisher {Pubsub :ps ,delay :delay }
412
416
}
413
417
414
- func (s * delayedListener )Subscribe (event string ,listener pubsub.Listener ) (cancel func (),err error ) {
415
- time .Sleep (s .delay )
418
+ func (s * delayedPublisher )Subscribe (event string ,listener pubsub.Listener ) (cancel func (),err error ) {
416
419
return s .Pubsub .Subscribe (event ,listener )
417
420
}
418
421
419
- func (s * delayedListener )SubscribeWithErr (event string ,listener pubsub.ListenerWithErr ) (cancel func (),err error ) {
420
- time .Sleep (s .delay )
422
+ func (s * delayedPublisher )SubscribeWithErr (event string ,listener pubsub.ListenerWithErr ) (cancel func (),err error ) {
421
423
return s .Pubsub .SubscribeWithErr (event ,listener )
422
424
}
423
425
424
- func (s * delayedListener )Publish (event string ,message []byte )error {
426
+ func (s * delayedPublisher )Publish (event string ,message []byte )error {
427
+ time .Sleep (s .delay )
425
428
return s .Pubsub .Publish (event ,message )
426
429
}
427
430
428
- func (s * delayedListener )Close ()error {
431
+ func (s * delayedPublisher )Close ()error {
429
432
return s .Pubsub .Close ()
430
433
}