|
4 | 4 | "context"
|
5 | 5 | "fmt"
|
6 | 6 | "io"
|
| 7 | +"maps" |
7 | 8 | "math"
|
8 | 9 | "strings"
|
9 | 10 | "sync"
|
@@ -287,34 +288,139 @@ func (c *BasicCoordination) respLoop() {
|
287 | 288 | }
|
288 | 289 | }
|
289 | 290 |
|
290 |
| -typesingleDestControllerstruct { |
| 291 | +typeTunnelSrcCoordControllerstruct { |
291 | 292 | *BasicCoordinationController
|
292 |
| -dest uuid.UUID |
| 293 | + |
| 294 | +mu sync.Mutex |
| 295 | +destsmap[uuid.UUID]struct{} |
| 296 | +coordination*BasicCoordination |
293 | 297 | }
|
294 | 298 |
|
295 |
| -// NewSingleDestController creates a CoordinationController for Coder clients that connect to a |
296 |
| -// single tunnel destination, e.g. `coder ssh`, which connects to a single workspace Agent. |
297 |
| -funcNewSingleDestController(logger slog.Logger,coordinateeCoordinatee,dest uuid.UUID)CoordinationController { |
298 |
| -coordinatee.SetTunnelDestination(dest) |
299 |
| -return&singleDestController{ |
| 299 | +// NewTunnelSrcCoordController creates a CoordinationController for peers that are exclusively tunnel |
| 300 | +// sources (that is, they create tunnel --- Coder clients not workspaces). |
| 301 | +funcNewTunnelSrcCoordController(logger slog.Logger,coordinateeCoordinatee)*TunnelSrcCoordController { |
| 302 | +return&TunnelSrcCoordController{ |
300 | 303 | BasicCoordinationController:&BasicCoordinationController{
|
301 | 304 | Logger:logger,
|
302 | 305 | Coordinatee:coordinatee,
|
303 | 306 | SendAcks:false,
|
304 | 307 | },
|
305 |
| -dest:dest, |
| 308 | +dests:make(map[uuid.UUID]struct{}), |
306 | 309 | }
|
307 | 310 | }
|
308 | 311 |
|
309 |
| -func (c*singleDestController)New(clientCoordinatorClient)CloserWaiter { |
| 312 | +func (c*TunnelSrcCoordController)New(clientCoordinatorClient)CloserWaiter { |
| 313 | +c.mu.Lock() |
| 314 | +deferc.mu.Unlock() |
310 | 315 | b:=c.BasicCoordinationController.NewCoordination(client)
|
311 |
| -err:=client.Send(&proto.CoordinateRequest{AddTunnel:&proto.CoordinateRequest_Tunnel{Id:c.dest[:]}}) |
312 |
| -iferr!=nil { |
313 |
| -b.SendErr(err) |
| 316 | +c.coordination=b |
| 317 | +// resync destinations on reconnect |
| 318 | +fordest:=rangec.dests { |
| 319 | +err:=client.Send(&proto.CoordinateRequest{ |
| 320 | +AddTunnel:&proto.CoordinateRequest_Tunnel{Id:dest[:]}, |
| 321 | +}) |
| 322 | +iferr!=nil { |
| 323 | +b.SendErr(err) |
| 324 | +c.coordination=nil |
| 325 | +cErr:=client.Close() |
| 326 | +ifcErr!=nil { |
| 327 | +c.Logger.Debug(context.Background(),"failed to close coordinator client after add tunnel failure",slog.Error(cErr)) |
| 328 | +} |
| 329 | +break |
| 330 | +} |
314 | 331 | }
|
315 | 332 | returnb
|
316 | 333 | }
|
317 | 334 |
|
| 335 | +func (c*TunnelSrcCoordController)AddDestination(dest uuid.UUID) { |
| 336 | +c.mu.Lock() |
| 337 | +deferc.mu.Unlock() |
| 338 | +c.Coordinatee.SetTunnelDestination(dest)// this prepares us for an ack |
| 339 | +c.dests[dest]=struct{}{} |
| 340 | +ifc.coordination==nil { |
| 341 | +return |
| 342 | +} |
| 343 | +err:=c.coordination.Client.Send( |
| 344 | +&proto.CoordinateRequest{AddTunnel:&proto.CoordinateRequest_Tunnel{Id:dest[:]}}) |
| 345 | +iferr!=nil { |
| 346 | +c.coordination.SendErr(err) |
| 347 | +cErr:=c.coordination.Client.Close()// close the client so we don't gracefully disconnect |
| 348 | +ifcErr!=nil { |
| 349 | +c.Logger.Debug(context.Background(), |
| 350 | +"failed to close coordinator client after add tunnel failure", |
| 351 | +slog.Error(cErr)) |
| 352 | +} |
| 353 | +c.coordination=nil |
| 354 | +} |
| 355 | +} |
| 356 | + |
| 357 | +func (c*TunnelSrcCoordController)RemoveDestination(dest uuid.UUID) { |
| 358 | +c.mu.Lock() |
| 359 | +deferc.mu.Unlock() |
| 360 | +delete(c.dests,dest) |
| 361 | +ifc.coordination==nil { |
| 362 | +return |
| 363 | +} |
| 364 | +err:=c.coordination.Client.Send( |
| 365 | +&proto.CoordinateRequest{RemoveTunnel:&proto.CoordinateRequest_Tunnel{Id:dest[:]}}) |
| 366 | +iferr!=nil { |
| 367 | +c.coordination.SendErr(err) |
| 368 | +cErr:=c.coordination.Client.Close()// close the client so we don't gracefully disconnect |
| 369 | +ifcErr!=nil { |
| 370 | +c.Logger.Debug(context.Background(), |
| 371 | +"failed to close coordinator client after remove tunnel failure", |
| 372 | +slog.Error(cErr)) |
| 373 | +} |
| 374 | +c.coordination=nil |
| 375 | +} |
| 376 | +} |
| 377 | + |
| 378 | +func (c*TunnelSrcCoordController)SyncDestinations(destinations []uuid.UUID) { |
| 379 | +c.mu.Lock() |
| 380 | +deferc.mu.Unlock() |
| 381 | +toAdd:=make(map[uuid.UUID]struct{}) |
| 382 | +toRemove:=maps.Clone(c.dests) |
| 383 | +all:=make(map[uuid.UUID]struct{}) |
| 384 | +for_,dest:=rangedestinations { |
| 385 | +all[dest]=struct{}{} |
| 386 | +delete(toRemove,dest) |
| 387 | +if_,ok:=c.dests[dest];!ok { |
| 388 | +toAdd[dest]=struct{}{} |
| 389 | +} |
| 390 | +} |
| 391 | +c.dests=all |
| 392 | +ifc.coordination==nil { |
| 393 | +return |
| 394 | +} |
| 395 | +varerrerror |
| 396 | +deferfunc() { |
| 397 | +iferr!=nil { |
| 398 | +c.coordination.SendErr(err) |
| 399 | +cErr:=c.coordination.Client.Close()// close the client so we don't gracefully disconnect |
| 400 | +ifcErr!=nil { |
| 401 | +c.Logger.Debug(context.Background(), |
| 402 | +"failed to close coordinator client during sync destinations", |
| 403 | +slog.Error(cErr)) |
| 404 | +} |
| 405 | +c.coordination=nil |
| 406 | +} |
| 407 | +}() |
| 408 | +fordest:=rangetoAdd { |
| 409 | +err=c.coordination.Client.Send( |
| 410 | +&proto.CoordinateRequest{AddTunnel:&proto.CoordinateRequest_Tunnel{Id:dest[:]}}) |
| 411 | +iferr!=nil { |
| 412 | +return |
| 413 | +} |
| 414 | +} |
| 415 | +fordest:=rangetoRemove { |
| 416 | +err=c.coordination.Client.Send( |
| 417 | +&proto.CoordinateRequest{RemoveTunnel:&proto.CoordinateRequest_Tunnel{Id:dest[:]}}) |
| 418 | +iferr!=nil { |
| 419 | +return |
| 420 | +} |
| 421 | +} |
| 422 | +} |
| 423 | + |
318 | 424 | // NewAgentCoordinationController creates a CoordinationController for Coder Agents, which never
|
319 | 425 | // create tunnels and always send ReadyToHandshake acknowledgements.
|
320 | 426 | funcNewAgentCoordinationController(logger slog.Logger,coordinateeCoordinatee)CoordinationController {
|
|