Expand Up @@ -37,15 +37,18 @@ import ( "github.com/coder/coder/v2/agent/agentscripts" "github.com/coder/coder/v2/agent/agentssh" "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/agent/proto/resourcesmonitor" "github.com/coder/coder/v2/agent/reconnectingpty" "github.com/coder/coder/v2/buildinfo" "github.com/coder/coder/v2/cli/clistat" "github.com/coder/coder/v2/cli/gitauth" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/tailnet" tailnetproto "github.com/coder/coder/v2/tailnet/proto" "github.com/coder/quartz" "github.com/coder/retry" ) Expand Down Expand Up @@ -85,8 +88,8 @@ type Options struct { } type Client interface { ConnectRPC23 (ctx context.Context) (proto.DRPCAgentClient23 , tailnetproto.DRPCTailnetClient23 , error, ConnectRPC24 (ctx context.Context) (proto.DRPCAgentClient24 , tailnetproto.DRPCTailnetClient24 , error, ) RewriteDERPMap(derpMap *tailcfg.DERPMap) } Expand Down Expand Up @@ -399,7 +402,7 @@ func (t *trySingleflight) Do(key string, fn func()) { fn() } func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { tickerDone := make(chan struct{}) collectDone := make(chan struct{}) ctx, cancel := context.WithCancel(ctx) Expand Down Expand Up @@ -615,7 +618,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23 // reportLifecycle reports the current lifecycle state once. All state // changes are reported in order. func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { for { select { case <-a.lifecycleUpdate: Expand Down Expand Up @@ -697,7 +700,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) { // fetchServiceBannerLoop fetches the service banner on an interval. It will // not be fetched immediately; the expectation is that it is primed elsewhere // (and must be done before the session actually starts). func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { ticker := time.NewTicker(a.announcementBannersRefreshInterval) defer ticker.Stop() for { Expand Down Expand Up @@ -733,7 +736,7 @@ func (a *agent) run() (retErr error) { a.sessionToken.Store(&sessionToken) // ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs aAPI, tAPI, err := a.client.ConnectRPC23 (a.hardCtx) aAPI, tAPI, err := a.client.ConnectRPC24 (a.hardCtx) if err != nil { return err } Expand All @@ -750,7 +753,7 @@ func (a *agent) run() (retErr error) { connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI) connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{}) if err != nil { return xerrors.Errorf("fetch service banner: %w", err) Expand All @@ -767,7 +770,7 @@ func (a *agent) run() (retErr error) { // sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by // shutdown scripts. connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain, func(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { err := a.logSender.SendLoop(ctx, aAPI) if xerrors.Is(err, agentsdk.LogLimitExceededError) { // we don't want this error to tear down the API connection and propagate to the Expand All @@ -785,6 +788,25 @@ func (a *agent) run() (retErr error) { // metadata reporting can cease as soon as we start gracefully shutting down connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata) // resources monitor can cease as soon as we start gracefully shutting down. connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { logger := a.logger.Named("resources_monitor") clk := quartz.NewReal() config, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{}) if err != nil { return xerrors.Errorf("failed to get resources monitoring configuration: %w", err) } statfetcher, err := clistat.New() if err != nil { return xerrors.Errorf("failed to create resources fetcher: %w", err) } resourcesFetcher := resourcesmonitor.NewFetcher(statfetcher) resourcesmonitor := resourcesmonitor.NewResourcesMonitor(logger, clk, config, resourcesFetcher, aAPI) return resourcesmonitor.Start(ctx) }) // channels to sync goroutines below // handle manifest // | Expand All @@ -807,7 +829,7 @@ func (a *agent) run() (retErr error) { connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK)) connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { if err := manifestOK.wait(ctx); err != nil { return xerrors.Errorf("no manifest: %w", err) } Expand All @@ -822,7 +844,7 @@ func (a *agent) run() (retErr error) { a.createOrUpdateNetwork(manifestOK, networkOK)) connMan.startTailnetAPI("coordination", gracefulShutdownBehaviorStop, func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23 ) error { func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient24 ) error { if err := networkOK.wait(ctx); err != nil { return xerrors.Errorf("no network: %w", err) } Expand All @@ -831,7 +853,7 @@ func (a *agent) run() (retErr error) { ) connMan.startTailnetAPI("derp map subscriber", gracefulShutdownBehaviorStop, func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23 ) error { func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient24 ) error { if err := networkOK.wait(ctx); err != nil { return xerrors.Errorf("no network: %w", err) } Expand All @@ -840,7 +862,7 @@ func (a *agent) run() (retErr error) { connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop) connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { if err := networkOK.wait(ctx); err != nil { return xerrors.Errorf("no network: %w", err) } Expand All @@ -851,8 +873,8 @@ func (a *agent) run() (retErr error) { } // handleManifest returns a function that fetches and processes the manifest func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { return func(ctx context.Context, aAPI proto.DRPCAgentClient23 ) error { func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { return func(ctx context.Context, aAPI proto.DRPCAgentClient24 ) error { var ( sentResult = false err error Expand Down Expand Up @@ -961,8 +983,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, // createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates // the tailnet using the information in the manifest func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23 ) error { return func(ctx context.Context, _ proto.DRPCAgentClient23 ) (retErr error) { func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient24 ) error { return func(ctx context.Context, _ proto.DRPCAgentClient24 ) (retErr error) { if err := manifestOK.wait(ctx); err != nil { return xerrors.Errorf("no manifest: %w", err) } Expand Down Expand Up @@ -1266,7 +1288,7 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t // runCoordinator runs a coordinator and returns whether a reconnect // should occur. func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23 , network *tailnet.Conn) error { func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient24 , network *tailnet.Conn) error { defer a.logger.Debug(ctx, "disconnected from coordination RPC") // we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we // gracefully shut down. Expand Down Expand Up @@ -1313,7 +1335,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai } // runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur. func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23 , network *tailnet.Conn) error { func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient24 , network *tailnet.Conn) error { defer a.logger.Debug(ctx, "disconnected from derp map RPC") ctx, cancel := context.WithCancel(ctx) defer cancel() Expand Down Expand Up @@ -1683,16 +1705,16 @@ const ( type apiConnRoutineManager struct { logger slog.Logger aAPI proto.DRPCAgentClient23 tAPI tailnetproto.DRPCTailnetClient23 aAPI proto.DRPCAgentClient24 tAPI tailnetproto.DRPCTailnetClient24 eg *errgroup.Group stopCtx context.Context remainCtx context.Context } func newAPIConnRoutineManager( gracefulCtx, hardCtx context.Context, logger slog.Logger, aAPI proto.DRPCAgentClient23 , tAPI tailnetproto.DRPCTailnetClient23 , aAPI proto.DRPCAgentClient24 , tAPI tailnetproto.DRPCTailnetClient24 , ) *apiConnRoutineManager { // routines that remain in operation during graceful shutdown use the remainCtx. They'll still // exit if the errgroup hits an error, which usually means a problem with the conn. Expand Down Expand Up @@ -1725,7 +1747,7 @@ func newAPIConnRoutineManager( // but for Tailnet. func (a *apiConnRoutineManager) startAgentAPI( name string, behavior gracefulShutdownBehavior, f func(context.Context, proto.DRPCAgentClient23 ) error, f func(context.Context, proto.DRPCAgentClient24 ) error, ) { logger := a.logger.With(slog.F("name", name)) var ctx context.Context Expand Down Expand Up @@ -1762,7 +1784,7 @@ func (a *apiConnRoutineManager) startAgentAPI( // but for the Agent API. func (a *apiConnRoutineManager) startTailnetAPI( name string, behavior gracefulShutdownBehavior, f func(context.Context, tailnetproto.DRPCTailnetClient23 ) error, f func(context.Context, tailnetproto.DRPCTailnetClient24 ) error, ) { logger := a.logger.With(slog.F("name", name)) var ctx context.Context Expand Down