Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf318880

Browse files
committed
feat: add support for multiple tunnel destinations in tailnet
1 parent778457b commitf318880

File tree

5 files changed

+530
-23
lines changed

5 files changed

+530
-23
lines changed

‎agent/agent_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,7 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
19181918
testCtx,testCtxCancel:=context.WithCancel(context.Background())
19191919
t.Cleanup(testCtxCancel)
19201920
clientID:=uuid.New()
1921-
ctrl:=tailnet.NewSingleDestController(logger,conn,agentID)
1921+
ctrl:=tailnet.NewTunnelSrcCoordController(logger,conn)
1922+
ctrl.AddDestination(agentID)
19221923
auth:= tailnet.ClientCoordinateeAuth{AgentID:agentID}
19231924
coordination:=ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger,clientID,auth,coordinator))
19241925
t.Cleanup(func() {
@@ -2408,7 +2409,8 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
24082409
testCtx,testCtxCancel:=context.WithCancel(context.Background())
24092410
t.Cleanup(testCtxCancel)
24102411
clientID:=uuid.New()
2411-
ctrl:=tailnet.NewSingleDestController(logger,conn,metadata.AgentID)
2412+
ctrl:=tailnet.NewTunnelSrcCoordController(logger,conn)
2413+
ctrl.AddDestination(metadata.AgentID)
24122414
auth:= tailnet.ClientCoordinateeAuth{AgentID:metadata.AgentID}
24132415
coordination:=ctrl.New(tailnet.NewInMemoryCoordinatorClient(
24142416
logger,clientID,auth,coordinator))

‎codersdk/workspacesdk/workspacesdk.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,9 @@ func (c *Client) DialAgent(dialCtx context.Context, agentID uuid.UUID, options *
268268
_=conn.Close()
269269
}
270270
}()
271-
controller.CoordCtrl=tailnet.NewSingleDestController(options.Logger,conn,agentID)
271+
coordCtrl:=tailnet.NewTunnelSrcCoordController(options.Logger,conn)
272+
coordCtrl.AddDestination(agentID)
273+
controller.CoordCtrl=coordCtrl
272274
controller.DERPCtrl=tailnet.NewBasicDERPController(options.Logger,conn)
273275
controller.Run(ctx)
274276

‎tailnet/controllers.go

Lines changed: 141 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"maps"
78
"math"
89
"strings"
910
"sync"
@@ -239,15 +240,17 @@ func (c *BasicCoordination) respLoop() {
239240
deferfunc() {
240241
cErr:=c.Client.Close()
241242
ifcErr!=nil {
242-
c.logger.Debug(context.Background(),"failed to close coordinate client after respLoop exit",slog.Error(cErr))
243+
c.logger.Debug(context.Background(),
244+
"failed to close coordinate client after respLoop exit",slog.Error(cErr))
243245
}
244246
c.coordinatee.SetAllPeersLost()
245247
close(c.respLoopDone)
246248
}()
247249
for {
248250
resp,err:=c.Client.Recv()
249251
iferr!=nil {
250-
c.logger.Debug(context.Background(),"failed to read from protocol",slog.Error(err))
252+
c.logger.Debug(context.Background(),
253+
"failed to read from protocol",slog.Error(err))
251254
c.SendErr(xerrors.Errorf("read: %w",err))
252255
return
253256
}
@@ -278,7 +281,8 @@ func (c *BasicCoordination) respLoop() {
278281
ReadyForHandshake:rfh,
279282
})
280283
iferr!=nil {
281-
c.logger.Debug(context.Background(),"failed to send ready for handshake",slog.Error(err))
284+
c.logger.Debug(context.Background(),
285+
"failed to send ready for handshake",slog.Error(err))
282286
c.SendErr(xerrors.Errorf("send: %w",err))
283287
return
284288
}
@@ -287,37 +291,158 @@ func (c *BasicCoordination) respLoop() {
287291
}
288292
}
289293

290-
typesingleDestControllerstruct {
294+
typeTunnelSrcCoordControllerstruct {
291295
*BasicCoordinationController
292-
dest uuid.UUID
296+
297+
mu sync.Mutex
298+
destsmap[uuid.UUID]struct{}
299+
coordination*BasicCoordination
293300
}
294301

295-
// NewSingleDestController creates a CoordinationController for Coder clients that connect to a
296-
// single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent.
297-
funcNewSingleDestController(logger slog.Logger,coordinateeCoordinatee,dest uuid.UUID)CoordinationController {
298-
coordinatee.SetTunnelDestination(dest)
299-
return&singleDestController{
302+
// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively
303+
// tunnel sources (that is, they create tunnel --- Coder clients not workspaces).
304+
funcNewTunnelSrcCoordController(
305+
logger slog.Logger,coordinateeCoordinatee,
306+
)*TunnelSrcCoordController {
307+
return&TunnelSrcCoordController{
300308
BasicCoordinationController:&BasicCoordinationController{
301309
Logger:logger,
302310
Coordinatee:coordinatee,
303311
SendAcks:false,
304312
},
305-
dest:dest,
313+
dests:make(map[uuid.UUID]struct{}),
306314
}
307315
}
308316

309-
func (c*singleDestController)New(clientCoordinatorClient)CloserWaiter {
317+
func (c*TunnelSrcCoordController)New(clientCoordinatorClient)CloserWaiter {
318+
c.mu.Lock()
319+
deferc.mu.Unlock()
310320
b:=c.BasicCoordinationController.NewCoordination(client)
311-
err:=client.Send(&proto.CoordinateRequest{AddTunnel:&proto.CoordinateRequest_Tunnel{Id:c.dest[:]}})
312-
iferr!=nil {
313-
b.SendErr(err)
321+
c.coordination=b
322+
// resync destinations on reconnect
323+
fordest:=rangec.dests {
324+
err:=client.Send(&proto.CoordinateRequest{
325+
AddTunnel:&proto.CoordinateRequest_Tunnel{Id:UUIDToByteSlice(dest)},
326+
})
327+
iferr!=nil {
328+
b.SendErr(err)
329+
c.coordination=nil
330+
cErr:=client.Close()
331+
ifcErr!=nil {
332+
c.Logger.Debug(
333+
context.Background(),
334+
"failed to close coordinator client after add tunnel failure",
335+
slog.Error(cErr),
336+
)
337+
}
338+
break
339+
}
314340
}
315341
returnb
316342
}
317343

344+
func (c*TunnelSrcCoordController)AddDestination(dest uuid.UUID) {
345+
c.mu.Lock()
346+
deferc.mu.Unlock()
347+
c.Coordinatee.SetTunnelDestination(dest)// this prepares us for an ack
348+
c.dests[dest]=struct{}{}
349+
ifc.coordination==nil {
350+
return
351+
}
352+
err:=c.coordination.Client.Send(
353+
&proto.CoordinateRequest{
354+
AddTunnel:&proto.CoordinateRequest_Tunnel{Id:UUIDToByteSlice(dest)},
355+
})
356+
iferr!=nil {
357+
c.coordination.SendErr(err)
358+
cErr:=c.coordination.Client.Close()// close the client so we don't gracefully disconnect
359+
ifcErr!=nil {
360+
c.Logger.Debug(context.Background(),
361+
"failed to close coordinator client after add tunnel failure",
362+
slog.Error(cErr))
363+
}
364+
c.coordination=nil
365+
}
366+
}
367+
368+
func (c*TunnelSrcCoordController)RemoveDestination(dest uuid.UUID) {
369+
c.mu.Lock()
370+
deferc.mu.Unlock()
371+
delete(c.dests,dest)
372+
ifc.coordination==nil {
373+
return
374+
}
375+
err:=c.coordination.Client.Send(
376+
&proto.CoordinateRequest{
377+
RemoveTunnel:&proto.CoordinateRequest_Tunnel{Id:UUIDToByteSlice(dest)},
378+
})
379+
iferr!=nil {
380+
c.coordination.SendErr(err)
381+
cErr:=c.coordination.Client.Close()// close the client so we don't gracefully disconnect
382+
ifcErr!=nil {
383+
c.Logger.Debug(context.Background(),
384+
"failed to close coordinator client after remove tunnel failure",
385+
slog.Error(cErr))
386+
}
387+
c.coordination=nil
388+
}
389+
}
390+
391+
func (c*TunnelSrcCoordController)SyncDestinations(destinations []uuid.UUID) {
392+
c.mu.Lock()
393+
deferc.mu.Unlock()
394+
toAdd:=make(map[uuid.UUID]struct{})
395+
toRemove:=maps.Clone(c.dests)
396+
all:=make(map[uuid.UUID]struct{})
397+
for_,dest:=rangedestinations {
398+
all[dest]=struct{}{}
399+
delete(toRemove,dest)
400+
if_,ok:=c.dests[dest];!ok {
401+
toAdd[dest]=struct{}{}
402+
}
403+
}
404+
c.dests=all
405+
ifc.coordination==nil {
406+
return
407+
}
408+
varerrerror
409+
deferfunc() {
410+
iferr!=nil {
411+
c.coordination.SendErr(err)
412+
cErr:=c.coordination.Client.Close()// don't gracefully disconnect
413+
ifcErr!=nil {
414+
c.Logger.Debug(context.Background(),
415+
"failed to close coordinator client during sync destinations",
416+
slog.Error(cErr))
417+
}
418+
c.coordination=nil
419+
}
420+
}()
421+
fordest:=rangetoAdd {
422+
err=c.coordination.Client.Send(
423+
&proto.CoordinateRequest{
424+
AddTunnel:&proto.CoordinateRequest_Tunnel{Id:UUIDToByteSlice(dest)},
425+
})
426+
iferr!=nil {
427+
return
428+
}
429+
}
430+
fordest:=rangetoRemove {
431+
err=c.coordination.Client.Send(
432+
&proto.CoordinateRequest{
433+
RemoveTunnel:&proto.CoordinateRequest_Tunnel{Id:UUIDToByteSlice(dest)},
434+
})
435+
iferr!=nil {
436+
return
437+
}
438+
}
439+
}
440+
318441
// NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
319442
// create tunnels and always send ReadyToHandshake acknowledgements.
320-
funcNewAgentCoordinationController(logger slog.Logger,coordinateeCoordinatee)CoordinationController {
443+
funcNewAgentCoordinationController(
444+
logger slog.Logger,coordinateeCoordinatee,
445+
)CoordinationController {
321446
return&BasicCoordinationController{
322447
Logger:logger,
323448
Coordinatee:coordinatee,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp