Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite61389c

Browse files
committed
feat: changes codersdk to use tailnet v2 for DERPMap updates
1 parenta0dd3a7 commite61389c

File tree

1 file changed

+186
-136
lines changed

1 file changed

+186
-136
lines changed

‎codersdk/workspaceagents.go

Lines changed: 186 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"strings"
1515
"time"
1616

17+
"golang.org/x/sync/errgroup"
18+
1719
"github.com/google/uuid"
1820
"golang.org/x/xerrors"
1921
"nhooyr.io/websocket"
@@ -317,142 +319,28 @@ func (c *Client) DialWorkspaceAgent(dialCtx context.Context, agentID uuid.UUID,
317319
q:=coordinateURL.Query()
318320
q.Add("version",proto.CurrentVersion.String())
319321
coordinateURL.RawQuery=q.Encode()
320-
closedCoordinator:=make(chanstruct{})
321-
// Must only ever be used once, send error OR close to avoid
322-
// reassignment race. Buffered so we don't hang in goroutine.
323-
firstCoordinator:=make(chanerror,1)
324-
gofunc() {
325-
deferclose(closedCoordinator)
326-
isFirst:=true
327-
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(ctx); {
328-
options.Logger.Debug(ctx,"connecting")
329-
// nolint:bodyclose
330-
ws,res,err:=websocket.Dial(ctx,coordinateURL.String(),&websocket.DialOptions{
331-
HTTPClient:c.HTTPClient,
332-
HTTPHeader:headers,
333-
// Need to disable compression to avoid a data-race.
334-
CompressionMode:websocket.CompressionDisabled,
335-
})
336-
ifisFirst {
337-
ifres!=nil&&res.StatusCode==http.StatusConflict {
338-
firstCoordinator<-ReadBodyAsError(res)
339-
return
340-
}
341-
isFirst=false
342-
close(firstCoordinator)
343-
}
344-
iferr!=nil {
345-
iferrors.Is(err,context.Canceled) {
346-
return
347-
}
348-
options.Logger.Debug(ctx,"failed to dial",slog.Error(err))
349-
continue
350-
}
351-
client,err:=tailnet.NewDRPCClient(websocket.NetConn(ctx,ws,websocket.MessageBinary))
352-
iferr!=nil {
353-
options.Logger.Debug(ctx,"failed to create DRPCClient",slog.Error(err))
354-
_=ws.Close(websocket.StatusInternalError,"")
355-
continue
356-
}
357-
coordinate,err:=client.Coordinate(ctx)
358-
iferr!=nil {
359-
options.Logger.Debug(ctx,"failed to reach the Coordinate endpoint",slog.Error(err))
360-
_=ws.Close(websocket.StatusInternalError,"")
361-
continue
362-
}
363-
364-
coordination:=tailnet.NewRemoteCoordination(options.Logger,coordinate,conn,agentID)
365-
options.Logger.Debug(ctx,"serving coordinator")
366-
err=<-coordination.Error()
367-
iferrors.Is(err,context.Canceled) {
368-
_=ws.Close(websocket.StatusGoingAway,"")
369-
return
370-
}
371-
iferr!=nil {
372-
options.Logger.Debug(ctx,"error serving coordinator",slog.Error(err))
373-
_=ws.Close(websocket.StatusGoingAway,"")
374-
continue
375-
}
376-
_=ws.Close(websocket.StatusGoingAway,"")
377-
}
378-
}()
379-
380-
derpMapURL,err:=c.URL.Parse("/api/v2/derp-map")
381-
iferr!=nil {
382-
returnnil,xerrors.Errorf("parse url: %w",err)
383-
}
384-
closedDerpMap:=make(chanstruct{})
385-
// Must only ever be used once, send error OR close to avoid
386-
// reassignment race. Buffered so we don't hang in goroutine.
387-
firstDerpMap:=make(chanerror,1)
388-
gofunc() {
389-
deferclose(closedDerpMap)
390-
isFirst:=true
391-
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(ctx); {
392-
options.Logger.Debug(ctx,"connecting to server for derp map updates")
393-
// nolint:bodyclose
394-
ws,res,err:=websocket.Dial(ctx,derpMapURL.String(),&websocket.DialOptions{
395-
HTTPClient:c.HTTPClient,
396-
HTTPHeader:headers,
397-
// Need to disable compression to avoid a data-race.
398-
CompressionMode:websocket.CompressionDisabled,
399-
})
400-
ifisFirst {
401-
ifres!=nil&&res.StatusCode==http.StatusConflict {
402-
firstDerpMap<-ReadBodyAsError(res)
403-
return
404-
}
405-
isFirst=false
406-
close(firstDerpMap)
407-
}
408-
iferr!=nil {
409-
iferrors.Is(err,context.Canceled) {
410-
return
411-
}
412-
options.Logger.Debug(ctx,"failed to dial",slog.Error(err))
413-
continue
414-
}
415-
416-
var (
417-
nconn=websocket.NetConn(ctx,ws,websocket.MessageBinary)
418-
dec=json.NewDecoder(nconn)
419-
)
420-
for {
421-
varderpMap tailcfg.DERPMap
422-
err:=dec.Decode(&derpMap)
423-
ifxerrors.Is(err,context.Canceled) {
424-
_=ws.Close(websocket.StatusGoingAway,"")
425-
return
426-
}
427-
iferr!=nil {
428-
options.Logger.Debug(ctx,"failed to decode derp map",slog.Error(err))
429-
_=ws.Close(websocket.StatusGoingAway,"")
430-
return
431-
}
432-
433-
if!tailnet.CompareDERPMaps(conn.DERPMap(),&derpMap) {
434-
options.Logger.Debug(ctx,"updating derp map due to detected changes")
435-
conn.SetDERPMap(&derpMap)
436-
}
437-
}
438-
}
439-
}()
440322

441-
forfirstCoordinator!=nil||firstDerpMap!=nil {
442-
select {
443-
case<-dialCtx.Done():
444-
returnnil,xerrors.Errorf("timed out waiting for coordinator and derp map: %w",dialCtx.Err())
445-
caseerr=<-firstCoordinator:
446-
iferr!=nil {
447-
returnnil,xerrors.Errorf("start coordinator: %w",err)
448-
}
449-
firstCoordinator=nil
450-
caseerr=<-firstDerpMap:
451-
iferr!=nil {
452-
returnnil,xerrors.Errorf("receive derp map: %w",err)
453-
}
454-
firstDerpMap=nil
323+
connector:=runTailnetAPIConnector(ctx,options.Logger,
324+
agentID,coordinateURL.String(),
325+
&websocket.DialOptions{
326+
HTTPClient:c.HTTPClient,
327+
HTTPHeader:headers,
328+
// Need to disable compression to avoid a data-race.
329+
CompressionMode:websocket.CompressionDisabled,
330+
},
331+
conn,
332+
)
333+
options.Logger.Debug(ctx,"running tailnet API v2+ connector")
334+
335+
select {
336+
case<-dialCtx.Done():
337+
returnnil,xerrors.Errorf("timed out waiting for coordinator and derp map: %w",dialCtx.Err())
338+
caseerr=<-connector.connected:
339+
iferr!=nil {
340+
options.Logger.Error(ctx,"failed to connect to tailnet v2+ API",slog.Error(err))
341+
returnnil,xerrors.Errorf("start connector: %w",err)
455342
}
343+
options.Logger.Debug(ctx,"connected to tailnet v2+ API")
456344
}
457345

458346
agentConn=NewWorkspaceAgentConn(conn,WorkspaceAgentConnOptions{
@@ -464,8 +352,7 @@ func (c *Client) DialWorkspaceAgent(dialCtx context.Context, agentID uuid.UUID,
464352
AgentIP:WorkspaceAgentIP,
465353
CloseFunc:func()error {
466354
cancel()
467-
<-closedCoordinator
468-
<-closedDerpMap
355+
<-connector.closed
469356
returnconn.Close()
470357
},
471358
})
@@ -478,6 +365,169 @@ func (c *Client) DialWorkspaceAgent(dialCtx context.Context, agentID uuid.UUID,
478365
returnagentConn,nil
479366
}
480367

368+
// tailnetAPIConnector dials the tailnet API (v2+) and then uses the API with a tailnet.Conn to
369+
//
370+
// 1) run the Coordinate API and pass node information back and forth
371+
// 2) stream DERPMap updates and program the Conn
372+
//
373+
// These functions share the same websocket, and so are combined here so that if we hit a problem
374+
// we tear the whole thing down and start over with a new websocket.
375+
typetailnetAPIConnectorstruct {
376+
ctx context.Context
377+
logger slog.Logger
378+
379+
agentID uuid.UUID
380+
coordinateURLstring
381+
dialOptions*websocket.DialOptions
382+
conn*tailnet.Conn
383+
384+
connectedchanerror
385+
isFirstbool
386+
closedchanstruct{}
387+
}
388+
389+
// runTailnetAPIConnector creates and runs a tailnetAPIConnector
390+
funcrunTailnetAPIConnector(
391+
ctx context.Context,logger slog.Logger,
392+
agentID uuid.UUID,coordinateURLstring,dialOptions*websocket.DialOptions,
393+
conn*tailnet.Conn,
394+
)*tailnetAPIConnector {
395+
tac:=&tailnetAPIConnector{
396+
ctx:ctx,
397+
logger:logger,
398+
agentID:agentID,
399+
coordinateURL:coordinateURL,
400+
dialOptions:dialOptions,
401+
conn:conn,
402+
connected:make(chanerror,1),
403+
closed:make(chanstruct{}),
404+
}
405+
gotac.run()
406+
returntac
407+
}
408+
409+
func (tac*tailnetAPIConnector)run() {
410+
tac.isFirst=true
411+
deferclose(tac.closed)
412+
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(tac.ctx); {
413+
tailnetClient,err:=tac.dial()
414+
iferr!=nil {
415+
continue
416+
}
417+
tac.logger.Debug(tac.ctx,"obtained tailnet API v2+ client")
418+
tac.coordinateAndDERPMap(tailnetClient)
419+
tac.logger.Debug(tac.ctx,"tailnet API v2+ connection lost")
420+
}
421+
}
422+
423+
func (tac*tailnetAPIConnector)dial() (proto.DRPCTailnetClient,error) {
424+
tac.logger.Debug(tac.ctx,"dialing Coder tailnet v2+ API")
425+
// nolint:bodyclose
426+
ws,res,err:=websocket.Dial(tac.ctx,tac.coordinateURL,tac.dialOptions)
427+
iftac.isFirst {
428+
ifres!=nil&&res.StatusCode==http.StatusConflict {
429+
err=ReadBodyAsError(res)
430+
tac.connected<-err
431+
returnnil,err
432+
}
433+
tac.isFirst=false
434+
close(tac.connected)
435+
}
436+
iferr!=nil {
437+
if!errors.Is(err,context.Canceled) {
438+
tac.logger.Error(tac.ctx,"failed to dial tailnet v2+ API",slog.Error(err))
439+
}
440+
returnnil,err
441+
}
442+
client,err:=tailnet.NewDRPCClient(websocket.NetConn(tac.ctx,ws,websocket.MessageBinary))
443+
iferr!=nil {
444+
tac.logger.Debug(tac.ctx,"failed to create DRPCClient",slog.Error(err))
445+
_=ws.Close(websocket.StatusInternalError,"")
446+
returnnil,err
447+
}
448+
returnclient,err
449+
}
450+
451+
// coordinateAndDERPMap uses the provided client to coordinate and stream DERP Maps. It is combined
452+
// into one function so that a problem with one tears down the other and triggers a retry (if
453+
// appropriate). We multiplex both RPCs over the same websocket, so we want them to share the same
454+
// fate.
455+
func (tac*tailnetAPIConnector)coordinateAndDERPMap(client proto.DRPCTailnetClient) {
456+
deferfunc() {
457+
conn:=client.DRPCConn()
458+
closeErr:=conn.Close()
459+
ifcloseErr!=nil&&
460+
!xerrors.Is(closeErr,io.EOF)&&
461+
!xerrors.Is(closeErr,context.Canceled)&&
462+
!xerrors.Is(closeErr,context.DeadlineExceeded) {
463+
tac.logger.Error(tac.ctx,"error closing DRPC connection",slog.Error(closeErr))
464+
<-conn.Closed()
465+
}
466+
}()
467+
eg,egCtx:=errgroup.WithContext(tac.ctx)
468+
eg.Go(func()error {
469+
returntac.coordinate(egCtx,client)
470+
})
471+
eg.Go(func()error {
472+
returntac.derpMap(egCtx,client)
473+
})
474+
err:=eg.Wait()
475+
iferr!=nil&&
476+
!xerrors.Is(err,io.EOF)&&
477+
!xerrors.Is(err,context.Canceled)&&
478+
!xerrors.Is(err,context.DeadlineExceeded) {
479+
tac.logger.Error(tac.ctx,"error while connected to tailnet v2+ API")
480+
}
481+
}
482+
483+
func (tac*tailnetAPIConnector)coordinate(ctx context.Context,client proto.DRPCTailnetClient)error {
484+
coord,err:=client.Coordinate(ctx)
485+
iferr!=nil {
486+
returnxerrors.Errorf("failed to connect to Coordinate RPC: %w",err)
487+
}
488+
deferfunc() {
489+
cErr:=coord.Close()
490+
ifcErr!=nil {
491+
tac.logger.Debug(ctx,"error closing Coordinate RPC",slog.Error(cErr))
492+
}
493+
}()
494+
coordination:=tailnet.NewRemoteCoordination(tac.logger,coord,tac.conn,tac.agentID)
495+
tac.logger.Debug(ctx,"serving coordinator")
496+
err=<-coordination.Error()
497+
iferr!=nil&&
498+
!xerrors.Is(err,io.EOF)&&
499+
!xerrors.Is(err,context.Canceled)&&
500+
!xerrors.Is(err,context.DeadlineExceeded) {
501+
returnxerrors.Errorf("remote coordination error: %w",err)
502+
}
503+
returnnil
504+
}
505+
506+
func (tac*tailnetAPIConnector)derpMap(ctx context.Context,client proto.DRPCTailnetClient)error {
507+
s,err:=client.StreamDERPMaps(ctx,&proto.StreamDERPMapsRequest{})
508+
iferr!=nil {
509+
returnxerrors.Errorf("failed to connect to StreamDERPMaps RPC: %w",err)
510+
}
511+
deferfunc() {
512+
cErr:=s.Close()
513+
ifcErr!=nil {
514+
tac.logger.Debug(ctx,"error closing StreamDERPMaps RPC",slog.Error(cErr))
515+
}
516+
}()
517+
for {
518+
dmp,err:=s.Recv()
519+
iferr!=nil {
520+
ifxerrors.Is(err,io.EOF)||xerrors.Is(err,context.Canceled)||xerrors.Is(err,context.DeadlineExceeded) {
521+
returnnil
522+
}
523+
returnxerrors.Errorf("error receiving DERP Map: %w",err)
524+
}
525+
tac.logger.Debug(ctx,"got new DERP Map",slog.F("derp_map",dmp))
526+
dm:=tailnet.DERPMapFromProto(dmp)
527+
tac.conn.SetDERPMap(dm)
528+
}
529+
}
530+
481531
// WatchWorkspaceAgentMetadata watches the metadata of a workspace agent.
482532
// The returned channel will be closed when the context is canceled. Exactly
483533
// one error will be sent on the error channel. The metadata channel is never closed.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp