Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 5f516ed

Browse files
authored
feat: improve coder connect tunnel handling on reconnect (#17598)
Closes coder/internal#563

The [Coder Connect tunnel](https://github.com/coder/coder/blob/main/vpn/tunnel.go) receives workspace state from the Coder server over a [dRPC stream](https://github.com/coder/coder/blob/114ba4593b2a82dfd41cdcb7fd6eb70d866e7b86/tailnet/controllers.go#L1029). When first connecting to this stream, the current state of the user's workspaces is received, with subsequent messages being diffs on top of that state.

However, if the client disconnects from this stream, such as when the user's device is suspended, and then reconnects later, no mechanism exists for the tunnel to differentiate that message containing the entire initial state from another diff, and so that state is incorrectly applied as a diff.

In practice:
- Tunnel connects, receives a workspace update containing all the existing workspaces & agents.
- Tunnel loses connection, but isn't completely stopped.
- All the user's workspaces are restarted, producing a new set of agents.
- Tunnel regains connection, and receives a workspace update containing all the existing workspaces & agents.
- This initial update is incorrectly applied as a diff, with the Tunnel's state containing both the old & new agents.

This PR introduces a solution in which tunnelUpdater, when created, sends a FreshState flag with the WorkspaceUpdate type. This flag is handled in the vpn tunnel in the following fashion:
- Preserve existing Agents
- Remove current Agents in the tunnel that are not present in the WorkspaceUpdate
- Remove unreferenced Workspaces
1 parent ebad5c3 commit 5f516ed

File tree

4 files changed

+498
-16
lines changed

4 files changed

+498
-16
lines changed

‎tailnet/controllers.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,21 @@ type Workspace struct {
897897
agentsmap[uuid.UUID]*Agent
898898
}
899899

900+
func (w*Workspace)Clone()Workspace {
901+
agents:=make(map[uuid.UUID]*Agent,len(w.agents))
902+
fork,v:=rangew.agents {
903+
clone:=v.Clone()
904+
agents[k]=&clone
905+
}
906+
returnWorkspace{
907+
ID:w.ID,
908+
Name:w.Name,
909+
Status:w.Status,
910+
ownerUsername:w.ownerUsername,
911+
agents:agents,
912+
}
913+
}
914+
900915
typeDNSNameOptionsstruct {
901916
Suffixstring
902917
}
@@ -1049,6 +1064,7 @@ func (t *tunnelUpdater) recvLoop() {
10491064
t.logger.Debug(context.Background(),"tunnel updater recvLoop started")
10501065
defert.logger.Debug(context.Background(),"tunnel updater recvLoop done")
10511066
deferclose(t.recvLoopDone)
1067+
updateKind:=Snapshot
10521068
for {
10531069
update,err:=t.client.Recv()
10541070
iferr!=nil {
@@ -1061,8 +1077,10 @@ func (t *tunnelUpdater) recvLoop() {
10611077
}
10621078
t.logger.Debug(context.Background(),"got workspace update",
10631079
slog.F("workspace_update",update),
1080+
slog.F("update_kind",updateKind),
10641081
)
1065-
err=t.handleUpdate(update)
1082+
err=t.handleUpdate(update,updateKind)
1083+
updateKind=Diff
10661084
iferr!=nil {
10671085
t.logger.Critical(context.Background(),"failed to handle workspace Update",slog.Error(err))
10681086
cErr:=t.client.Close()
@@ -1083,14 +1101,23 @@ type WorkspaceUpdate struct {
10831101
UpsertedAgents []*Agent
10841102
DeletedWorkspaces []*Workspace
10851103
DeletedAgents []*Agent
1104+
KindUpdateKind
10861105
}
10871106

1107+
typeUpdateKindint
1108+
1109+
const (
1110+
DiffUpdateKind=iota
1111+
Snapshot
1112+
)
1113+
10881114
func (w*WorkspaceUpdate)Clone()WorkspaceUpdate {
10891115
clone:=WorkspaceUpdate{
10901116
UpsertedWorkspaces:make([]*Workspace,len(w.UpsertedWorkspaces)),
10911117
UpsertedAgents:make([]*Agent,len(w.UpsertedAgents)),
10921118
DeletedWorkspaces:make([]*Workspace,len(w.DeletedWorkspaces)),
10931119
DeletedAgents:make([]*Agent,len(w.DeletedAgents)),
1120+
Kind:w.Kind,
10941121
}
10951122
fori,ws:=rangew.UpsertedWorkspaces {
10961123
clone.UpsertedWorkspaces[i]=&Workspace{
@@ -1115,7 +1142,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
11151142
returnclone
11161143
}
11171144

1118-
func (t*tunnelUpdater)handleUpdate(update*proto.WorkspaceUpdate)error {
1145+
func (t*tunnelUpdater)handleUpdate(update*proto.WorkspaceUpdate,updateKindUpdateKind)error {
11191146
t.Lock()
11201147
defert.Unlock()
11211148

@@ -1124,6 +1151,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
11241151
UpsertedAgents: []*Agent{},
11251152
DeletedWorkspaces: []*Workspace{},
11261153
DeletedAgents: []*Agent{},
1154+
Kind:updateKind,
11271155
}
11281156

11291157
for_,uw:=rangeupdate.UpsertedWorkspaces {

‎tailnet/controllers_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1611,6 +1611,7 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
16111611
},
16121612
DeletedWorkspaces: []*tailnet.Workspace{},
16131613
DeletedAgents: []*tailnet.Agent{},
1614+
Kind:tailnet.Snapshot,
16141615
}
16151616

16161617
// And the callback
@@ -1626,6 +1627,9 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
16261627
slices.SortFunc(recvState.UpsertedAgents,func(a,b*tailnet.Agent)int {
16271628
returnstrings.Compare(a.Name,b.Name)
16281629
})
1630+
// tunnel is still open, so it's a diff
1631+
currentState.Kind=tailnet.Diff
1632+
16291633
require.Equal(t,currentState,recvState)
16301634
}
16311635

@@ -1692,14 +1696,17 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
16921696
},
16931697
DeletedWorkspaces: []*tailnet.Workspace{},
16941698
DeletedAgents: []*tailnet.Agent{},
1699+
Kind:tailnet.Snapshot,
16951700
}
16961701

16971702
cbUpdate:=testutil.TryReceive(ctx,t,fUH.ch)
16981703
require.Equal(t,initRecvUp,cbUpdate)
16991704

1700-
// Current state should match initial
17011705
state,err:=updateCtrl.CurrentState()
17021706
require.NoError(t,err)
1707+
// tunnel is still open, so it's a diff
1708+
initRecvUp.Kind=tailnet.Diff
1709+
17031710
require.Equal(t,initRecvUp,state)
17041711

17051712
// Send update that removes w1a1 and adds w1a2

‎vpn/tunnel.go

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func NewTunnel(
8888
netLoopDone:make(chanstruct{}),
8989
uSendCh:s.sendCh,
9090
agents:map[uuid.UUID]tailnet.Agent{},
91+
workspaces:map[uuid.UUID]tailnet.Workspace{},
9192
clock:quartz.NewReal(),
9293
},
9394
}
@@ -347,7 +348,9 @@ type updater struct {
347348
uSendChchan<-*TunnelMessage
348349
// agents contains the agents that are currently connected to the tunnel.
349350
agentsmap[uuid.UUID]tailnet.Agent
350-
connConn
351+
// workspaces contains the workspaces to which agents are currently connected via the tunnel.
352+
workspacesmap[uuid.UUID]tailnet.Workspace
353+
connConn
351354

352355
clock quartz.Clock
353356
}
@@ -397,14 +400,32 @@ func (u *updater) sendUpdateResponse(req *request[*TunnelMessage, *ManagerMessag
397400
// createPeerUpdateLocked creates a PeerUpdate message from a workspace update, populating
398401
// the network status of the agents.
399402
func (u*updater)createPeerUpdateLocked(update tailnet.WorkspaceUpdate)*PeerUpdate {
403+
// if the update is a snapshot, we need to process the full state
404+
ifupdate.Kind==tailnet.Snapshot {
405+
processSnapshotUpdate(&update,u.agents,u.workspaces)
406+
}
407+
400408
out:=&PeerUpdate{
401409
UpsertedWorkspaces:make([]*Workspace,len(update.UpsertedWorkspaces)),
402410
UpsertedAgents:make([]*Agent,len(update.UpsertedAgents)),
403411
DeletedWorkspaces:make([]*Workspace,len(update.DeletedWorkspaces)),
404412
DeletedAgents:make([]*Agent,len(update.DeletedAgents)),
405413
}
406414

407-
u.saveUpdateLocked(update)
415+
// save the workspace update to the tunnel's state, such that it can
416+
// be used to populate automated peer updates.
417+
for_,agent:=rangeupdate.UpsertedAgents {
418+
u.agents[agent.ID]=agent.Clone()
419+
}
420+
for_,agent:=rangeupdate.DeletedAgents {
421+
delete(u.agents,agent.ID)
422+
}
423+
for_,workspace:=rangeupdate.UpsertedWorkspaces {
424+
u.workspaces[workspace.ID]=workspace.Clone()
425+
}
426+
for_,workspace:=rangeupdate.DeletedWorkspaces {
427+
delete(u.workspaces,workspace.ID)
428+
}
408429

409430
fori,ws:=rangeupdate.UpsertedWorkspaces {
410431
out.UpsertedWorkspaces[i]=&Workspace{
@@ -413,6 +434,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
413434
Status:Workspace_Status(ws.Status),
414435
}
415436
}
437+
416438
upsertedAgents:=u.convertAgentsLocked(update.UpsertedAgents)
417439
out.UpsertedAgents=upsertedAgents
418440
fori,ws:=rangeupdate.DeletedWorkspaces {
@@ -472,17 +494,6 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent {
472494
returnout
473495
}
474496

475-
// saveUpdateLocked saves the workspace update to the tunnel's state, such that it can
476-
// be used to populate automated peer updates.
477-
func (u*updater)saveUpdateLocked(update tailnet.WorkspaceUpdate) {
478-
for_,agent:=rangeupdate.UpsertedAgents {
479-
u.agents[agent.ID]=agent.Clone()
480-
}
481-
for_,agent:=rangeupdate.DeletedAgents {
482-
delete(u.agents,agent.ID)
483-
}
484-
}
485-
486497
// setConn sets the `conn` and returns false if there's already a connection set.
487498
func (u*updater)setConn(connConn)bool {
488499
u.mu.Lock()
@@ -552,6 +563,46 @@ func (u *updater) netStatusLoop() {
552563
}
553564
}
554565

566+
// processSnapshotUpdate handles the logic when a full state update is received.
567+
// When the tunnel is live, we only receive diffs, but the first packet on any given
568+
// reconnect to the tailnet API is a full state.
569+
// Without this logic we weren't processing deletes for any workspaces or agents deleted
570+
// while the client was disconnected while the computer was asleep.
571+
funcprocessSnapshotUpdate(update*tailnet.WorkspaceUpdate,agentsmap[uuid.UUID]tailnet.Agent,workspacesmap[uuid.UUID]tailnet.Workspace) {
572+
// ignoredWorkspaces is initially populated with the workspaces that are
573+
// in the current update. Later on we populate it with the deleted workspaces too
574+
// so that we don't send duplicate updates. Same applies to ignoredAgents.
575+
ignoredWorkspaces:=make(map[uuid.UUID]struct{},len(update.UpsertedWorkspaces))
576+
ignoredAgents:=make(map[uuid.UUID]struct{},len(update.UpsertedAgents))
577+
578+
for_,workspace:=rangeupdate.UpsertedWorkspaces {
579+
ignoredWorkspaces[workspace.ID]=struct{}{}
580+
}
581+
for_,agent:=rangeupdate.UpsertedAgents {
582+
ignoredAgents[agent.ID]=struct{}{}
583+
}
584+
for_,agent:=rangeagents {
585+
if_,present:=ignoredAgents[agent.ID];!present {
586+
// delete any current agents that are not in the new update
587+
update.DeletedAgents=append(update.DeletedAgents,&tailnet.Agent{
588+
ID:agent.ID,
589+
Name:agent.Name,
590+
WorkspaceID:agent.WorkspaceID,
591+
})
592+
}
593+
}
594+
for_,workspace:=rangeworkspaces {
595+
if_,present:=ignoredWorkspaces[workspace.ID];!present {
596+
update.DeletedWorkspaces=append(update.DeletedWorkspaces,&tailnet.Workspace{
597+
ID:workspace.ID,
598+
Name:workspace.Name,
599+
Status:workspace.Status,
600+
})
601+
ignoredWorkspaces[workspace.ID]=struct{}{}
602+
}
603+
}
604+
}
605+
555606
// hostsToIPStrings returns a slice of all unique IP addresses in the values
556607
// of the given map.
557608
funchostsToIPStrings(hostsmap[dnsname.FQDN][]netip.Addr) []string {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp