Expand Up @@ -9,6 +9,7 @@ import ( "net/http" "net/http/httptest" "net/url" "sync" "testing" "time" Expand All @@ -35,6 +36,7 @@ import ( "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/enterprise/coderd" "github.com/coder/coder/v2/enterprise/coderd/coderdenttest" "github.com/coder/coder/v2/enterprise/coderd/license" "github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk" Expand Down Expand Up @@ -464,6 +466,7 @@ func TestDERPMesh(t *testing.T) { "*", } ctx := testutil.Context(t, testutil.WaitLong) client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{ Options: &coderdtest.Options{ DeploymentValues: deploymentValues, Expand Down Expand Up @@ -494,35 +497,21 @@ func TestDERPMesh(t *testing.T) { require.NoError(t, err) // Create 3 proxy replicas. const count = 3 var ( sessionToken = "" proxies = [count]coderdenttest.WorkspaceProxy{} derpURLs = [count]string{} ) for i := range proxies { proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "best-proxy", Token: sessionToken, ProxyURL: proxyURL, }) if i == 0 { sessionToken = proxies[i].Options.ProxySessionToken } derpURL := *proxies[i].ServerURL proxies := createProxyReplicas(ctx, t, &createProxyReplicasOptions{ API: api, Client: client, Name: "best-proxy", ProxyURL: proxyURL, ProxyToken: "", // will be generated automatically Count: 3, }) derpURLs := make([]string, len(proxies)) for i, proxy := range proxies { derpURL := *proxy.ServerURL derpURL.Path = "/derp" derpURLs[i] = derpURL.String() } // Force all proxies to re-register immediately. This ensures the DERP mesh // is up-to-date. In production this will happen automatically after about // 15 seconds. for i, proxy := range proxies { err := proxy.RegisterNow() require.NoErrorf(t, err, "failed to force proxy %d to re-register", i) } // Generate cases. 
We have a case for: // - Each proxy to itself. // - Each proxy to each other proxy (one way, no duplicates). Expand All @@ -533,7 +522,7 @@ func TestDERPMesh(t *testing.T) { cases = append(cases, [2]string{derpURL, derpURLs[j]}) } } require.Len(t, cases, (count*(count +1))/2) // triangle number require.Len(t, cases, (len(proxies)*(len(proxies) +1))/2) // triangle number for i, c := range cases { i, c := i, c Expand Down Expand Up @@ -598,7 +587,6 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { } t.Run("ProbeOK", func(t *testing.T) { t.Skip("flaky test: https://github.com/coder/internal/issues/957") t.Parallel() deploymentValues := coderdtest.DeploymentValues(t) Expand Down Expand Up @@ -642,51 +630,14 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { require.NoError(t, err) // Create 6 proxy replicas. const count = 6 var ( sessionToken = "" proxies = [count]coderdenttest.WorkspaceProxy{} replicaPingDone = [count]bool{} ) for i := range proxies { proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{ Name: "proxy-1", Token: sessionToken, ProxyURL: proxyURL, ReplicaPingCallback: func(replicas []codersdk.Replica, err string) { if len(replicas) != count-1 { // Still warming up... return } replicaPingDone[i] = true assert.Emptyf(t, err, "replica %d ping callback error", i) }, }) if i == 0 { sessionToken = proxies[i].Options.ProxySessionToken } } // Force all proxies to re-register immediately. This ensures the DERP // mesh is up-to-date. In production this will happen automatically // after about 15 seconds. for i, proxy := range proxies { err := proxy.RegisterNow() require.NoErrorf(t, err, "failed to force proxy %d to re-register", i) } // Ensure that all proxies have pinged. 
require.Eventually(t, func() bool { ok := true for i := range proxies { if !replicaPingDone[i] { t.Logf("replica %d has not pinged yet", i) ok = false } } return ok }, testutil.WaitLong, testutil.IntervalSlow) t.Log("all replicas have pinged") proxies := createProxyReplicas(ctx, t, &createProxyReplicasOptions{ API: api, Client: client, Name: "proxy-1", ProxyURL: proxyURL, ProxyToken: "", // will be generated automatically Count: 6, }) // Check they're all healthy according to /healthz-report. httpClient := &http.Client{} Expand Down Expand Up @@ -771,7 +722,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { } // Force the proxy to re-register immediately. err = proxy.RegisterNow() err = proxy.RegisterNow(ctx ) require.NoError(t, err, "failed to force proxy to re-register") // Wait for the ping to fail. Expand Down Expand Up @@ -855,7 +806,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { // Force the proxy to re-register and wait for the ping to fail. for { err = proxy.RegisterNow() err = proxy.RegisterNow(ctx ) require.NoError(t, err, "failed to force proxy to re-register") pingRes := testutil.TryReceive(ctx, t, replicaPingRes) Expand Down Expand Up @@ -891,7 +842,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) { // Force the proxy to re-register and wait for the ping to be skipped // because there are no more siblings. for { err = proxy.RegisterNow() err = proxy.RegisterNow(ctx ) require.NoError(t, err, "failed to force proxy to re-register") replicaErr := testutil.TryReceive(ctx, t, replicaPingRes) Expand Down Expand Up @@ -1170,3 +1121,106 @@ func testDERPSend(t *testing.T, ctx context.Context, dstKey key.NodePublic, dstC require.NoError(t, err, "send message via DERP") } } type createProxyReplicasOptions struct { API *coderd.API Client *codersdk.Client Name string ProxyURL *url.URL // If ProxyToken is not provided, a new workspace proxy region will be // created automatically using the API client. 
ProxyToken string
	// Count is the number of proxy replicas to create.
	Count int
}

// createProxyReplicas starts opts.Count workspace proxy replicas that share
// one proxy name and session token, then blocks until every replica has
// reported a ping callback with exactly Count-1 siblings and an empty error
// string. Replicas that have not reached that state yet are asked to
// re-register on every tick. If ctx is done before all replicas are healthy,
// the test is failed with t.Fatal.
//
// Note that opts.ProxyToken is overwritten with the session token of the
// first replica, so that all remaining replicas are created with that token.
func createProxyReplicas(ctx context.Context, t *testing.T, opts *createProxyReplicasOptions) []coderdenttest.WorkspaceProxy {
	t.Helper()

	var (
		replicas = make([]coderdenttest.WorkspaceProxy, opts.Count)

		// mu guards pinged. pinged[n] records whether the most recent ping
		// callback of replica n saw the expected siblings and no error.
		mu     sync.Mutex
		pinged = make([]bool, opts.Count)
	)

	for idx := range replicas {
		onPing := func(siblings []codersdk.Replica, pingErr string) {
			t.Logf("got wsproxy ping callback: i=%d, siblings=%v, err=%s", idx, len(siblings), pingErr)

			mu.Lock()
			defer mu.Unlock()
			// A ping only counts as successful when there was no error and
			// the replica can see every one of its siblings.
			pinged[idx] = pingErr == "" && len(siblings) == opts.Count-1
		}

		replicas[idx] = coderdenttest.NewWorkspaceProxyReplica(t, opts.API, opts.Client, &coderdenttest.ProxyOptions{
			Name:                opts.Name,
			Token:               opts.ProxyToken,
			ProxyURL:            opts.ProxyURL,
			ReplicaPingCallback: onPing,
		})

		// If a new proxy region was just created, the first replica carries
		// the freshly generated token; reuse it for the other replicas.
		if idx == 0 {
			opts.ProxyToken = replicas[idx].Options.ProxySessionToken
		}
	}

	// Make every proxy re-register right away so the DERP mesh is
	// up-to-date. In production this will happen automatically after about
	// 15 seconds.
	for idx, replica := range replicas {
		require.NoErrorf(t, replica.RegisterNow(ctx), "failed to force proxy %d to re-register", idx)
	}

	// Poll until every replica has pinged successfully, forcing the ones
	// that have not to re-register on each pass. require.Eventually is
	// avoided here because it runs the condition function in a goroutine.
	tick := time.NewTicker(testutil.IntervalSlow)
	defer tick.Stop()
	for {
		// Take a snapshot under the lock so that all replicas are judged
		// against the same point in time.
		mu.Lock()
		snapshot := append([]bool(nil), pinged...)
		mu.Unlock()

		var (
			pending int
			retries sync.WaitGroup
		)
		for idx, replica := range replicas {
			if snapshot[idx] {
				continue
			}
			t.Logf("replica %d has not successfully pinged yet", idx)
			pending++

			// Retry registration on this replica; a failure is only logged.
			retries.Add(1)
			go func() {
				defer retries.Done()
				err := replica.RegisterNow(ctx)
				t.Logf("replica %d re-registered: err=%v", idx, err)
			}()
		}
		retries.Wait()

		if pending == 0 {
			break
		}

		select {
		case <-ctx.Done():
			t.Fatal("proxies did not ping successfully in time:", ctx.Err())
		case <-tick.C:
		}
	}
	t.Log("all replicas have pinged successfully")

	return replicas
}