Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit05b6c81

Browse files
committed
feat: changes codersdk to use tailnet v2 for DERPMap updates
1 parentd6ba0df commit05b6c81

File tree

1 file changed

+188
-136
lines changed

1 file changed

+188
-136
lines changed

‎codersdk/workspaceagents.go

Lines changed: 188 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"strings"
1515
"time"
1616

17+
"golang.org/x/sync/errgroup"
18+
1719
"github.com/google/uuid"
1820
"golang.org/x/xerrors"
1921
"nhooyr.io/websocket"
@@ -317,142 +319,28 @@ func (c *Client) DialWorkspaceAgent(dialCtx context.Context, agentID uuid.UUID,
317319
q:=coordinateURL.Query()
318320
q.Add("version",proto.CurrentVersion.String())
319321
coordinateURL.RawQuery=q.Encode()
320-
closedCoordinator:=make(chanstruct{})
321-
// Must only ever be used once, send error OR close to avoid
322-
// reassignment race. Buffered so we don't hang in goroutine.
323-
firstCoordinator:=make(chanerror,1)
324-
gofunc() {
325-
deferclose(closedCoordinator)
326-
isFirst:=true
327-
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(ctx); {
328-
options.Logger.Debug(ctx,"connecting")
329-
// nolint:bodyclose
330-
ws,res,err:=websocket.Dial(ctx,coordinateURL.String(),&websocket.DialOptions{
331-
HTTPClient:c.HTTPClient,
332-
HTTPHeader:headers,
333-
// Need to disable compression to avoid a data-race.
334-
CompressionMode:websocket.CompressionDisabled,
335-
})
336-
ifisFirst {
337-
ifres!=nil&&res.StatusCode==http.StatusConflict {
338-
firstCoordinator<-ReadBodyAsError(res)
339-
return
340-
}
341-
isFirst=false
342-
close(firstCoordinator)
343-
}
344-
iferr!=nil {
345-
iferrors.Is(err,context.Canceled) {
346-
return
347-
}
348-
options.Logger.Debug(ctx,"failed to dial",slog.Error(err))
349-
continue
350-
}
351-
client,err:=tailnet.NewDRPCClient(websocket.NetConn(ctx,ws,websocket.MessageBinary))
352-
iferr!=nil {
353-
options.Logger.Debug(ctx,"failed to create DRPCClient",slog.Error(err))
354-
_=ws.Close(websocket.StatusInternalError,"")
355-
continue
356-
}
357-
coordinate,err:=client.Coordinate(ctx)
358-
iferr!=nil {
359-
options.Logger.Debug(ctx,"failed to reach the Coordinate endpoint",slog.Error(err))
360-
_=ws.Close(websocket.StatusInternalError,"")
361-
continue
362-
}
363-
364-
coordination:=tailnet.NewRemoteCoordination(options.Logger,coordinate,conn,agentID)
365-
options.Logger.Debug(ctx,"serving coordinator")
366-
err=<-coordination.Error()
367-
iferrors.Is(err,context.Canceled) {
368-
_=ws.Close(websocket.StatusGoingAway,"")
369-
return
370-
}
371-
iferr!=nil {
372-
options.Logger.Debug(ctx,"error serving coordinator",slog.Error(err))
373-
_=ws.Close(websocket.StatusGoingAway,"")
374-
continue
375-
}
376-
_=ws.Close(websocket.StatusGoingAway,"")
377-
}
378-
}()
379322

380-
derpMapURL,err:=c.URL.Parse("/api/v2/derp-map")
381-
iferr!=nil {
382-
returnnil,xerrors.Errorf("parse url: %w",err)
383-
}
384-
closedDerpMap:=make(chanstruct{})
385-
// Must only ever be used once, send error OR close to avoid
386-
// reassignment race. Buffered so we don't hang in goroutine.
387-
firstDerpMap:=make(chanerror,1)
388-
gofunc() {
389-
deferclose(closedDerpMap)
390-
isFirst:=true
391-
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(ctx); {
392-
options.Logger.Debug(ctx,"connecting to server for derp map updates")
393-
// nolint:bodyclose
394-
ws,res,err:=websocket.Dial(ctx,derpMapURL.String(),&websocket.DialOptions{
395-
HTTPClient:c.HTTPClient,
396-
HTTPHeader:headers,
397-
// Need to disable compression to avoid a data-race.
398-
CompressionMode:websocket.CompressionDisabled,
399-
})
400-
ifisFirst {
401-
ifres!=nil&&res.StatusCode==http.StatusConflict {
402-
firstDerpMap<-ReadBodyAsError(res)
403-
return
404-
}
405-
isFirst=false
406-
close(firstDerpMap)
407-
}
408-
iferr!=nil {
409-
iferrors.Is(err,context.Canceled) {
410-
return
411-
}
412-
options.Logger.Debug(ctx,"failed to dial",slog.Error(err))
413-
continue
414-
}
415-
416-
var (
417-
nconn=websocket.NetConn(ctx,ws,websocket.MessageBinary)
418-
dec=json.NewDecoder(nconn)
419-
)
420-
for {
421-
varderpMap tailcfg.DERPMap
422-
err:=dec.Decode(&derpMap)
423-
ifxerrors.Is(err,context.Canceled) {
424-
_=ws.Close(websocket.StatusGoingAway,"")
425-
return
426-
}
427-
iferr!=nil {
428-
options.Logger.Debug(ctx,"failed to decode derp map",slog.Error(err))
429-
_=ws.Close(websocket.StatusGoingAway,"")
430-
return
431-
}
432-
433-
if!tailnet.CompareDERPMaps(conn.DERPMap(),&derpMap) {
434-
options.Logger.Debug(ctx,"updating derp map due to detected changes")
435-
conn.SetDERPMap(&derpMap)
436-
}
437-
}
438-
}
439-
}()
440-
441-
forfirstCoordinator!=nil||firstDerpMap!=nil {
442-
select {
443-
case<-dialCtx.Done():
444-
returnnil,xerrors.Errorf("timed out waiting for coordinator and derp map: %w",dialCtx.Err())
445-
caseerr=<-firstCoordinator:
446-
iferr!=nil {
447-
returnnil,xerrors.Errorf("start coordinator: %w",err)
448-
}
449-
firstCoordinator=nil
450-
caseerr=<-firstDerpMap:
451-
iferr!=nil {
452-
returnnil,xerrors.Errorf("receive derp map: %w",err)
453-
}
454-
firstDerpMap=nil
323+
connector:=runTailnetAPIConnector(ctx,options.Logger,
324+
agentID,coordinateURL.String(),
325+
&websocket.DialOptions{
326+
HTTPClient:c.HTTPClient,
327+
HTTPHeader:headers,
328+
// Need to disable compression to avoid a data-race.
329+
CompressionMode:websocket.CompressionDisabled,
330+
},
331+
conn,
332+
)
333+
options.Logger.Debug(ctx,"running tailnet API v2+ connector")
334+
335+
select {
336+
case<-dialCtx.Done():
337+
returnnil,xerrors.Errorf("timed out waiting for coordinator and derp map: %w",dialCtx.Err())
338+
caseerr=<-connector.connected:
339+
iferr!=nil {
340+
options.Logger.Error(ctx,"failed to connect to tailnet v2+ API",slog.Error(err))
341+
returnnil,xerrors.Errorf("start connector: %w",err)
455342
}
343+
options.Logger.Debug(ctx,"connected to tailnet v2+ API")
456344
}
457345

458346
agentConn=NewWorkspaceAgentConn(conn,WorkspaceAgentConnOptions{
@@ -464,8 +352,7 @@ func (c *Client) DialWorkspaceAgent(dialCtx context.Context, agentID uuid.UUID,
464352
AgentIP:WorkspaceAgentIP,
465353
CloseFunc:func()error {
466354
cancel()
467-
<-closedCoordinator
468-
<-closedDerpMap
355+
<-connector.closed
469356
returnconn.Close()
470357
},
471358
})
@@ -478,6 +365,171 @@ func (c *Client) DialWorkspaceAgent(dialCtx context.Context, agentID uuid.UUID,
478365
returnagentConn,nil
479366
}
480367

368+
// tailnetAPIConnector dials the tailnet API (v2+) and then uses the API with a tailnet.Conn to
369+
//
370+
// 1) run the Coordinate API and pass node information back and forth
371+
// 2) stream DERPMap updates and program the Conn
372+
//
373+
// These functions share the same websocket, and so are combined here so that if we hit a problem
374+
// we tear the whole thing down and start over with a new websocket.
375+
//
376+
// @typescript-ignore tailnetAPIConnector
377+
typetailnetAPIConnectorstruct {
378+
ctx context.Context
379+
logger slog.Logger
380+
381+
agentID uuid.UUID
382+
coordinateURLstring
383+
dialOptions*websocket.DialOptions
384+
conn*tailnet.Conn
385+
386+
connectedchanerror
387+
isFirstbool
388+
closedchanstruct{}
389+
}
390+
391+
// runTailnetAPIConnector creates and runs a tailnetAPIConnector
392+
funcrunTailnetAPIConnector(
393+
ctx context.Context,logger slog.Logger,
394+
agentID uuid.UUID,coordinateURLstring,dialOptions*websocket.DialOptions,
395+
conn*tailnet.Conn,
396+
)*tailnetAPIConnector {
397+
tac:=&tailnetAPIConnector{
398+
ctx:ctx,
399+
logger:logger,
400+
agentID:agentID,
401+
coordinateURL:coordinateURL,
402+
dialOptions:dialOptions,
403+
conn:conn,
404+
connected:make(chanerror,1),
405+
closed:make(chanstruct{}),
406+
}
407+
gotac.run()
408+
returntac
409+
}
410+
411+
func (tac*tailnetAPIConnector)run() {
412+
tac.isFirst=true
413+
deferclose(tac.closed)
414+
forretrier:=retry.New(50*time.Millisecond,10*time.Second);retrier.Wait(tac.ctx); {
415+
tailnetClient,err:=tac.dial()
416+
iferr!=nil {
417+
continue
418+
}
419+
tac.logger.Debug(tac.ctx,"obtained tailnet API v2+ client")
420+
tac.coordinateAndDERPMap(tailnetClient)
421+
tac.logger.Debug(tac.ctx,"tailnet API v2+ connection lost")
422+
}
423+
}
424+
425+
func (tac*tailnetAPIConnector)dial() (proto.DRPCTailnetClient,error) {
426+
tac.logger.Debug(tac.ctx,"dialing Coder tailnet v2+ API")
427+
// nolint:bodyclose
428+
ws,res,err:=websocket.Dial(tac.ctx,tac.coordinateURL,tac.dialOptions)
429+
iftac.isFirst {
430+
ifres!=nil&&res.StatusCode==http.StatusConflict {
431+
err=ReadBodyAsError(res)
432+
tac.connected<-err
433+
returnnil,err
434+
}
435+
tac.isFirst=false
436+
close(tac.connected)
437+
}
438+
iferr!=nil {
439+
if!errors.Is(err,context.Canceled) {
440+
tac.logger.Error(tac.ctx,"failed to dial tailnet v2+ API",slog.Error(err))
441+
}
442+
returnnil,err
443+
}
444+
client,err:=tailnet.NewDRPCClient(websocket.NetConn(tac.ctx,ws,websocket.MessageBinary))
445+
iferr!=nil {
446+
tac.logger.Debug(tac.ctx,"failed to create DRPCClient",slog.Error(err))
447+
_=ws.Close(websocket.StatusInternalError,"")
448+
returnnil,err
449+
}
450+
returnclient,err
451+
}
452+
453+
// coordinateAndDERPMap uses the provided client to coordinate and stream DERP Maps. It is combined
454+
// into one function so that a problem with one tears down the other and triggers a retry (if
455+
// appropriate). We multiplex both RPCs over the same websocket, so we want them to share the same
456+
// fate.
457+
func (tac*tailnetAPIConnector)coordinateAndDERPMap(client proto.DRPCTailnetClient) {
458+
deferfunc() {
459+
conn:=client.DRPCConn()
460+
closeErr:=conn.Close()
461+
ifcloseErr!=nil&&
462+
!xerrors.Is(closeErr,io.EOF)&&
463+
!xerrors.Is(closeErr,context.Canceled)&&
464+
!xerrors.Is(closeErr,context.DeadlineExceeded) {
465+
tac.logger.Error(tac.ctx,"error closing DRPC connection",slog.Error(closeErr))
466+
<-conn.Closed()
467+
}
468+
}()
469+
eg,egCtx:=errgroup.WithContext(tac.ctx)
470+
eg.Go(func()error {
471+
returntac.coordinate(egCtx,client)
472+
})
473+
eg.Go(func()error {
474+
returntac.derpMap(egCtx,client)
475+
})
476+
err:=eg.Wait()
477+
iferr!=nil&&
478+
!xerrors.Is(err,io.EOF)&&
479+
!xerrors.Is(err,context.Canceled)&&
480+
!xerrors.Is(err,context.DeadlineExceeded) {
481+
tac.logger.Error(tac.ctx,"error while connected to tailnet v2+ API")
482+
}
483+
}
484+
485+
func (tac*tailnetAPIConnector)coordinate(ctx context.Context,client proto.DRPCTailnetClient)error {
486+
coord,err:=client.Coordinate(ctx)
487+
iferr!=nil {
488+
returnxerrors.Errorf("failed to connect to Coordinate RPC: %w",err)
489+
}
490+
deferfunc() {
491+
cErr:=coord.Close()
492+
ifcErr!=nil {
493+
tac.logger.Debug(ctx,"error closing Coordinate RPC",slog.Error(cErr))
494+
}
495+
}()
496+
coordination:=tailnet.NewRemoteCoordination(tac.logger,coord,tac.conn,tac.agentID)
497+
tac.logger.Debug(ctx,"serving coordinator")
498+
err=<-coordination.Error()
499+
iferr!=nil&&
500+
!xerrors.Is(err,io.EOF)&&
501+
!xerrors.Is(err,context.Canceled)&&
502+
!xerrors.Is(err,context.DeadlineExceeded) {
503+
returnxerrors.Errorf("remote coordination error: %w",err)
504+
}
505+
returnnil
506+
}
507+
508+
func (tac*tailnetAPIConnector)derpMap(ctx context.Context,client proto.DRPCTailnetClient)error {
509+
s,err:=client.StreamDERPMaps(ctx,&proto.StreamDERPMapsRequest{})
510+
iferr!=nil {
511+
returnxerrors.Errorf("failed to connect to StreamDERPMaps RPC: %w",err)
512+
}
513+
deferfunc() {
514+
cErr:=s.Close()
515+
ifcErr!=nil {
516+
tac.logger.Debug(ctx,"error closing StreamDERPMaps RPC",slog.Error(cErr))
517+
}
518+
}()
519+
for {
520+
dmp,err:=s.Recv()
521+
iferr!=nil {
522+
ifxerrors.Is(err,io.EOF)||xerrors.Is(err,context.Canceled)||xerrors.Is(err,context.DeadlineExceeded) {
523+
returnnil
524+
}
525+
returnxerrors.Errorf("error receiving DERP Map: %w",err)
526+
}
527+
tac.logger.Debug(ctx,"got new DERP Map",slog.F("derp_map",dmp))
528+
dm:=tailnet.DERPMapFromProto(dmp)
529+
tac.conn.SetDERPMap(dm)
530+
}
531+
}
532+
481533
// WatchWorkspaceAgentMetadata watches the metadata of a workspace agent.
482534
// The returned channel will be closed when the context is canceled. Exactly
483535
// one error will be sent on the error channel. The metadata channel is never closed.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp