Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: implement DERP streaming on tailnet Client API#11302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/10532-derp-streaming
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletioncoderd/coderd.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -479,7 +479,11 @@ func New(options *Options) *API {
}
}
api.TailnetClientService, err = tailnet.NewClientService(
api.Logger.Named("tailnetclient"), &api.TailnetCoordinator)
api.Logger.Named("tailnetclient"),
&api.TailnetCoordinator,
api.Options.DERPMapUpdateFrequency,
api.DERPMap,
)
if err != nil {
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
}
Expand Down
61 changes: 46 additions & 15 deletionstailnet/service.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,11 +7,13 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/hashicorp/yamux"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"tailscale.com/tailcfg"

"cdr.dev/slog"
"github.com/coder/coder/v2/tailnet/proto"
Expand DownExpand Up@@ -92,10 +94,22 @@ type ClientService struct {

// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
// loaded on each processed connection.
func NewClientService(logger slog.Logger, coordPtr *atomic.Pointer[Coordinator]) (*ClientService, error) {
func NewClientService(
logger slog.Logger,
coordPtr *atomic.Pointer[Coordinator],
derpMapUpdateFrequency time.Duration,
derpMapFn func() *tailcfg.DERPMap,
) (
*ClientService, error,
) {
s := &ClientService{logger: logger, coordPtr: coordPtr}
mux := drpcmux.New()
drpcService := NewDRPCService(logger, coordPtr)
drpcService := &DRPCService{
CoordPtr: coordPtr,
Logger: logger,
DerpMapUpdateFrequency: derpMapUpdateFrequency,
DerpMapFn: derpMapFn,
}
err := proto.DRPCRegisterClient(mux, drpcService)
if err != nil {
return nil, xerrors.Errorf("register DRPC service: %w", err)
Expand DownExpand Up@@ -145,20 +159,37 @@ func (s *ClientService) ServeClient(ctx context.Context, version string, conn ne

// DRPCService is the dRPC-based, version 2.x of the tailnet API and implements proto.DRPCClientServer
type DRPCService struct {
coordPtr *atomic.Pointer[Coordinator]
logger slog.Logger
CoordPtr *atomic.Pointer[Coordinator]
Logger slog.Logger
DerpMapUpdateFrequency time.Duration
DerpMapFn func() *tailcfg.DERPMap
}

func NewDRPCService(logger slog.Logger, coordPtr *atomic.Pointer[Coordinator]) *DRPCService {
return &DRPCService{
coordPtr: coordPtr,
logger: logger,
}
}
func (s *DRPCService) StreamDERPMaps(_ *proto.StreamDERPMapsRequest, stream proto.DRPCClient_StreamDERPMapsStream) error {
defer stream.Close()

ticker := time.NewTicker(s.DerpMapUpdateFrequency)
defer ticker.Stop()

func (*DRPCService) StreamDERPMaps(*proto.StreamDERPMapsRequest, proto.DRPCClient_StreamDERPMapsStream) error {
// TODO integrate with Dean's PR implementation
return xerrors.New("unimplemented")
var lastDERPMap *tailcfg.DERPMap
for {
derpMap := s.DerpMapFn()
if lastDERPMap == nil || !CompareDERPMaps(lastDERPMap, derpMap) {
protoDERPMap := DERPMapToProto(derpMap)
err := stream.Send(protoDERPMap)
if err != nil {
return xerrors.Errorf("send derp map: %w", err)
}
lastDERPMap = derpMap
}

ticker.Reset(s.DerpMapUpdateFrequency)
select {
case <-stream.Context().Done():
return nil
case <-ticker.C:
}
}
}

func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailnetStream) error {
Expand All@@ -168,9 +199,9 @@ func (s *DRPCService) CoordinateTailnet(stream proto.DRPCClient_CoordinateTailne
_ = stream.Close()
return xerrors.New("no Stream ID")
}
logger := s.logger.With(slog.F("peer_id", streamID), slog.F("name", streamID.Name))
logger := s.Logger.With(slog.F("peer_id", streamID), slog.F("name", streamID.Name))
logger.Debug(ctx, "starting tailnet Coordinate")
coord := *(s.coordPtr.Load())
coord := *(s.CoordPtr.Load())
reqs, resps := coord.Coordinate(ctx, streamID.ID, streamID.Name, streamID.Auth)
c := communicator{
logger: logger,
Expand Down
24 changes: 21 additions & 3 deletionstailnet/service_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -8,8 +8,10 @@ import (
"net/http"
"sync/atomic"
"testing"
"time"

"golang.org/x/xerrors"
"tailscale.com/tailcfg"

"github.com/google/uuid"

Expand DownExpand Up@@ -94,7 +96,11 @@ func TestClientService_ServeClient_V2(t *testing.T) {
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut, err := tailnet.NewClientService(logger, &coordPtr)
derpMap := &tailcfg.DERPMap{Regions: map[int]*tailcfg.DERPRegion{999: {RegionCode: "test"}}}
uut, err := tailnet.NewClientService(
logger, &coordPtr,
time.Millisecond, func() *tailcfg.DERPMap { return derpMap },
)
require.NoError(t, err)

ctx := testutil.Context(t, testutil.WaitShort)
Expand All@@ -112,6 +118,8 @@ func TestClientService_ServeClient_V2(t *testing.T) {

client, err := tailnet.NewDRPCClient(c)
require.NoError(t, err)

// Coordinate
stream, err := client.CoordinateTailnet(ctx)
require.NoError(t, err)
defer stream.Close()
Expand DownExpand Up@@ -145,7 +153,17 @@ func TestClientService_ServeClient_V2(t *testing.T) {
err = stream.Close()
require.NoError(t, err)

// stream ^^ is just one RPC; we need to close the Conn to end the session.
// DERP Map
dms, err := client.StreamDERPMaps(ctx, &proto.StreamDERPMapsRequest{})
require.NoError(t, err)

gotDermMap, err := dms.Recv()
require.NoError(t, err)
require.Equal(t, "test", gotDermMap.GetRegions()[999].GetRegionCode())
err = dms.Close()
require.NoError(t, err)

// RPCs closed; we need to close the Conn to end the session.
err = c.Close()
require.NoError(t, err)
err = testutil.RequireRecvCtx(ctx, t, errCh)
Expand All@@ -159,7 +177,7 @@ func TestClientService_ServeClient_V1(t *testing.T) {
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut, err := tailnet.NewClientService(logger, &coordPtr)
uut, err := tailnet.NewClientService(logger, &coordPtr, 0, nil)
require.NoError(t, err)

ctx := testutil.Context(t, testutil.WaitShort)
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp