Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

chore: rework wsproxy mesh tests to avoid flakes#20296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
deansheather merged 3 commits intomainfromdean/wsproxy-flakes-2
Oct 16, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletionsenterprise/wsproxy/wsproxy.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -436,8 +436,8 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
return s, nil
}

func (s *Server) RegisterNow() error {
_, err := s.registerLoop.RegisterNow()
func (s *Server) RegisterNow(ctx context.Context) error {
_, err := s.registerLoop.RegisterNow(ctx)
return err
}

Expand DownExpand Up@@ -521,7 +521,7 @@ func pingSiblingReplicas(ctx context.Context, logger slog.Logger, sf *singleflig
errs := make(chan error, len(replicas))
for _, peer := range replicas {
go func(peer codersdk.Replica) {
err :=replicasync.PingPeerReplica(ctx, client, peer.RelayAddress)
err :=pingReplica(ctx, client, peer)
if err != nil {
errs <- xerrors.Errorf("ping sibling replica %s (%s): %w", peer.Hostname, peer.RelayAddress, err)
logger.Warn(ctx, "failed to ping sibling replica, this could happen if the replica has shutdown",
Expand DownExpand Up@@ -553,6 +553,28 @@ func pingSiblingReplicas(ctx context.Context, logger slog.Logger, sf *singleflig
return errStrInterface.(string)
}

// pingReplica pings a replica over it's internal relay address to ensure it's
// reachable and alive for health purposes. It will try to ping the replica
// twice if the first ping fails, with a short delay between attempts.
func pingReplica(ctx context.Context, client http.Client, replica codersdk.Replica) error {
const attempts = 2
var err error
for i := 0; i < attempts; i++ {
err = replicasync.PingPeerReplica(ctx, client, replica.RelayAddress)
if err == nil {
return nil
}
if i < attempts-1 {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}
}
}
return err
}

func (s *Server) handleRegisterFailure(err error) {
if s.ctx.Err() != nil {
return
Expand Down
204 changes: 129 additions & 75 deletionsenterprise/wsproxy/wsproxy_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -9,6 +9,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"

Expand All@@ -35,6 +36,7 @@ import (
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/enterprise/coderd"
"github.com/coder/coder/v2/enterprise/coderd/coderdenttest"
"github.com/coder/coder/v2/enterprise/coderd/license"
"github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk"
Expand DownExpand Up@@ -464,6 +466,7 @@ func TestDERPMesh(t *testing.T) {
"*",
}

ctx := testutil.Context(t, testutil.WaitLong)
client, closer, api, _ := coderdenttest.NewWithAPI(t, &coderdenttest.Options{
Options: &coderdtest.Options{
DeploymentValues: deploymentValues,
Expand DownExpand Up@@ -494,35 +497,21 @@ func TestDERPMesh(t *testing.T) {
require.NoError(t, err)

// Create 3 proxy replicas.
const count = 3
var (
sessionToken = ""
proxies = [count]coderdenttest.WorkspaceProxy{}
derpURLs = [count]string{}
)
for i := range proxies {
proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{
Name: "best-proxy",
Token: sessionToken,
ProxyURL: proxyURL,
})
if i == 0 {
sessionToken = proxies[i].Options.ProxySessionToken
}

derpURL := *proxies[i].ServerURL
proxies := createProxyReplicas(ctx, t, &createProxyReplicasOptions{
API: api,
Client: client,
Name: "best-proxy",
ProxyURL: proxyURL,
ProxyToken: "", // will be generated automatically
Count: 3,
})
derpURLs := make([]string, len(proxies))
for i, proxy := range proxies {
derpURL := *proxy.ServerURL
derpURL.Path = "/derp"
derpURLs[i] = derpURL.String()
}

// Force all proxies to re-register immediately. This ensures the DERP mesh
// is up-to-date. In production this will happen automatically after about
// 15 seconds.
for i, proxy := range proxies {
err := proxy.RegisterNow()
require.NoErrorf(t, err, "failed to force proxy %d to re-register", i)
}

// Generate cases. We have a case for:
// - Each proxy to itself.
// - Each proxy to each other proxy (one way, no duplicates).
Expand All@@ -533,7 +522,7 @@ func TestDERPMesh(t *testing.T) {
cases = append(cases, [2]string{derpURL, derpURLs[j]})
}
}
require.Len(t, cases, (count*(count+1))/2) // triangle number
require.Len(t, cases, (len(proxies)*(len(proxies)+1))/2) // triangle number

for i, c := range cases {
i, c := i, c
Expand DownExpand Up@@ -598,7 +587,6 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
}

t.Run("ProbeOK", func(t *testing.T) {
t.Skip("flaky test: https://github.com/coder/internal/issues/957")
t.Parallel()

deploymentValues := coderdtest.DeploymentValues(t)
Expand DownExpand Up@@ -642,51 +630,14 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
require.NoError(t, err)

// Create 6 proxy replicas.
const count = 6
var (
sessionToken = ""
proxies = [count]coderdenttest.WorkspaceProxy{}
replicaPingDone = [count]bool{}
)
for i := range proxies {
proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{
Name: "proxy-1",
Token: sessionToken,
ProxyURL: proxyURL,
ReplicaPingCallback: func(replicas []codersdk.Replica, err string) {
if len(replicas) != count-1 {
// Still warming up...
return
}
replicaPingDone[i] = true
assert.Emptyf(t, err, "replica %d ping callback error", i)
},
})
if i == 0 {
sessionToken = proxies[i].Options.ProxySessionToken
}
}

// Force all proxies to re-register immediately. This ensures the DERP
// mesh is up-to-date. In production this will happen automatically
// after about 15 seconds.
for i, proxy := range proxies {
err := proxy.RegisterNow()
require.NoErrorf(t, err, "failed to force proxy %d to re-register", i)
}

// Ensure that all proxies have pinged.
require.Eventually(t, func() bool {
ok := true
for i := range proxies {
if !replicaPingDone[i] {
t.Logf("replica %d has not pinged yet", i)
ok = false
}
}
return ok
}, testutil.WaitLong, testutil.IntervalSlow)
t.Log("all replicas have pinged")
proxies := createProxyReplicas(ctx, t, &createProxyReplicasOptions{
API: api,
Client: client,
Name: "proxy-1",
ProxyURL: proxyURL,
ProxyToken: "", // will be generated automatically
Count: 6,
})

// Check they're all healthy according to /healthz-report.
httpClient := &http.Client{}
Expand DownExpand Up@@ -771,7 +722,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
}

// Force the proxy to re-register immediately.
err = proxy.RegisterNow()
err = proxy.RegisterNow(ctx)
require.NoError(t, err, "failed to force proxy to re-register")

// Wait for the ping to fail.
Expand DownExpand Up@@ -855,7 +806,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {

// Force the proxy to re-register and wait for the ping to fail.
for {
err = proxy.RegisterNow()
err = proxy.RegisterNow(ctx)
require.NoError(t, err, "failed to force proxy to re-register")

pingRes := testutil.TryReceive(ctx, t, replicaPingRes)
Expand DownExpand Up@@ -891,7 +842,7 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
// Force the proxy to re-register and wait for the ping to be skipped
// because there are no more siblings.
for {
err = proxy.RegisterNow()
err = proxy.RegisterNow(ctx)
require.NoError(t, err, "failed to force proxy to re-register")

replicaErr := testutil.TryReceive(ctx, t, replicaPingRes)
Expand DownExpand Up@@ -1170,3 +1121,106 @@ func testDERPSend(t *testing.T, ctx context.Context, dstKey key.NodePublic, dstC
require.NoError(t, err, "send message via DERP")
}
}

type createProxyReplicasOptions struct {
API *coderd.API
Client *codersdk.Client

Name string
ProxyURL *url.URL
// If ProxyToken is not provided, a new workspace proxy region will be
// created automatically using the API client.
ProxyToken string
Count int
}

// createProxyReplicas creates and runs a set of proxy replicas and ensures that
// they are all functioning correctly and aware of each other with no errors.
func createProxyReplicas(ctx context.Context, t *testing.T, opts *createProxyReplicasOptions) []coderdenttest.WorkspaceProxy {
t.Helper()

var (
proxies = make([]coderdenttest.WorkspaceProxy, opts.Count)
// replicaPingSuccessful tracks whether the replica ping callback
// was called with no errors for each replica.
replicaPingMutex sync.Mutex
replicaPingSuccessful = make([]bool, opts.Count)
)
for i := range proxies {
proxies[i] = coderdenttest.NewWorkspaceProxyReplica(t, opts.API, opts.Client, &coderdenttest.ProxyOptions{
Name: opts.Name,
Token: opts.ProxyToken,
ProxyURL: opts.ProxyURL,
ReplicaPingCallback: func(siblings []codersdk.Replica, err string) {
t.Logf("got wsproxy ping callback: i=%d, siblings=%v, err=%s", i, len(siblings), err)

replicaPingMutex.Lock()
defer replicaPingMutex.Unlock()
// The replica only "successfully" pinged if it has the
// correct number of siblings and no error.
replicaPingSuccessful[i] = len(siblings) == opts.Count-1 && err == ""
},
})
if i == 0 {
// The first proxy will have a new token if we just created a new
// proxy region.
opts.ProxyToken = proxies[i].Options.ProxySessionToken
}
}

// Force all proxies to re-register immediately. This ensures the DERP
// mesh is up-to-date. In production this will happen automatically
// after about 15 seconds.
for i, proxy := range proxies {
err := proxy.RegisterNow(ctx)
require.NoErrorf(t, err, "failed to force proxy %d to re-register", i)
}

// Ensure that all proxies have pinged successfully. If replicas haven't
// successfully pinged yet, force them to re-register again. We don't
// use require.Eventually here because it runs the condition function in
// a goroutine.
ticker := time.NewTicker(testutil.IntervalSlow)
defer ticker.Stop()
for {
var (
ok = true
wg sync.WaitGroup
)

// Copy the replicaPingSuccessful slice to a local variable so we can
// view the state of all proxies at the same point in time.
replicaPingSuccessfulCopy := make([]bool, len(replicaPingSuccessful))
replicaPingMutex.Lock()
copy(replicaPingSuccessfulCopy, replicaPingSuccessful)
replicaPingMutex.Unlock()

for i, proxy := range proxies {
success := replicaPingSuccessfulCopy[i]
if !success {
t.Logf("replica %d has not successfully pinged yet", i)
ok = false

// Retry registration on this proxy.
wg.Add(1)
go func() {
defer wg.Done()
err := proxy.RegisterNow(ctx)
t.Logf("replica %d re-registered: err=%v", i, err)
}()
}
}
wg.Wait()
if ok {
break
}
select {
case <-ctx.Done():
t.Fatal("proxies did not ping successfully in time:", ctx.Err())
case <-ticker.C:
}
}
t.Log("all replicas have pinged successfully")

return proxies
}
6 changes: 5 additions & 1 deletionenterprise/wsproxy/wsproxysdk/wsproxysdk.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -404,15 +404,19 @@ func (l *RegisterWorkspaceProxyLoop) Start(ctx context.Context) (RegisterWorkspa

// RegisterNow asks the registration loop to register immediately. A timeout of
// 2x the attempt timeout is used to wait for the response.
func (l *RegisterWorkspaceProxyLoop) RegisterNow() (RegisterWorkspaceProxyResponse, error) {
func (l *RegisterWorkspaceProxyLoop) RegisterNow(ctx context.Context) (RegisterWorkspaceProxyResponse, error) {
// The channel is closed by the loop after sending the response.
respCh := make(chan RegisterWorkspaceProxyResponse, 1)
select {
case <-ctx.Done():
return RegisterWorkspaceProxyResponse{}, ctx.Err()
case <-l.done:
return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed")
case l.runLoopNow <- respCh:
}
select {
case <-ctx.Done():
return RegisterWorkspaceProxyResponse{}, ctx.Err()
case <-l.done:
return RegisterWorkspaceProxyResponse{}, xerrors.New("proxy registration loop closed")
case resp := <-respCh:
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp