Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

chore: refactor coordination#15343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/14729-controllers
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletionsagent/agent.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1352,7 +1352,8 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
defer close(disconnected)
a.closeMutex.Unlock()

coordination := tailnet.NewRemoteCoordination(a.logger, coordinate, network, uuid.Nil)
ctrl := tailnet.NewAgentCoordinationController(a.logger, network)
coordination := ctrl.New(coordinate)

errCh := make(chan error, 1)
go func() {
Expand All@@ -1364,7 +1365,7 @@ func (a *agent) runCoordinator(ctx context.Context, conn drpc.Conn, network *tai
a.logger.Warn(ctx, "failed to close remote coordination", slog.Error(err))
}
return
case err := <-coordination.Error():
case err := <-coordination.Wait():
errCh <- err
}
}()
Expand Down
13 changes: 5 additions & 8 deletionsagent/agent_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1918,10 +1918,8 @@ func TestAgent_UpdatedDERP(t *testing.T) {
testCtx, testCtxCancel := context.WithCancel(context.Background())
t.Cleanup(testCtxCancel)
clientID := uuid.New()
coordination := tailnet.NewInMemoryCoordination(
testCtx, logger,
clientID, agentID,
coordinator, conn)
ctrl := tailnet.NewSingleDestController(logger, conn, agentID)
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(logger, clientID, agentID, coordinator))
t.Cleanup(func() {
t.Logf("closing coordination %s", name)
cctx, ccancel := context.WithTimeout(testCtx, testutil.WaitShort)
Expand DownExpand Up@@ -2409,10 +2407,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
testCtx, testCtxCancel := context.WithCancel(context.Background())
t.Cleanup(testCtxCancel)
clientID := uuid.New()
coordination := tailnet.NewInMemoryCoordination(
testCtx, logger,
clientID, metadata.AgentID,
coordinator, conn)
ctrl := tailnet.NewSingleDestController(logger, conn, metadata.AgentID)
coordination := ctrl.New(tailnet.NewInMemoryCoordinatorClient(
logger, clientID, metadata.AgentID, coordinator))
t.Cleanup(func() {
cctx, ccancel := context.WithTimeout(testCtx, testutil.WaitShort)
defer ccancel()
Expand Down
2 changes: 0 additions & 2 deletionsagent/agenttest/client.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -71,7 +71,6 @@ func NewClient(t testing.TB,
t: t,
logger: logger.Named("client"),
agentID: agentID,
coordinator: coordinator,
server: server,
fakeAgentAPI: fakeAAPI,
derpMapUpdates: derpMapUpdates,
Expand All@@ -82,7 +81,6 @@ type Client struct {
t testing.TB
logger slog.Logger
agentID uuid.UUID
coordinator tailnet.Coordinator
server *drpcserver.Server
fakeAgentAPI *FakeAgentAPI
LastWorkspaceAgent func()
Expand Down
6 changes: 4 additions & 2 deletionscodersdk/workspacesdk/connector.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -66,6 +66,7 @@ type tailnetAPIConnector struct {
clock quartz.Clock
dialOptions *websocket.DialOptions
conn tailnetConn
coordCtrl tailnet.CoordinationController
customDialFn func() (proto.DRPCTailnetClient, error)

clientMu sync.RWMutex
Expand DownExpand Up@@ -112,6 +113,7 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
// Runs a tailnetAPIConnector using the provided connection
func (tac *tailnetAPIConnector) runConnector(conn tailnetConn) {
tac.conn = conn
tac.coordCtrl = tailnet.NewSingleDestController(tac.logger, conn, tac.agentID)
tac.gracefulCtx, tac.cancelGracefulCtx = context.WithCancel(context.Background())
go tac.manageGracefulTimeout()
go func() {
Expand DownExpand Up@@ -272,7 +274,7 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
tac.logger.Debug(tac.ctx, "error closing Coordinate RPC", slog.Error(cErr))
}
}()
coordination :=tailnet.NewRemoteCoordination(tac.logger, coord, tac.conn, tac.agentID)
coordination := tac.coordCtrl.New(coord)
tac.logger.Debug(tac.ctx, "serving coordinator")
select {
case <-tac.ctx.Done():
Expand All@@ -281,7 +283,7 @@ func (tac *tailnetAPIConnector) coordinate(client proto.DRPCTailnetClient) {
if crdErr != nil {
tac.logger.Warn(tac.ctx, "failed to close remote coordination", slog.Error(err))
}
case err = <-coordination.Error():
case err = <-coordination.Wait():
if err != nil &&
!xerrors.Is(err, io.EOF) &&
!xerrors.Is(err, context.Canceled) &&
Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp