Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 52f1c2b

Browse files
committed
initial implementation
1 parent b47d54d commit 52f1c2b

File tree

4 files changed

+159
-16
lines changed

4 files changed

+159
-16
lines changed

‎tailnet/controllers.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ func (t *tunnelUpdater) recvLoop() {
10491049
t.logger.Debug(context.Background(),"tunnel updater recvLoop started")
10501050
defert.logger.Debug(context.Background(),"tunnel updater recvLoop done")
10511051
deferclose(t.recvLoopDone)
1052+
freshState:=true
10521053
for {
10531054
update,err:=t.client.Recv()
10541055
iferr!=nil {
@@ -1061,8 +1062,10 @@ func (t *tunnelUpdater) recvLoop() {
10611062
}
10621063
t.logger.Debug(context.Background(),"got workspace update",
10631064
slog.F("workspace_update",update),
1065+
slog.F("fresh_state",freshState),
10641066
)
1065-
err=t.handleUpdate(update)
1067+
err=t.handleUpdate(update,freshState)
1068+
freshState=false
10661069
iferr!=nil {
10671070
t.logger.Critical(context.Background(),"failed to handle workspace Update",slog.Error(err))
10681071
cErr:=t.client.Close()
@@ -1083,6 +1086,7 @@ type WorkspaceUpdate struct {
10831086
UpsertedAgents []*Agent
10841087
DeletedWorkspaces []*Workspace
10851088
DeletedAgents []*Agent
1089+
FreshStatebool
10861090
}
10871091

10881092
func (w*WorkspaceUpdate)Clone()WorkspaceUpdate {
@@ -1091,6 +1095,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
10911095
UpsertedAgents:make([]*Agent,len(w.UpsertedAgents)),
10921096
DeletedWorkspaces:make([]*Workspace,len(w.DeletedWorkspaces)),
10931097
DeletedAgents:make([]*Agent,len(w.DeletedAgents)),
1098+
FreshState:w.FreshState,
10941099
}
10951100
fori,ws:=rangew.UpsertedWorkspaces {
10961101
clone.UpsertedWorkspaces[i]=&Workspace{
@@ -1115,7 +1120,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
11151120
returnclone
11161121
}
11171122

1118-
func (t*tunnelUpdater)handleUpdate(update*proto.WorkspaceUpdate)error {
1123+
func (t*tunnelUpdater)handleUpdate(update*proto.WorkspaceUpdate,freshStatebool)error {
11191124
t.Lock()
11201125
defert.Unlock()
11211126

@@ -1124,6 +1129,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
11241129
UpsertedAgents: []*Agent{},
11251130
DeletedWorkspaces: []*Workspace{},
11261131
DeletedAgents: []*Agent{},
1132+
FreshState:freshState,
11271133
}
11281134

11291135
for_,uw:=rangeupdate.UpsertedWorkspaces {

‎tailnet/controllers_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,13 +1611,15 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
16111611
},
16121612
DeletedWorkspaces: []*tailnet.Workspace{},
16131613
DeletedAgents: []*tailnet.Agent{},
1614+
FreshState:true,
16141615
}
16151616

16161617
// And the callback
16171618
cbUpdate:=testutil.TryReceive(ctx,t,fUH.ch)
16181619
require.Equal(t,currentState,cbUpdate)
16191620

1620-
// Current recvState should match
1621+
// Current recvState should match but shouldn't be a fresh state
1622+
currentState.FreshState=false
16211623
recvState,err:=updateCtrl.CurrentState()
16221624
require.NoError(t,err)
16231625
slices.SortFunc(recvState.UpsertedWorkspaces,func(a,b*tailnet.Workspace)int {
@@ -1692,12 +1694,14 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
16921694
},
16931695
DeletedWorkspaces: []*tailnet.Workspace{},
16941696
DeletedAgents: []*tailnet.Agent{},
1697+
FreshState:true,
16951698
}
16961699

16971700
cbUpdate:=testutil.TryReceive(ctx,t,fUH.ch)
16981701
require.Equal(t,initRecvUp,cbUpdate)
16991702

1700-
// Current state should match initial
1703+
// Current state should match initial but shouldn't be a fresh state
1704+
initRecvUp.FreshState=false
17011705
state,err:=updateCtrl.CurrentState()
17021706
require.NoError(t,err)
17031707
require.Equal(t,initRecvUp,state)
@@ -1753,6 +1757,7 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
17531757
"w1.coder.": {ws1a1IP},
17541758
}},
17551759
},
1760+
FreshState:false,
17561761
}
17571762
require.Equal(t,sndRecvUpdate,cbUpdate)
17581763

@@ -1771,6 +1776,7 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
17711776
},
17721777
DeletedWorkspaces: []*tailnet.Workspace{},
17731778
DeletedAgents: []*tailnet.Agent{},
1779+
FreshState:false,
17741780
},state)
17751781
}
17761782

‎vpn/tunnel.go

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,57 @@ func (u *updater) sendUpdateResponse(req *request[*TunnelMessage, *ManagerMessag
397397
// createPeerUpdateLocked creates a PeerUpdate message from a workspace update, populating
398398
// the network status of the agents.
399399
func (u*updater)createPeerUpdateLocked(update tailnet.WorkspaceUpdate)*PeerUpdate {
400+
// this flag is true on the first update after a reconnect
401+
ifupdate.FreshState {
402+
// ignoredWorkspaces is initially populated with the workspaces that are
403+
// in the current update. Later on we populate it with the deleted workspaces too
404+
// so that we don't send duplicate updates. Same applies to ignoredAgents.
405+
ignoredWorkspaces:=make(map[uuid.UUID]struct{},len(update.UpsertedWorkspaces))
406+
ignoredAgents:=make(map[uuid.UUID]struct{},len(update.UpsertedAgents))
407+
408+
for_,workspace:=rangeupdate.UpsertedWorkspaces {
409+
ignoredWorkspaces[workspace.ID]=struct{}{}
410+
}
411+
for_,agent:=rangeupdate.UpsertedAgents {
412+
ignoredAgents[agent.ID]=struct{}{}
413+
}
414+
for_,agent:=rangeu.agents {
415+
if_,ok:=ignoredAgents[agent.ID];!ok {
416+
// delete any current agents that are not in the new update
417+
update.DeletedAgents=append(update.DeletedAgents,&tailnet.Agent{
418+
ID:agent.ID,
419+
Name:agent.Name,
420+
WorkspaceID:agent.WorkspaceID,
421+
})
422+
// if the workspace connected to an agent we're deleting,
423+
// is not present in the fresh state, add it to the deleted workspaces
424+
if_,ok:=ignoredWorkspaces[agent.WorkspaceID];!ok {
425+
update.DeletedWorkspaces=append(update.DeletedWorkspaces,&tailnet.Workspace{
426+
// other fields cannot be populated because the tunnel
427+
// only stores agents and corresponding workspaceIDs
428+
ID:agent.WorkspaceID,
429+
})
430+
ignoredWorkspaces[agent.WorkspaceID]=struct{}{}
431+
}
432+
}
433+
}
434+
}
435+
400436
out:=&PeerUpdate{
401437
UpsertedWorkspaces:make([]*Workspace,len(update.UpsertedWorkspaces)),
402438
UpsertedAgents:make([]*Agent,len(update.UpsertedAgents)),
403439
DeletedWorkspaces:make([]*Workspace,len(update.DeletedWorkspaces)),
404440
DeletedAgents:make([]*Agent,len(update.DeletedAgents)),
405441
}
406442

407-
u.saveUpdateLocked(update)
443+
// save the workspace update to the tunnel's state, such that it can
444+
// be used to populate automated peer updates.
445+
for_,agent:=rangeupdate.UpsertedAgents {
446+
u.agents[agent.ID]=agent.Clone()
447+
}
448+
for_,agent:=rangeupdate.DeletedAgents {
449+
delete(u.agents,agent.ID)
450+
}
408451

409452
fori,ws:=rangeupdate.UpsertedWorkspaces {
410453
out.UpsertedWorkspaces[i]=&Workspace{
@@ -413,6 +456,7 @@ func (u *updater) createPeerUpdateLocked(update tailnet.WorkspaceUpdate) *PeerUp
413456
Status:Workspace_Status(ws.Status),
414457
}
415458
}
459+
416460
upsertedAgents:=u.convertAgentsLocked(update.UpsertedAgents)
417461
out.UpsertedAgents=upsertedAgents
418462
fori,ws:=rangeupdate.DeletedWorkspaces {
@@ -472,17 +516,6 @@ func (u *updater) convertAgentsLocked(agents []*tailnet.Agent) []*Agent {
472516
returnout
473517
}
474518

475-
// saveUpdateLocked saves the workspace update to the tunnel's state, such that it can
476-
// be used to populate automated peer updates.
477-
func (u*updater)saveUpdateLocked(update tailnet.WorkspaceUpdate) {
478-
for_,agent:=rangeupdate.UpsertedAgents {
479-
u.agents[agent.ID]=agent.Clone()
480-
}
481-
for_,agent:=rangeupdate.DeletedAgents {
482-
delete(u.agents,agent.ID)
483-
}
484-
}
485-
486519
// setConn sets the `conn` and returns false if there's already a connection set.
487520
func (u*updater)setConn(connConn)bool {
488521
u.mu.Lock()

‎vpn/tunnel_internal_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,104 @@ func TestTunnel_sendAgentUpdate(t *testing.T) {
486486
require.Equal(t,hsTime,req.msg.GetPeerUpdate().UpsertedAgents[0].LastHandshake.AsTime())
487487
}
488488

489+
funcTestTunnel_sendAgentUpdateReconnect(t*testing.T) {
490+
t.Parallel()
491+
492+
ctx:=testutil.Context(t,testutil.WaitShort)
493+
494+
mClock:=quartz.NewMock(t)
495+
496+
wID1:= uuid.UUID{1}
497+
aID1:= uuid.UUID{2}
498+
aID2:= uuid.UUID{3}
499+
hsTime:=time.Now().Add(-time.Minute).UTC()
500+
501+
client:=newFakeClient(ctx,t)
502+
conn:=newFakeConn(tailnet.WorkspaceUpdate{},hsTime)
503+
504+
tun,mgr:=setupTunnel(t,ctx,client,mClock)
505+
errCh:=make(chanerror,1)
506+
varresp*TunnelMessage
507+
gofunc() {
508+
r,err:=mgr.unaryRPC(ctx,&ManagerMessage{
509+
Msg:&ManagerMessage_Start{
510+
Start:&StartRequest{
511+
TunnelFileDescriptor:2,
512+
CoderUrl:"https://coder.example.com",
513+
ApiToken:"fakeToken",
514+
},
515+
},
516+
})
517+
resp=r
518+
errCh<-err
519+
}()
520+
testutil.RequireSend(ctx,t,client.ch,conn)
521+
err:=testutil.TryReceive(ctx,t,errCh)
522+
require.NoError(t,err)
523+
_,ok:=resp.Msg.(*TunnelMessage_Start)
524+
require.True(t,ok)
525+
526+
// Inform the tunnel of the initial state
527+
err=tun.Update(tailnet.WorkspaceUpdate{
528+
UpsertedWorkspaces: []*tailnet.Workspace{
529+
{
530+
ID:wID1,Name:"w1",Status:proto.Workspace_STARTING,
531+
},
532+
},
533+
UpsertedAgents: []*tailnet.Agent{
534+
{
535+
ID:aID1,
536+
Name:"agent1",
537+
WorkspaceID:wID1,
538+
Hosts:map[dnsname.FQDN][]netip.Addr{
539+
"agent1.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
540+
},
541+
},
542+
},
543+
})
544+
require.NoError(t,err)
545+
req:=testutil.TryReceive(ctx,t,mgr.requests)
546+
require.Nil(t,req.msg.Rpc)
547+
require.NotNil(t,req.msg.GetPeerUpdate())
548+
require.Len(t,req.msg.GetPeerUpdate().UpsertedAgents,1)
549+
require.Equal(t,aID1[:],req.msg.GetPeerUpdate().UpsertedAgents[0].Id)
550+
551+
// Upsert a new agent simulating a reconnect
552+
err=tun.Update(tailnet.WorkspaceUpdate{
553+
UpsertedWorkspaces: []*tailnet.Workspace{
554+
{
555+
ID:wID1,Name:"w1",Status:proto.Workspace_STARTING,
556+
},
557+
},
558+
UpsertedAgents: []*tailnet.Agent{
559+
{
560+
ID:aID2,
561+
Name:"agent2",
562+
WorkspaceID:wID1,
563+
Hosts:map[dnsname.FQDN][]netip.Addr{
564+
"agent2.coder.": {netip.MustParseAddr("fd60:627a:a42b:0101::")},
565+
},
566+
},
567+
},
568+
FreshState:true,
569+
})
570+
require.NoError(t,err)
571+
572+
// The new update only contains the new agent
573+
mClock.AdvanceNext()
574+
req=testutil.TryReceive(ctx,t,mgr.requests)
575+
require.Nil(t,req.msg.Rpc)
576+
peerUpdate:=req.msg.GetPeerUpdate()
577+
require.NotNil(t,peerUpdate)
578+
require.Len(t,peerUpdate.UpsertedAgents,1)
579+
require.Len(t,peerUpdate.DeletedAgents,1)
580+
581+
require.Equal(t,aID2[:],peerUpdate.UpsertedAgents[0].Id)
582+
require.Equal(t,hsTime,peerUpdate.UpsertedAgents[0].LastHandshake.AsTime())
583+
584+
require.Equal(t,aID1[:],peerUpdate.DeletedAgents[0].Id)
585+
}
586+
489587
//nolint:revive // t takes precedence
490588
funcsetupTunnel(t*testing.T,ctx context.Context,client*fakeClient,mClock quartz.Clock) (*Tunnel,*speaker[*ManagerMessage,*TunnelMessage,TunnelMessage]) {
491589
mp,tp:=net.Pipe()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp