Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit16992ee

Browse files
authored
feat(tailnet): add workspace updates support to Controller (#15529)
re:#14730Adds support in `tailnet.Controller` for WorkspaceUpdates.Also checks configured controllers against the clients returned by the dialer, so that if we connect with a dialer that doesn't support an RPC (for instance the in-memory dialer for ServerTailnet doesn't support WorkspaceUpdates), we throw an error if there is a controller expecting it.
1 parentaa0dc2d commit16992ee

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

‎tailnet/controllers.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,12 +1177,44 @@ func (c *Controller) Run(ctx context.Context) {
11771177
continue
11781178
}
11791179
c.logger.Info(c.ctx,"obtained tailnet API v2+ client")
1180+
err=c.precheckClientsAndControllers(tailnetClients)
1181+
iferr!=nil {
1182+
c.logger.Critical(c.ctx,"failed precheck",slog.Error(err))
1183+
_=tailnetClients.Closer.Close()
1184+
continue
1185+
}
1186+
retrier.Reset()
11801187
c.runControllersOnce(tailnetClients)
11811188
c.logger.Info(c.ctx,"tailnet API v2+ connection lost")
11821189
}
11831190
}()
11841191
}
11851192

1193+
// precheckClientsAndControllers checks that the set of clients we got is compatible with the
1194+
// configured controllers. These checks will fail if the dialer is incompatible with the set of
1195+
// controllers, or not configured correctly with respect to Tailnet API version.
1196+
func (c*Controller)precheckClientsAndControllers(clientsControlProtocolClients)error {
1197+
ifclients.Coordinator==nil&&c.CoordCtrl!=nil {
1198+
returnxerrors.New("missing Coordinator client; have controller")
1199+
}
1200+
ifclients.DERP==nil&&c.DERPCtrl!=nil {
1201+
returnxerrors.New("missing DERPMap client; have controller")
1202+
}
1203+
ifclients.WorkspaceUpdates==nil&&c.WorkspaceUpdatesCtrl!=nil {
1204+
returnxerrors.New("missing WorkspaceUpdates client; have controller")
1205+
}
1206+
1207+
// Telemetry and ResumeToken support is considered optional, but the clients must be present
1208+
// so that we can call the functions and get an "unimplemented" error.
1209+
ifclients.ResumeToken==nil&&c.ResumeTokenCtrl!=nil {
1210+
returnxerrors.New("missing ResumeToken client; have controller")
1211+
}
1212+
ifclients.Telemetry==nil&&c.TelemetryCtrl!=nil {
1213+
returnxerrors.New("missing Telemetry client; have controller")
1214+
}
1215+
returnnil
1216+
}
1217+
11861218
// runControllersOnce uses the provided clients to call into the controllers once. It is combined
11871219
// into one function so that a problem with one tears down the other and triggers a retry (if
11881220
// appropriate). We typically multiplex all RPCs over the same websocket, so we want them to share
@@ -1236,6 +1268,18 @@ func (c *Controller) runControllersOnce(clients ControlProtocolClients) {
12361268
}
12371269
}()
12381270
}
1271+
ifc.WorkspaceUpdatesCtrl!=nil {
1272+
wg.Add(1)
1273+
gofunc() {
1274+
deferwg.Done()
1275+
c.workspaceUpdates(clients.WorkspaceUpdates)
1276+
ifc.ctx.Err()==nil {
1277+
// Main context is still active, but our workspace updates stream exited, due to
1278+
// some error. Close down all the rest of the clients so we'll exit and retry.
1279+
closeClients()
1280+
}
1281+
}()
1282+
}
12391283

12401284
// Refresh token is a little different, in that we don't want its controller to hold open the
12411285
// connection on its own. So we keep it separate from the other wait group, and cancel its
@@ -1308,6 +1352,30 @@ func (c *Controller) derpMap(client DERPClient) error {
13081352
}
13091353
}
13101354

1355+
func (c*Controller)workspaceUpdates(clientWorkspaceUpdatesClient) {
1356+
deferfunc() {
1357+
c.logger.Debug(c.ctx,"exiting workspaceUpdates control routine")
1358+
cErr:=client.Close()
1359+
ifcErr!=nil {
1360+
c.logger.Debug(c.ctx,"error closing WorkspaceUpdates RPC",slog.Error(cErr))
1361+
}
1362+
}()
1363+
cw:=c.WorkspaceUpdatesCtrl.New(client)
1364+
select {
1365+
case<-c.ctx.Done():
1366+
c.logger.Debug(c.ctx,"workspaceUpdates: context done")
1367+
return
1368+
caseerr:=<-cw.Wait():
1369+
c.logger.Debug(c.ctx,"workspaceUpdates: wait done")
1370+
iferr!=nil&&
1371+
!xerrors.Is(err,io.EOF)&&
1372+
!xerrors.Is(err,context.Canceled)&&
1373+
!xerrors.Is(err,context.DeadlineExceeded) {
1374+
c.logger.Error(c.ctx,"workspace updates stream error",slog.Error(err))
1375+
}
1376+
}
1377+
}
1378+
13111379
func (c*Controller)refreshToken(ctx context.Context,clientResumeTokenClient) {
13121380
cw:=c.ResumeTokenCtrl.New(client)
13131381
gofunc() {

‎tailnet/controllers_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,49 @@ func TestController_TelemetrySuccess(t *testing.T) {
11351135
require.Equal(t, []byte("test event"),testEvents[0].Id)
11361136
}
11371137

1138+
funcTestController_WorkspaceUpdates(t*testing.T) {
1139+
t.Parallel()
1140+
theError:=xerrors.New("a bad thing happened")
1141+
testCtx:=testutil.Context(t,testutil.WaitShort)
1142+
ctx,cancel:=context.WithCancel(testCtx)
1143+
logger:=slogtest.Make(t,&slogtest.Options{
1144+
IgnoredErrorIs:append(slogtest.DefaultIgnoredErrorIs,theError),
1145+
}).Leveled(slog.LevelDebug)
1146+
1147+
fClient:=newFakeWorkspaceUpdateClient(testCtx,t)
1148+
dialer:=&fakeWorkspaceUpdatesDialer{
1149+
client:fClient,
1150+
}
1151+
1152+
uut:=tailnet.NewController(logger.Named("ctrl"),dialer)
1153+
fCtrl:=newFakeUpdatesController(ctx,t)
1154+
uut.WorkspaceUpdatesCtrl=fCtrl
1155+
uut.Run(ctx)
1156+
1157+
// it should dial and pass the client to the controller
1158+
call:=testutil.RequireRecvCtx(testCtx,t,fCtrl.calls)
1159+
require.Equal(t,fClient,call.client)
1160+
fCW:=newFakeCloserWaiter()
1161+
testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx,t,call.resp,fCW)
1162+
1163+
// if the CloserWaiter exits...
1164+
testutil.RequireSendCtx(testCtx,t,fCW.errCh,theError)
1165+
1166+
// it should close, redial and reconnect
1167+
cCall:=testutil.RequireRecvCtx(testCtx,t,fClient.close)
1168+
testutil.RequireSendCtx(testCtx,t,cCall,nil)
1169+
1170+
call=testutil.RequireRecvCtx(testCtx,t,fCtrl.calls)
1171+
require.Equal(t,fClient,call.client)
1172+
fCW=newFakeCloserWaiter()
1173+
testutil.RequireSendCtx[tailnet.CloserWaiter](testCtx,t,call.resp,fCW)
1174+
1175+
// canceling the context should close the client
1176+
cancel()
1177+
cCall=testutil.RequireRecvCtx(testCtx,t,fClient.close)
1178+
testutil.RequireSendCtx(testCtx,t,cCall,nil)
1179+
}
1180+
11381181
typefakeTailnetConnstruct {
11391182
peersLostChchanstruct{}
11401183
}
@@ -1717,3 +1760,97 @@ func TestTunnelAllWorkspaceUpdatesController_HandleErrors(t *testing.T) {
17171760
})
17181761
}
17191762
}
1763+
1764+
typefakeWorkspaceUpdatesControllerstruct {
1765+
ctx context.Context
1766+
t testing.TB
1767+
callschan*newWorkspaceUpdatesCall
1768+
}
1769+
1770+
typenewWorkspaceUpdatesCallstruct {
1771+
client tailnet.WorkspaceUpdatesClient
1772+
respchan<- tailnet.CloserWaiter
1773+
}
1774+
1775+
func (ffakeWorkspaceUpdatesController)New(client tailnet.WorkspaceUpdatesClient) tailnet.CloserWaiter {
1776+
resps:=make(chan tailnet.CloserWaiter)
1777+
call:=&newWorkspaceUpdatesCall{
1778+
client:client,
1779+
resp:resps,
1780+
}
1781+
select {
1782+
case<-f.ctx.Done():
1783+
f.t.Error("timed out waiting to send New call")
1784+
cw:=newFakeCloserWaiter()
1785+
cw.errCh<-f.ctx.Err()
1786+
returncw
1787+
casef.calls<-call:
1788+
// OK
1789+
}
1790+
select {
1791+
case<-f.ctx.Done():
1792+
f.t.Error("timed out waiting to get New call response")
1793+
cw:=newFakeCloserWaiter()
1794+
cw.errCh<-f.ctx.Err()
1795+
returncw
1796+
caseresp:=<-resps:
1797+
returnresp
1798+
}
1799+
}
1800+
1801+
funcnewFakeUpdatesController(ctx context.Context,t*testing.T)*fakeWorkspaceUpdatesController {
1802+
return&fakeWorkspaceUpdatesController{
1803+
ctx:ctx,
1804+
t:t,
1805+
calls:make(chan*newWorkspaceUpdatesCall),
1806+
}
1807+
}
1808+
1809+
typefakeCloserWaiterstruct {
1810+
closeCallschanchanerror
1811+
errChchanerror
1812+
}
1813+
1814+
func (f*fakeCloserWaiter)Close(ctx context.Context)error {
1815+
errRes:=make(chanerror)
1816+
select {
1817+
case<-ctx.Done():
1818+
returnctx.Err()
1819+
casef.closeCalls<-errRes:
1820+
// OK
1821+
}
1822+
select {
1823+
case<-ctx.Done():
1824+
returnctx.Err()
1825+
caseerr:=<-errRes:
1826+
returnerr
1827+
}
1828+
}
1829+
1830+
func (f*fakeCloserWaiter)Wait()<-chanerror {
1831+
returnf.errCh
1832+
}
1833+
1834+
funcnewFakeCloserWaiter()*fakeCloserWaiter {
1835+
return&fakeCloserWaiter{
1836+
closeCalls:make(chanchanerror),
1837+
errCh:make(chanerror,1),
1838+
}
1839+
}
1840+
1841+
typefakeWorkspaceUpdatesDialerstruct {
1842+
client tailnet.WorkspaceUpdatesClient
1843+
}
1844+
1845+
func (f*fakeWorkspaceUpdatesDialer)Dial(_ context.Context,_ tailnet.ResumeTokenController) (tailnet.ControlProtocolClients,error) {
1846+
return tailnet.ControlProtocolClients{
1847+
WorkspaceUpdates:f.client,
1848+
Closer:fakeCloser{},
1849+
},nil
1850+
}
1851+
1852+
typefakeCloserstruct{}
1853+
1854+
func (fakeCloser)Close()error {
1855+
returnnil
1856+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp