|
9 | 9 | "fmt"
|
10 | 10 | "math/rand"
|
11 | 11 | "strconv"
|
12 |
| -"sync" |
13 | 12 | "testing"
|
14 | 13 | "time"
|
15 | 14 |
|
@@ -363,71 +362,46 @@ func TestMeasureLatency(t *testing.T) {
|
363 | 362 | ps,done:=newPubsub()
|
364 | 363 | deferdone()
|
365 | 364 |
|
366 |
| -slow:=newDelayedPublisher(ps,time.Second) |
367 |
| -fast:=newDelayedPublisher(ps,time.Nanosecond) |
368 |
| -hold:=make(chanstruct{},1) |
369 |
| - |
370 |
| -// Start two goroutines in which two subscribers are registered but the messages are received out-of-order because |
371 |
| -// the first Pubsub will publish its message slowly and the second will publish it quickly. Both will ultimately |
372 |
| -// receive their desired messages, but the slow publisher will receive an unexpected message first. |
373 |
| -varwg sync.WaitGroup |
374 |
| -wg.Add(2) |
375 |
| -gofunc() { |
376 |
| -ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort) |
377 |
| -defercancel() |
378 |
| -deferwg.Done() |
379 |
| - |
380 |
| -hold<-struct{}{} |
381 |
| -l:=lm.Measure(ctx,slow) |
382 |
| -assert.NoError(t,l.Err) |
383 |
| -assert.Greater(t,l.Send.Seconds(),0.0) |
384 |
| -assert.Greater(t,l.Recv.Seconds(),0.0) |
385 |
| - |
386 |
| -// The slow subscriber will complete first and receive the fast publisher's message first. |
387 |
| -logger.Sync() |
388 |
| -assert.Contains(t,buf.String(),"received unexpected message") |
389 |
| -}() |
| 365 | +racy:=newRacyPubsub(ps) |
| 366 | +ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort) |
| 367 | +defercancel() |
390 | 368 |
|
391 |
| -gofunc() { |
392 |
| -ctx,cancel:=context.WithTimeout(context.Background(),testutil.WaitShort) |
393 |
| -defercancel() |
394 |
| -deferwg.Done() |
395 |
| - |
396 |
| -// Force fast publisher to start after the slow one to avoid flakiness. |
397 |
| -<-hold |
398 |
| -time.Sleep(time.Millisecond*50) |
399 |
| -l:=lm.Measure(ctx,fast) |
400 |
| -assert.NoError(t,l.Err) |
401 |
| -assert.Greater(t,l.Send.Seconds(),0.0) |
402 |
| -assert.Greater(t,l.Recv.Seconds(),0.0) |
403 |
| -}() |
| 369 | +l:=lm.Measure(ctx,racy) |
| 370 | +assert.NoError(t,l.Err) |
| 371 | +assert.Greater(t,l.Send.Seconds(),0.0) |
| 372 | +assert.Greater(t,l.Recv.Seconds(),0.0) |
404 | 373 |
|
405 |
| -wg.Wait() |
| 374 | +logger.Sync() |
| 375 | +assert.Contains(t,buf.String(),"received unexpected message") |
406 | 376 | })
|
407 | 377 | }
|
408 | 378 |
|
409 |
| -typedelayedPublisherstruct { |
| 379 | +// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not). |
| 380 | +// This is used to verify that a subscriber will only listen for the message it explicitly expects. |
| 381 | +typeracyPubsubstruct { |
410 | 382 | pubsub.Pubsub
|
411 |
| -delay time.Duration |
412 | 383 | }
|
413 | 384 |
|
414 |
| -funcnewDelayedPublisher(ps pubsub.Pubsub,delay time.Duration)*delayedPublisher { |
415 |
| -return&delayedPublisher{Pubsub:ps,delay:delay} |
| 385 | +funcnewRacyPubsub(ps pubsub.Pubsub)*racyPubsub { |
| 386 | +return&racyPubsub{ps} |
416 | 387 | }
|
417 | 388 |
|
418 |
| -func (s*delayedPublisher)Subscribe(eventstring,listener pubsub.Listener) (cancelfunc(),errerror) { |
| 389 | +func (s*racyPubsub)Subscribe(eventstring,listener pubsub.Listener) (cancelfunc(),errerror) { |
419 | 390 | returns.Pubsub.Subscribe(event,listener)
|
420 | 391 | }
|
421 | 392 |
|
422 |
| -func (s*delayedPublisher)SubscribeWithErr(eventstring,listener pubsub.ListenerWithErr) (cancelfunc(),errerror) { |
| 393 | +func (s*racyPubsub)SubscribeWithErr(eventstring,listener pubsub.ListenerWithErr) (cancelfunc(),errerror) { |
423 | 394 | returns.Pubsub.SubscribeWithErr(event,listener)
|
424 | 395 | }
|
425 | 396 |
|
426 |
| -func (s*delayedPublisher)Publish(eventstring,message []byte)error { |
427 |
| -time.Sleep(s.delay) |
| 397 | +func (s*racyPubsub)Publish(eventstring,message []byte)error { |
| 398 | +err:=s.Pubsub.Publish(event, []byte("nonsense")) |
| 399 | +iferr!=nil { |
| 400 | +returnxerrors.Errorf("failed to send simulated race: %w",err) |
| 401 | +} |
428 | 402 | returns.Pubsub.Publish(event,message)
|
429 | 403 | }
|
430 | 404 |
|
431 |
| -func (s*delayedPublisher)Close()error { |
| 405 | +func (s*racyPubsub)Close()error { |
432 | 406 | returns.Pubsub.Close()
|
433 | 407 | }
|