Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 5887867

Browse files
authored
chore: rework wsproxy mesh tests to avoid flakes (#20296)
- Attempts pings twice per replicasync callback in wsproxy
- Reworks the test setup code to be more lenient and retry proxy registration on failure

Closes coder/internal#957
1 parent 41de4ad, commit 5887867

File tree

3 files changed

+159
-79
lines changed

3 files changed

+159
-79
lines changed

‎enterprise/wsproxy/wsproxy.go‎

Lines changed: 25 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -436,8 +436,8 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
436436
returns,nil
437437
}
438438

439-
func (s*Server)RegisterNow()error {
440-
_,err:=s.registerLoop.RegisterNow()
439+
func (s*Server)RegisterNow(ctx context.Context)error {
440+
_,err:=s.registerLoop.RegisterNow(ctx)
441441
returnerr
442442
}
443443

@@ -521,7 +521,7 @@ func pingSiblingReplicas(ctx context.Context, logger slog.Logger, sf *singleflig
521521
errs:=make(chanerror,len(replicas))
522522
for_,peer:=rangereplicas {
523523
gofunc(peer codersdk.Replica) {
524-
err:=replicasync.PingPeerReplica(ctx,client,peer.RelayAddress)
524+
err:=pingReplica(ctx,client,peer)
525525
iferr!=nil {
526526
errs<-xerrors.Errorf("ping sibling replica %s (%s): %w",peer.Hostname,peer.RelayAddress,err)
527527
logger.Warn(ctx,"failed to ping sibling replica, this could happen if the replica has shutdown",
@@ -553,6 +553,28 @@ func pingSiblingReplicas(ctx context.Context, logger slog.Logger, sf *singleflig
553553
returnerrStrInterface.(string)
554554
}
555555

556+
// pingReplica pings a replica over it's internal relay address to ensure it's
557+
// reachable and alive for health purposes. It will try to ping the replica
558+
// twice if the first ping fails, with a short delay between attempts.
559+
funcpingReplica(ctx context.Context,client http.Client,replica codersdk.Replica)error {
560+
constattempts=2
561+
varerrerror
562+
fori:=0;i<attempts;i++ {
563+
err=replicasync.PingPeerReplica(ctx,client,replica.RelayAddress)
564+
iferr==nil {
565+
returnnil
566+
}
567+
ifi<attempts-1 {
568+
select {
569+
case<-ctx.Done():
570+
returnctx.Err()
571+
case<-time.After(1*time.Second):
572+
}
573+
}
574+
}
575+
returnerr
576+
}
577+
556578
func (s*Server)handleRegisterFailure(errerror) {
557579
ifs.ctx.Err()!=nil {
558580
return

‎enterprise/wsproxy/wsproxy_test.go‎

Lines changed: 129 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"net/http/httptest"
1111
"net/url"
12+
"sync"
1213
"testing"
1314
"time"
1415

@@ -35,6 +36,7 @@ import (
3536
"github.com/coder/coder/v2/codersdk"
3637
"github.com/coder/coder/v2/codersdk/workspacesdk"
3738
"github.com/coder/coder/v2/cryptorand"
39+
"github.com/coder/coder/v2/enterprise/coderd"
3840
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
3941
"github.com/coder/coder/v2/enterprise/coderd/license"
4042
"github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk"
@@ -464,6 +466,7 @@ func TestDERPMesh(t *testing.T) {
464466
"*",
465467
}
466468

469+
ctx:=testutil.Context(t,testutil.WaitLong)
467470
client,closer,api,_:=coderdenttest.NewWithAPI(t,&coderdenttest.Options{
468471
Options:&coderdtest.Options{
469472
DeploymentValues:deploymentValues,
@@ -494,35 +497,21 @@ func TestDERPMesh(t *testing.T) {
494497
require.NoError(t,err)
495498

496499
// Create 3 proxy replicas.
497-
constcount=3
498-
var (
499-
sessionToken=""
500-
proxies= [count]coderdenttest.WorkspaceProxy{}
501-
derpURLs= [count]string{}
502-
)
503-
fori:=rangeproxies {
504-
proxies[i]=coderdenttest.NewWorkspaceProxyReplica(t,api,client,&coderdenttest.ProxyOptions{
505-
Name:"best-proxy",
506-
Token:sessionToken,
507-
ProxyURL:proxyURL,
508-
})
509-
ifi==0 {
510-
sessionToken=proxies[i].Options.ProxySessionToken
511-
}
512-
513-
derpURL:=*proxies[i].ServerURL
500+
proxies:=createProxyReplicas(ctx,t,&createProxyReplicasOptions{
501+
API:api,
502+
Client:client,
503+
Name:"best-proxy",
504+
ProxyURL:proxyURL,
505+
ProxyToken:"",// will be generated automatically
506+
Count:3,
507+
})
508+
derpURLs:=make([]string,len(proxies))
509+
fori,proxy:=rangeproxies {
510+
derpURL:=*proxy.ServerURL
514511
derpURL.Path="/derp"
515512
derpURLs[i]=derpURL.String()
516513
}
517514

518-
// Force all proxies to re-register immediately. This ensures the DERP mesh
519-
// is up-to-date. In production this will happen automatically after about
520-
// 15 seconds.
521-
fori,proxy:=rangeproxies {
522-
err:=proxy.RegisterNow()
523-
require.NoErrorf(t,err,"failed to force proxy %d to re-register",i)
524-
}
525-
526515
// Generate cases. We have a case for:
527516
// - Each proxy to itself.
528517
// - Each proxy to each other proxy (one way, no duplicates).
@@ -533,7 +522,7 @@ func TestDERPMesh(t *testing.T) {
533522
cases=append(cases, [2]string{derpURL,derpURLs[j]})
534523
}
535524
}
536-
require.Len(t,cases, (count*(count+1))/2)// triangle number
525+
require.Len(t,cases, (len(proxies)*(len(proxies)+1))/2)// triangle number
537526

538527
fori,c:=rangecases {
539528
i,c:=i,c
@@ -598,7 +587,6 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
598587
}
599588

600589
t.Run("ProbeOK",func(t*testing.T) {
601-
t.Skip("flaky test: https://github.com/coder/internal/issues/957")
602590
t.Parallel()
603591

604592
deploymentValues:=coderdtest.DeploymentValues(t)
@@ -642,51 +630,14 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
642630
require.NoError(t,err)
643631

644632
// Create 6 proxy replicas.
645-
constcount=6
646-
var (
647-
sessionToken=""
648-
proxies= [count]coderdenttest.WorkspaceProxy{}
649-
replicaPingDone= [count]bool{}
650-
)
651-
fori:=rangeproxies {
652-
proxies[i]=coderdenttest.NewWorkspaceProxyReplica(t,api,client,&coderdenttest.ProxyOptions{
653-
Name:"proxy-1",
654-
Token:sessionToken,
655-
ProxyURL:proxyURL,
656-
ReplicaPingCallback:func(replicas []codersdk.Replica,errstring) {
657-
iflen(replicas)!=count-1 {
658-
// Still warming up...
659-
return
660-
}
661-
replicaPingDone[i]=true
662-
assert.Emptyf(t,err,"replica %d ping callback error",i)
663-
},
664-
})
665-
ifi==0 {
666-
sessionToken=proxies[i].Options.ProxySessionToken
667-
}
668-
}
669-
670-
// Force all proxies to re-register immediately. This ensures the DERP
671-
// mesh is up-to-date. In production this will happen automatically
672-
// after about 15 seconds.
673-
fori,proxy:=rangeproxies {
674-
err:=proxy.RegisterNow()
675-
require.NoErrorf(t,err,"failed to force proxy %d to re-register",i)
676-
}
677-
678-
// Ensure that all proxies have pinged.
679-
require.Eventually(t,func()bool {
680-
ok:=true
681-
fori:=rangeproxies {
682-
if!replicaPingDone[i] {
683-
t.Logf("replica %d has not pinged yet",i)
684-
ok=false
685-
}
686-
}
687-
returnok
688-
},testutil.WaitLong,testutil.IntervalSlow)
689-
t.Log("all replicas have pinged")
633+
proxies:=createProxyReplicas(ctx,t,&createProxyReplicasOptions{
634+
API:api,
635+
Client:client,
636+
Name:"proxy-1",
637+
ProxyURL:proxyURL,
638+
ProxyToken:"",// will be generated automatically
639+
Count:6,
640+
})
690641

691642
// Check they're all healthy according to /healthz-report.
692643
httpClient:=&http.Client{}
@@ -771,7 +722,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
771722
}
772723

773724
// Force the proxy to re-register immediately.
774-
err=proxy.RegisterNow()
725+
err=proxy.RegisterNow(ctx)
775726
require.NoError(t,err,"failed to force proxy to re-register")
776727

777728
// Wait for the ping to fail.
@@ -855,7 +806,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
855806

856807
// Force the proxy to re-register and wait for the ping to fail.
857808
for {
858-
err=proxy.RegisterNow()
809+
err=proxy.RegisterNow(ctx)
859810
require.NoError(t,err,"failed to force proxy to re-register")
860811

861812
pingRes:=testutil.TryReceive(ctx,t,replicaPingRes)
@@ -891,7 +842,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
891842
// Force the proxy to re-register and wait for the ping to be skipped
892843
// because there are no more siblings.
893844
for {
894-
err=proxy.RegisterNow()
845+
err=proxy.RegisterNow(ctx)
895846
require.NoError(t,err,"failed to force proxy to re-register")
896847

897848
replicaErr:=testutil.TryReceive(ctx,t,replicaPingRes)
@@ -1170,3 +1121,106 @@ func testDERPSend(t *testing.T, ctx context.Context, dstKey key.NodePublic, dstC
11701121
require.NoError(t,err,"send message via DERP")
11711122
}
11721123
}
1124+
1125+
typecreateProxyReplicasOptionsstruct {
1126+
API*coderd.API
1127+
Client*codersdk.Client
1128+
1129+
Namestring
1130+
ProxyURL*url.URL
1131+
// If ProxyToken is not provided, a new workspace proxy region will be
1132+
// created automatically using the API client.
1133+
ProxyTokenstring
1134+
Countint
1135+
}
1136+
1137+
// createProxyReplicas creates and runs a set of proxy replicas and ensures that
1138+
// they are all functioning correctly and aware of each other with no errors.
1139+
funccreateProxyReplicas(ctx context.Context,t*testing.T,opts*createProxyReplicasOptions) []coderdenttest.WorkspaceProxy {
1140+
t.Helper()
1141+
1142+
var (
1143+
proxies=make([]coderdenttest.WorkspaceProxy,opts.Count)
1144+
// replicaPingSuccessful tracks whether the replica ping callback
1145+
// was called with no errors for each replica.
1146+
replicaPingMutex sync.Mutex
1147+
replicaPingSuccessful=make([]bool,opts.Count)
1148+
)
1149+
fori:=rangeproxies {
1150+
proxies[i]=coderdenttest.NewWorkspaceProxyReplica(t,opts.API,opts.Client,&coderdenttest.ProxyOptions{
1151+
Name:opts.Name,
1152+
Token:opts.ProxyToken,
1153+
ProxyURL:opts.ProxyURL,
1154+
ReplicaPingCallback:func(siblings []codersdk.Replica,errstring) {
1155+
t.Logf("got wsproxy ping callback: i=%d, siblings=%v, err=%s",i,len(siblings),err)
1156+
1157+
replicaPingMutex.Lock()
1158+
deferreplicaPingMutex.Unlock()
1159+
// The replica only "successfully" pinged if it has the
1160+
// correct number of siblings and no error.
1161+
replicaPingSuccessful[i]=len(siblings)==opts.Count-1&&err==""
1162+
},
1163+
})
1164+
ifi==0 {
1165+
// The first proxy will have a new token if we just created a new
1166+
// proxy region.
1167+
opts.ProxyToken=proxies[i].Options.ProxySessionToken
1168+
}
1169+
}
1170+
1171+
// Force all proxies to re-register immediately. This ensures the DERP
1172+
// mesh is up-to-date. In production this will happen automatically
1173+
// after about 15 seconds.
1174+
fori,proxy:=rangeproxies {
1175+
err:=proxy.RegisterNow(ctx)
1176+
require.NoErrorf(t,err,"failed to force proxy %d to re-register",i)
1177+
}
1178+
1179+
// Ensure that all proxies have pinged successfully. If replicas haven't
1180+
// successfully pinged yet, force them to re-register again. We don't
1181+
// use require.Eventually here because it runs the condition function in
1182+
// a goroutine.
1183+
ticker:=time.NewTicker(testutil.IntervalSlow)
1184+
deferticker.Stop()
1185+
for {
1186+
var (
1187+
ok=true
1188+
wg sync.WaitGroup
1189+
)
1190+
1191+
// Copy the replicaPingSuccessful slice to a local variable so we can
1192+
// view the state of all proxies at the same point in time.
1193+
replicaPingSuccessfulCopy:=make([]bool,len(replicaPingSuccessful))
1194+
replicaPingMutex.Lock()
1195+
copy(replicaPingSuccessfulCopy,replicaPingSuccessful)
1196+
replicaPingMutex.Unlock()
1197+
1198+
fori,proxy:=rangeproxies {
1199+
success:=replicaPingSuccessfulCopy[i]
1200+
if!success {
1201+
t.Logf("replica %d has not successfully pinged yet",i)
1202+
ok=false
1203+
1204+
// Retry registration on this proxy.
1205+
wg.Add(1)
1206+
gofunc() {
1207+
deferwg.Done()
1208+
err:=proxy.RegisterNow(ctx)
1209+
t.Logf("replica %d re-registered: err=%v",i,err)
1210+
}()
1211+
}
1212+
}
1213+
wg.Wait()
1214+
ifok {
1215+
break
1216+
}
1217+
select {
1218+
case<-ctx.Done():
1219+
t.Fatal("proxies did not ping successfully in time:",ctx.Err())
1220+
case<-ticker.C:
1221+
}
1222+
}
1223+
t.Log("all replicas have pinged successfully")
1224+
1225+
returnproxies
1226+
}

‎enterprise/wsproxy/wsproxysdk/wsproxysdk.go‎

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -404,15 +404,19 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa
404404

405405
// RegisterNow asks the registration loop to register immediately. A timeout of
406406
// 2x the attempt timeout is used to wait for the response.
407-
func (l*RegisterWorkspaceProxyLoop)RegisterNow() (RegisterWorkspaceProxyResponse,error) {
407+
func (l*RegisterWorkspaceProxyLoop)RegisterNow(ctx context.Context) (RegisterWorkspaceProxyResponse,error) {
408408
// The channel is closed by the loop after sending the response.
409409
respCh:=make(chanRegisterWorkspaceProxyResponse,1)
410410
select {
411+
case<-ctx.Done():
412+
returnRegisterWorkspaceProxyResponse{},ctx.Err()
411413
case<-l.done:
412414
returnRegisterWorkspaceProxyResponse{},xerrors.New("proxy registration loop closed")
413415
casel.runLoopNow<-respCh:
414416
}
415417
select {
418+
case<-ctx.Done():
419+
returnRegisterWorkspaceProxyResponse{},ctx.Err()
416420
case<-l.done:
417421
returnRegisterWorkspaceProxyResponse{},xerrors.New("proxy registration loop closed")
418422
caseresp:=<-respCh:

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp