Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat(tailnet): add workspace updates support to Controller#15529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/14730-controller
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletionstailnet/controllers.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1177,12 +1177,44 @@ func (c *Controller) Run(ctx context.Context) {
continue
}
c.logger.Info(c.ctx, "obtained tailnet API v2+ client")
err = c.precheckClientsAndControllers(tailnetClients)
if err != nil {
c.logger.Critical(c.ctx, "failed precheck", slog.Error(err))
_ = tailnetClients.Closer.Close()
continue
}
retrier.Reset()
c.runControllersOnce(tailnetClients)
c.logger.Info(c.ctx, "tailnet API v2+ connection lost")
}
}()
}

// precheckClientsAndControllers checks that the set of clients we got is compatible with the
// configured controllers. These checks will fail if the dialer is incompatible with the set of
// controllers, or not configured correctly with respect to Tailnet API version.
func (c *Controller) precheckClientsAndControllers(clients ControlProtocolClients) error {
if clients.Coordinator == nil && c.CoordCtrl != nil {
return xerrors.New("missing Coordinator client; have controller")
}
if clients.DERP == nil && c.DERPCtrl != nil {
return xerrors.New("missing DERPMap client; have controller")
}
if clients.WorkspaceUpdates == nil && c.WorkspaceUpdatesCtrl != nil {
return xerrors.New("missing WorkspaceUpdates client; have controller")
}

// Telemetry and ResumeToken support is considered optional, but the clients must be present
// so that we can call the functions and get an "unimplemented" error.
if clients.ResumeToken == nil && c.ResumeTokenCtrl != nil {
return xerrors.New("missing ResumeToken client; have controller")
}
if clients.Telemetry == nil && c.TelemetryCtrl != nil {
return xerrors.New("missing Telemetry client; have controller")
}
return nil
}

// runControllersOnce uses the provided clients to call into the controllers once. It is combined
// into one function so that a problem with one tears down the other and triggers a retry (if
// appropriate). We typically multiplex all RPCs over the same websocket, so we want them to share
Expand DownExpand Up@@ -1236,6 +1268,18 @@ func (c *Controller) runControllersOnce(clients ControlProtocolClients) {
}
}()
}
if c.WorkspaceUpdatesCtrl != nil {
wg.Add(1)
go func() {
defer wg.Done()
c.workspaceUpdates(clients.WorkspaceUpdates)
if c.ctx.Err() == nil {
// Main context is still active, but our workspace updates stream exited, due to
// some error. Close down all the rest of the clients so we'll exit and retry.
closeClients()
}
}()
}

// Refresh token is a little different, in that we don't want its controller to hold open the
// connection on its own. So we keep it separate from the other wait group, and cancel its
Expand DownExpand Up@@ -1308,6 +1352,30 @@ func (c *Controller) derpMap(client DERPClient) error {
}
}

func (c *Controller) workspaceUpdates(client WorkspaceUpdatesClient) {
defer func() {
c.logger.Debug(c.ctx, "exiting workspaceUpdates control routine")
cErr := client.Close()
if cErr != nil {
c.logger.Debug(c.ctx, "error closing WorkspaceUpdates RPC", slog.Error(cErr))
}
}()
cw := c.WorkspaceUpdatesCtrl.New(client)
select {
case <-c.ctx.Done():
c.logger.Debug(c.ctx, "workspaceUpdates: context done")
return
case err := <-cw.Wait():
c.logger.Debug(c.ctx, "workspaceUpdates: wait done")
if err != nil &&
!xerrors.Is(err, io.EOF) &&
!xerrors.Is(err, context.Canceled) &&
!xerrors.Is(err, context.DeadlineExceeded) {
c.logger.Error(c.ctx, "workspace updates stream error", slog.Error(err))
}
}
}

func (c *Controller) refreshToken(ctx context.Context, client ResumeTokenClient) {
cw := c.ResumeTokenCtrl.New(client)
go func() {
Expand Down
137 changes: 137 additions & 0 deletionstailnet/controllers_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1135,6 +1135,49 @@ func TestController_TelemetrySuccess(t *testing.T) {
require.Equal(t, []byte("test event"), testEvents[0].Id)
}

func TestController_WorkspaceUpdates(t *testing.T) {
t.Parallel()
theError := xerrors.New("a bad thing happened")
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
logger := slogtest.Make(t, &slogtest.Options{
IgnoredErrorIs: append(slogtest.DefaultIgnoredErrorIs, theError),
}).Leveled(slog.LevelDebug)

fClient := newFakeWorkspaceUpdateClient(testCtx, t)
dialer := &fakeWorkspaceUpdatesDialer{
client: fClient,
}

uut := tailnet.NewController(logger.Named("ctrl"), dialer)
fCtrl := newFakeUpdatesController(ctx, t)
uut.WorkspaceUpdatesCtrl = fCtrl
uut.Run(ctx)

// it should dial and pass the client to the controller
call := testutil.RequireRecvCtx(testCtx, t, fCtrl.calls)
require.Equal(t, fClient, call.client)
fCW := newFakeCloserWaiter()
testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, call.resp, fCW)

// if the CloserWaiter exits...
testutil.RequireSendCtx(testCtx, t, fCW.errCh, theError)

// it should close, redial and reconnect
cCall := testutil.RequireRecvCtx(testCtx, t, fClient.close)
testutil.RequireSendCtx(testCtx, t, cCall, nil)

call = testutil.RequireRecvCtx(testCtx, t, fCtrl.calls)
require.Equal(t, fClient, call.client)
fCW = newFakeCloserWaiter()
testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx, t, call.resp, fCW)

// canceling the context should close the client
cancel()
cCall = testutil.RequireRecvCtx(testCtx, t, fClient.close)
testutil.RequireSendCtx(testCtx, t, cCall, nil)
}

type fakeTailnetConn struct {
peersLostCh chan struct{}
}
Expand DownExpand Up@@ -1717,3 +1760,97 @@ func TestTunnelAllWorkspaceUpdatesController_HandleErrors(t *testing.T) {
})
}
}

type fakeWorkspaceUpdatesController struct {
ctx context.Context
t testing.TB
calls chan *newWorkspaceUpdatesCall
}

type newWorkspaceUpdatesCall struct {
client tailnet.WorkspaceUpdatesClient
resp chan<- tailnet.CloserWaiter
}

func (f fakeWorkspaceUpdatesController) New(client tailnet.WorkspaceUpdatesClient) tailnet.CloserWaiter {
resps := make(chan tailnet.CloserWaiter)
call := &newWorkspaceUpdatesCall{
client: client,
resp: resps,
}
select {
case <-f.ctx.Done():
f.t.Error("timed out waiting to send New call")
cw := newFakeCloserWaiter()
cw.errCh <- f.ctx.Err()
return cw
case f.calls <- call:
// OK
}
select {
case <-f.ctx.Done():
f.t.Error("timed out waiting to get New call response")
cw := newFakeCloserWaiter()
cw.errCh <- f.ctx.Err()
return cw
case resp := <-resps:
return resp
}
}

func newFakeUpdatesController(ctx context.Context, t *testing.T) *fakeWorkspaceUpdatesController {
return &fakeWorkspaceUpdatesController{
ctx: ctx,
t: t,
calls: make(chan *newWorkspaceUpdatesCall),
}
}

type fakeCloserWaiter struct {
closeCalls chan chan error
errCh chan error
}

func (f *fakeCloserWaiter) Close(ctx context.Context) error {
errRes := make(chan error)
select {
case <-ctx.Done():
return ctx.Err()
case f.closeCalls <- errRes:
// OK
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errRes:
return err
}
}

func (f *fakeCloserWaiter) Wait() <-chan error {
return f.errCh
}

func newFakeCloserWaiter() *fakeCloserWaiter {
return &fakeCloserWaiter{
closeCalls: make(chan chan error),
errCh: make(chan error, 1),
}
}

type fakeWorkspaceUpdatesDialer struct {
client tailnet.WorkspaceUpdatesClient
}

func (f *fakeWorkspaceUpdatesDialer) Dial(_ context.Context, _ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients, error) {
return tailnet.ControlProtocolClients{
WorkspaceUpdates: f.client,
Closer: fakeCloser{},
}, nil
}

type fakeCloser struct{}

func (fakeCloser) Close() error {
return nil
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp