Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbb4a87e

Browse files
committed
review p1
1 parenteba9261 commitbb4a87e

File tree

2 files changed

+337
-348
lines changed

2 files changed

+337
-348
lines changed

‎enterprise/coderd/coderd_test.go‎

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,37 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"io"
8+
"net"
79
"net/http"
810
"net/http/httptest"
11+
"net/url"
912
"reflect"
1013
"strings"
14+
"sync"
1115
"testing"
1216
"time"
1317

1418
"github.com/google/uuid"
19+
"github.com/moby/moby/pkg/namesgenerator"
1520
"github.com/stretchr/testify/assert"
1621
"github.com/stretchr/testify/require"
1722
"go.uber.org/goleak"
1823

24+
"cdr.dev/slog"
1925
"cdr.dev/slog/sloggers/slogtest"
26+
"github.com/coder/coder/v2/agent"
27+
"github.com/coder/coder/v2/agent/agenttest"
2028
"github.com/coder/coder/v2/coderd/httpapi"
2129
"github.com/coder/coder/v2/coderd/rbac/policy"
30+
"github.com/coder/coder/v2/coderd/util/ptr"
2231
"github.com/coder/coder/v2/tailnet/tailnettest"
2332

2433
agplaudit"github.com/coder/coder/v2/coderd/audit"
2534
"github.com/coder/coder/v2/coderd/coderdtest"
2635
"github.com/coder/coder/v2/coderd/database"
2736
"github.com/coder/coder/v2/coderd/database/dbauthz"
37+
"github.com/coder/coder/v2/coderd/database/dbfake"
2838
"github.com/coder/coder/v2/coderd/database/dbmem"
2939
"github.com/coder/coder/v2/coderd/database/dbtestutil"
3040
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -522,3 +532,330 @@ func testDBAuthzRole(ctx context.Context) context.Context {
522532
Scope:rbac.ScopeAll,
523533
})
524534
}
535+
536+
// restartableListener is a TCP listener that can have all of it's connections
537+
// severed on demand.
538+
typerestartableListenerstruct {
539+
net.Listener
540+
mu sync.Mutex
541+
conns []net.Conn
542+
}
543+
544+
func (l*restartableListener)Accept() (net.Conn,error) {
545+
conn,err:=l.Listener.Accept()
546+
iferr!=nil {
547+
returnnil,err
548+
}
549+
l.mu.Lock()
550+
l.conns=append(l.conns,conn)
551+
l.mu.Unlock()
552+
returnconn,nil
553+
}
554+
555+
func (l*restartableListener)CloseConnections() {
556+
l.mu.Lock()
557+
deferl.mu.Unlock()
558+
for_,conn:=rangel.conns {
559+
_=conn.Close()
560+
}
561+
l.conns=nil
562+
}
563+
564+
typerestartableTestServerstruct {
565+
options*coderdenttest.Options
566+
rl*restartableListener
567+
568+
mu sync.Mutex
569+
api*coderd.API
570+
closer io.Closer
571+
}
572+
573+
funcnewRestartableTestServer(t*testing.T,options*coderdenttest.Options) (*codersdk.Client, codersdk.CreateFirstUserResponse,*restartableTestServer) {
574+
t.Helper()
575+
ifoptions==nil {
576+
options=&coderdenttest.Options{}
577+
}
578+
579+
s:=&restartableTestServer{
580+
options:options,
581+
}
582+
srv:=httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter,r*http.Request) {
583+
s.mu.Lock()
584+
api:=s.api
585+
s.mu.Unlock()
586+
587+
ifapi==nil {
588+
w.WriteHeader(http.StatusBadGateway)
589+
_,_=w.Write([]byte("server is not started"))
590+
return
591+
}
592+
api.AGPL.RootHandler.ServeHTTP(w,r)
593+
}))
594+
s.rl=&restartableListener{Listener:srv.Listener}
595+
srv.Listener=s.rl
596+
srv.Start()
597+
t.Cleanup(srv.Close)
598+
599+
u,err:=url.Parse(srv.URL)
600+
require.NoError(t,err,"failed to parse server URL")
601+
s.options.AccessURL=u
602+
603+
client,firstUser:=s.startWithFirstUser(t)
604+
client.URL=u
605+
returnclient,firstUser,s
606+
}
607+
608+
func (s*restartableTestServer)Stop(t*testing.T) {
609+
t.Helper()
610+
611+
s.mu.Lock()
612+
closer:=s.closer
613+
s.closer=nil
614+
api:=s.api
615+
s.api=nil
616+
s.mu.Unlock()
617+
618+
ifcloser!=nil {
619+
err:=closer.Close()
620+
require.NoError(t,err)
621+
}
622+
ifapi!=nil {
623+
err:=api.Close()
624+
require.NoError(t,err)
625+
}
626+
627+
s.rl.CloseConnections()
628+
}
629+
630+
func (s*restartableTestServer)Start(t*testing.T) {
631+
t.Helper()
632+
_,_=s.startWithFirstUser(t)
633+
}
634+
635+
func (s*restartableTestServer)startWithFirstUser(t*testing.T) (client*codersdk.Client,firstUser codersdk.CreateFirstUserResponse) {
636+
t.Helper()
637+
s.mu.Lock()
638+
defers.mu.Unlock()
639+
640+
ifs.closer!=nil||s.api!=nil {
641+
t.Fatal("server already started, close must be called first")
642+
}
643+
// This creates it's own TCP listener unfortunately, but it's not being
644+
// used in this test.
645+
client,s.closer,s.api,firstUser=coderdenttest.NewWithAPI(t,s.options)
646+
647+
// Never add the first user or license on subsequent restarts.
648+
s.options.DontAddFirstUser=true
649+
s.options.DontAddLicense=true
650+
651+
returnclient,firstUser
652+
}
653+
654+
// Test_CoordinatorRollingRestart tests that two peers can maintain a connection
655+
// without forgetting about each other when a HA coordinator does a rolling
656+
// restart.
657+
//
658+
// We had a few issues with this in the past:
659+
// 1. We didn't allow clients to maintain their peer ID after a reconnect,
660+
// which resulted in the other peer thinking the client was a new peer.
661+
// (This is fixed and independently tested in AGPL code)
662+
// 2. HA coordinators would delete all peers (via FK constraints) when they
663+
// were closed, which meant tunnels would be deleted and peers would be
664+
// notified that the other peer was permanently gone.
665+
// (This is fixed and independently tested above)
666+
//
667+
// This test uses a real server and real clients.
668+
funcTestConn_CoordinatorRollingRestart(t*testing.T) {
669+
t.Parallel()
670+
671+
if!dbtestutil.WillUsePostgres() {
672+
t.Skip("test only with postgres")
673+
}
674+
675+
// Although DERP will have connection issues until the connection is
676+
// reestablished, any open connections should be maintained.
677+
//
678+
// Direct connections should be able to transmit packets throughout the
679+
// restart without issue.
680+
for_,direct:=range []bool{true,false} {
681+
direct:=direct
682+
name:="DERP"
683+
ifdirect {
684+
name="Direct"
685+
}
686+
687+
t.Run(name,func(t*testing.T) {
688+
t.Parallel()
689+
690+
store,ps:=dbtestutil.NewDB(t)
691+
dv:=coderdtest.DeploymentValues(t,func(dv*codersdk.DeploymentValues) {
692+
dv.DERP.Config.BlockDirect=serpent.Bool(!direct)
693+
})
694+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
695+
696+
// Create two restartable test servers with the same database.
697+
client1,user,s1:=newRestartableTestServer(t,&coderdenttest.Options{
698+
DontAddFirstUser:false,
699+
DontAddLicense:false,
700+
Options:&coderdtest.Options{
701+
Logger:ptr.Ref(logger.Named("server1")),
702+
Database:store,
703+
Pubsub:ps,
704+
DeploymentValues:dv,
705+
IncludeProvisionerDaemon:true,
706+
},
707+
LicenseOptions:&coderdenttest.LicenseOptions{
708+
Features: license.Features{
709+
codersdk.FeatureHighAvailability:1,
710+
},
711+
},
712+
})
713+
client2,_,s2:=newRestartableTestServer(t,&coderdenttest.Options{
714+
DontAddFirstUser:true,
715+
DontAddLicense:true,
716+
Options:&coderdtest.Options{
717+
Logger:ptr.Ref(logger.Named("server2")),
718+
Database:store,
719+
Pubsub:ps,
720+
DeploymentValues:dv,
721+
},
722+
})
723+
client2.SetSessionToken(client1.SessionToken())
724+
725+
workspace:=dbfake.WorkspaceBuild(t,store, database.Workspace{
726+
OrganizationID:user.OrganizationID,
727+
OwnerID:user.UserID,
728+
}).WithAgent().Do()
729+
730+
// Agent connects via the first coordinator.
731+
_=agenttest.New(t,client1.URL,workspace.AgentToken,func(o*agent.Options) {
732+
o.Logger=logger.Named("agent1")
733+
})
734+
resources:=coderdtest.NewWorkspaceAgentWaiter(t,client1,workspace.Workspace.ID).Wait()
735+
736+
agentID:=uuid.Nil
737+
for_,r:=rangeresources {
738+
for_,a:=ranger.Agents {
739+
agentID=a.ID
740+
break
741+
}
742+
}
743+
require.NotEqual(t,uuid.Nil,agentID)
744+
745+
// Client connects via the second coordinator.
746+
ctx:=testutil.Context(t,testutil.WaitSuperLong)
747+
workspaceClient2:=workspacesdk.New(client2)
748+
conn,err:=workspaceClient2.DialAgent(ctx,agentID,&workspacesdk.DialAgentOptions{
749+
Logger:logger.Named("client"),
750+
})
751+
require.NoError(t,err)
752+
deferconn.Close()
753+
754+
require.Eventually(t,func()bool {
755+
_,p2p,_,err:=conn.Ping(ctx)
756+
assert.NoError(t,err)
757+
returnp2p==direct
758+
},testutil.WaitShort,testutil.IntervalFast)
759+
760+
// Open a TCP server and connection to it through the tunnel that
761+
// should be maintained throughout the restart.
762+
tcpServerAddr:=tcpEchoServer(t)
763+
tcpConn,err:=conn.DialContext(ctx,"tcp",tcpServerAddr)
764+
require.NoError(t,err)
765+
defertcpConn.Close()
766+
writeReadEcho(t,ctx,tcpConn)
767+
768+
// Stop the first server.
769+
logger.Info(ctx,"test: stopping server 1")
770+
s1.Stop(t)
771+
772+
// Pings should fail on DERP but succeed on direct connections.
773+
pingCtx,pingCancel:=context.WithTimeout(ctx,2*time.Second)//nolint:gocritic // it's going to hang and timeout for DERP, so this needs to be short
774+
deferpingCancel()
775+
_,p2p,_,err:=conn.Ping(pingCtx)
776+
ifdirect {
777+
require.NoError(t,err)
778+
require.True(t,p2p,"expected direct connection")
779+
}else {
780+
require.ErrorIs(t,err,context.DeadlineExceeded)
781+
}
782+
783+
// The existing TCP connection should still be working if we're
784+
// using direct connections.
785+
ifdirect {
786+
writeReadEcho(t,ctx,tcpConn)
787+
}
788+
789+
// Start the first server again.
790+
logger.Info(ctx,"test: starting server 1")
791+
s1.Start(t)
792+
793+
// Restart the second server.
794+
logger.Info(ctx,"test: stopping server 2")
795+
s2.Stop(t)
796+
logger.Info(ctx,"test: starting server 2")
797+
s2.Start(t)
798+
799+
// Pings should eventually succeed on both DERP and direct
800+
// connections.
801+
require.True(t,conn.AwaitReachable(ctx))
802+
_,p2p,_,err=conn.Ping(ctx)
803+
require.NoError(t,err)
804+
require.Equal(t,direct,p2p,"mismatched p2p state")
805+
806+
// The existing TCP connection should still be working.
807+
writeReadEcho(t,ctx,tcpConn)
808+
})
809+
}
810+
}
811+
812+
functcpEchoServer(t*testing.T)string {
813+
varlistenerWg sync.WaitGroup
814+
tcpListener,err:=net.Listen("tcp","127.0.0.1:0")
815+
require.NoError(t,err)
816+
t.Cleanup(func() {
817+
_=tcpListener.Close()
818+
listenerWg.Wait()
819+
})
820+
listenerWg.Add(1)
821+
gofunc() {
822+
deferlistenerWg.Done()
823+
for {
824+
conn,err:=tcpListener.Accept()
825+
iferr!=nil {
826+
return
827+
}
828+
listenerWg.Add(1)
829+
gofunc() {
830+
deferlistenerWg.Done()
831+
deferconn.Close()
832+
_,_=io.Copy(conn,conn)
833+
}()
834+
}
835+
}()
836+
837+
returntcpListener.Addr().String()
838+
}
839+
840+
// nolint:revive // t takes precedence.
841+
funcwriteReadEcho(t*testing.T,ctx context.Context,conn net.Conn) {
842+
msg:=namesgenerator.GetRandomName(0)
843+
844+
deadline,ok:=ctx.Deadline()
845+
ifok {
846+
_=conn.SetWriteDeadline(deadline)
847+
deferconn.SetWriteDeadline(time.Time{})
848+
_=conn.SetReadDeadline(deadline)
849+
deferconn.SetReadDeadline(time.Time{})
850+
}
851+
852+
// Write a message
853+
_,err:=conn.Write([]byte(msg))
854+
require.NoError(t,err)
855+
856+
// Read the message back
857+
buf:=make([]byte,1024)
858+
n,err:=conn.Read(buf)
859+
require.NoError(t,err)
860+
require.Equal(t,msg,string(buf[:n]))
861+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp