@@ -12,8 +12,6 @@ import (
 	"os"
 	"os/user"
 	"path/filepath"
-	"runtime"
-	"runtime/debug"
 	"sort"
 	"strconv"
 	"strings"
@@ -35,7 +33,6 @@ import (
 	"tailscale.com/util/clientmetric"
 
 	"cdr.dev/slog"
-	"github.com/coder/coder/v2/agent/agentproc"
 	"github.com/coder/coder/v2/agent/agentscripts"
 	"github.com/coder/coder/v2/agent/agentssh"
 	"github.com/coder/coder/v2/agent/proto"
@@ -82,12 +79,7 @@ type Options struct {
 	PrometheusRegistry           *prometheus.Registry
 	ReportMetadataInterval       time.Duration
 	ServiceBannerRefreshInterval time.Duration
-	Syscaller                    agentproc.Syscaller
-	// ModifiedProcesses is used for testing process priority management.
-	ModifiedProcesses chan []*agentproc.Process
-	// ProcessManagementTick is used for testing process priority management.
-	ProcessManagementTick <-chan time.Time
-	BlockFileTransfer     bool
+	BlockFileTransfer            bool
 }
 
 type Client interface {
@@ -147,10 +139,6 @@ func New(options Options) Agent {
 		prometheusRegistry = prometheus.NewRegistry()
 	}
 
-	if options.Syscaller == nil {
-		options.Syscaller = agentproc.NewSyscaller()
-	}
-
 	hardCtx, hardCancel := context.WithCancel(context.Background())
 	gracefulCtx, gracefulCancel := context.WithCancel(hardCtx)
 	a := &agent{
@@ -178,9 +166,6 @@ func New(options Options) Agent {
 		announcementBannersRefreshInterval: options.ServiceBannerRefreshInterval,
 		sshMaxTimeout:                      options.SSHMaxTimeout,
 		subsystems:                         options.Subsystems,
-		syscaller:                          options.Syscaller,
-		modifiedProcs:                      options.ModifiedProcesses,
-		processManagementTick:              options.ProcessManagementTick,
 		logSender:                          agentsdk.NewLogSender(options.Logger),
 		blockFileTransfer:                  options.BlockFileTransfer,
@@ -253,13 +238,7 @@ type agent struct {
 	prometheusRegistry *prometheus.Registry
 	// metrics are prometheus registered metrics that will be collected and
 	// labeled in Coder with the agent + workspace.
-	metrics   *agentMetrics
-	syscaller agentproc.Syscaller
-
-	// modifiedProcs is used for testing process priority management.
-	modifiedProcs chan []*agentproc.Process
-	// processManagementTick is used for testing process priority management.
-	processManagementTick <-chan time.Time
+	metrics *agentMetrics
 }
 
 func (a *agent) TailnetConn() *tailnet.Conn {
@@ -308,8 +287,6 @@ func (a *agent) init() {
 // may be happening, but regardless after the intermittent
 // failure, you'll want the agent to reconnect.
 func (a *agent) runLoop() {
-	go a.manageProcessPriorityUntilGracefulShutdown()
-
 	// need to keep retrying up to the hardCtx so that we can send graceful shutdown-related
 	// messages.
 	ctx := a.hardCtx
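
Aside on the removed test hooks: ProcessManagementTick and ModifiedProcesses implemented a common Go tick-injection pattern, where tests supply a channel they control and production code falls back to a real ticker. A minimal, self-contained sketch of that pattern (the worker type and all names below are hypothetical, not from this codebase):

package main

import (
	"context"
	"fmt"
	"time"
)

type worker struct {
	tick <-chan time.Time // nil in production; injected by tests for determinism
}

func (w *worker) run(ctx context.Context) {
	// Fall back to a real ticker when no test channel was injected,
	// mirroring the nil-check the removed loop performed.
	if w.tick == nil {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		w.tick = ticker.C
	}
	for {
		fmt.Println("one unit of work") // stand-in for the real per-tick work
		select {
		case <-w.tick:
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	// A test drives the loop by sending on the injected channel.
	tick := make(chan time.Time)
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{tick: tick}
	go w.run(ctx)
	tick <- time.Now() // trigger exactly one more iteration
	cancel()
}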

@@ -1443,162 +1420,6 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connection]netlogtype.Counts) *proto.Stats {
 	return stats
 }
 
-var prioritizedProcs = []string{"coder agent"}
-
-func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
-	// process priority can stop as soon as we are gracefully shutting down
-	ctx := a.gracefulCtx
-	defer func() {
-		if r := recover(); r != nil {
-			a.logger.Critical(ctx, "recovered from panic",
-				slog.F("panic", r),
-				slog.F("stack", string(debug.Stack())),
-			)
-		}
-	}()
-
-	if val := a.environmentVariables[EnvProcPrioMgmt]; val == "" || runtime.GOOS != "linux" {
-		a.logger.Debug(ctx, "process priority not enabled, agent will not manage process niceness/oom_score_adj ",
-			slog.F("env_var", EnvProcPrioMgmt),
-			slog.F("value", val),
-			slog.F("goos", runtime.GOOS),
-		)
-		return
-	}
-
-	if a.processManagementTick == nil {
-		ticker := time.NewTicker(time.Second)
-		defer ticker.Stop()
-		a.processManagementTick = ticker.C
-	}
-
-	oomScore := unsetOOMScore
-	if scoreStr, ok := a.environmentVariables[EnvProcOOMScore]; ok {
-		score, err := strconv.Atoi(strings.TrimSpace(scoreStr))
-		if err == nil && score >= -1000 && score <= 1000 {
-			oomScore = score
-		} else {
-			a.logger.Error(ctx, "invalid oom score",
-				slog.F("min_value", -1000),
-				slog.F("max_value", 1000),
-				slog.F("value", scoreStr),
-			)
-		}
-	}
-
-	debouncer := &logDebouncer{
-		logger:   a.logger,
-		messages: map[string]time.Time{},
-		interval: time.Minute,
-	}
-
-	for {
-		procs, err := a.manageProcessPriority(ctx, debouncer, oomScore)
-		// Avoid spamming the logs too often.
-		if err != nil {
-			debouncer.Error(ctx, "manage process priority",
-				slog.Error(err),
-			)
-		}
-		if a.modifiedProcs != nil {
-			a.modifiedProcs <- procs
-		}
-
-		select {
-		case <-a.processManagementTick:
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-// unsetOOMScore is set to an invalid OOM score to imply an unset value.
-const unsetOOMScore = 1001
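
For reference, the EnvProcOOMScore handling removed above validated the env var against the kernel's accepted oom_score_adj range of [-1000, 1000] before use. A standalone sketch of that parse-and-validate step (parseOOMScore is a hypothetical helper, not part of the codebase):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseOOMScore mirrors the removed env-var handling: trim, parse, and
// reject anything outside the range the kernel accepts for oom_score_adj.
func parseOOMScore(raw string) (int, error) {
	score, err := strconv.Atoi(strings.TrimSpace(raw))
	if err != nil {
		return 0, fmt.Errorf("parse %q: %w", raw, err)
	}
	if score < -1000 || score > 1000 {
		return 0, fmt.Errorf("score %d outside [-1000, 1000]", score)
	}
	return score, nil
}

func main() {
	for _, raw := range []string{" 998 ", "-1001", "abc"} {
		score, err := parseOOMScore(raw)
		fmt.Println(score, err)
	}
}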
-
-func (a *agent) manageProcessPriority(ctx context.Context, debouncer *logDebouncer, oomScore int) ([]*agentproc.Process, error) {
-	const (
-		niceness = 10
-	)
-
-	// We fetch the agent score each time because it's possible someone updates the
-	// value after it is started.
-	agentScore, err := a.getAgentOOMScore()
-	if err != nil {
-		agentScore = unsetOOMScore
-	}
-	if oomScore == unsetOOMScore && agentScore != unsetOOMScore {
-		// If the child score has not been explicitly specified we should
-		// set it to a score relative to the agent score.
-		oomScore = childOOMScore(agentScore)
-	}
-
-	procs, err := agentproc.List(a.filesystem, a.syscaller)
-	if err != nil {
-		return nil, xerrors.Errorf("list: %w", err)
-	}
-
-	modProcs := []*agentproc.Process{}
-
-	for _, proc := range procs {
-		containsFn := func(e string) bool {
-			contains := strings.Contains(proc.Cmd(), e)
-			return contains
-		}
-
-		// If the process is prioritized we should adjust
-		// it's oom_score_adj and avoid lowering its niceness.
-		if slices.ContainsFunc(prioritizedProcs, containsFn) {
-			continue
-		}
-
-		score, niceErr := proc.Niceness(a.syscaller)
-		if niceErr != nil && !isBenignProcessErr(niceErr) {
-			debouncer.Warn(ctx, "unable to get proc niceness",
-				slog.F("cmd", proc.Cmd()),
-				slog.F("pid", proc.PID),
-				slog.Error(niceErr),
-			)
-		}
-
-		// We only want processes that don't have a nice value set
-		// so we don't override user nice values.
-		// Getpriority actually returns priority for the nice value
-		// which is niceness + 20, so here 20 = a niceness of 0 (aka unset).
-		if score != 20 {
-			// We don't log here since it can get spammy
-			continue
-		}
-
-		if niceErr == nil {
-			err := proc.SetNiceness(a.syscaller, niceness)
-			if err != nil && !isBenignProcessErr(err) {
-				debouncer.Warn(ctx, "unable to set proc niceness",
-					slog.F("cmd", proc.Cmd()),
-					slog.F("pid", proc.PID),
-					slog.F("niceness", niceness),
-					slog.Error(err),
-				)
-			}
-		}
-
-		// If the oom score is valid and it's not already set and isn't a custom value set by another process then it's ok to update it.
-		if oomScore != unsetOOMScore && oomScore != proc.OOMScoreAdj && !isCustomOOMScore(agentScore, proc) {
-			oomScoreStr := strconv.Itoa(oomScore)
-			err := afero.WriteFile(a.filesystem, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), []byte(oomScoreStr), 0o644)
-			if err != nil && !isBenignProcessErr(err) {
-				debouncer.Warn(ctx, "unable to set oom_score_adj",
-					slog.F("cmd", proc.Cmd()),
-					slog.F("pid", proc.PID),
-					slog.F("score", oomScoreStr),
-					slog.Error(err),
-				)
-			}
-		}
-		modProcs = append(modProcs, proc)
-	}
-
-	return modProcs, nil
-}
-
 // isClosed returns whether the API is closed or not.
 func (a *agent) isClosed() bool {
 	return a.hardCtx.Err() != nil
@@ -1992,88 +1813,3 @@ func PrometheusMetricsHandler(prometheusRegistry *prometheus.Registry, logger slog.Logger) http.Handler {
 		}
 	})
 }
-
-// childOOMScore returns the oom_score_adj for a child process. It is based
-// on the oom_score_adj of the agent process.
-func childOOMScore(agentScore int) int {
-	// If the agent has a negative oom_score_adj, we set the child to 0
-	// so it's treated like every other process.
-	if agentScore < 0 {
-		return 0
-	}
-
-	// If the agent is already almost at the maximum then set it to the max.
-	if agentScore >= 998 {
-		return 1000
-	}
-
-	// If the agent oom_score_adj is >=0, we set the child to slightly
-	// less than the maximum. If users want a different score they set it
-	// directly.
-	return 998
-}
-
-func (a *agent) getAgentOOMScore() (int, error) {
-	scoreStr, err := afero.ReadFile(a.filesystem, "/proc/self/oom_score_adj")
-	if err != nil {
-		return 0, xerrors.Errorf("read file: %w", err)
-	}
-
-	score, err := strconv.Atoi(strings.TrimSpace(string(scoreStr)))
-	if err != nil {
-		return 0, xerrors.Errorf("parse int: %w", err)
-	}
-
-	return score, nil
-}
-
-// isCustomOOMScore checks to see if the oom_score_adj is not a value that would
-// originate from an agent-spawned process.
-func isCustomOOMScore(agentScore int, process *agentproc.Process) bool {
-	score := process.OOMScoreAdj
-	return agentScore != score && score != 1000 && score != 0 && score != 998
-}
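
The `if score != 20` check in the removed manageProcessPriority leans on a Linux quirk: the raw getpriority(2) syscall biases its result so it never returns a negative value (the kernel reports 20 - nice), which means an unset niceness of 0 reads back as 20 either way. A Linux-only sketch demonstrating the offset, assuming golang.org/x/sys/unix (not part of the PR):

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

func main() {
	// who == 0 means the calling process. unix.Getpriority uses the raw
	// syscall, so the result carries the kernel's bias rather than the
	// glibc-converted nice value.
	prio, err := unix.Getpriority(unix.PRIO_PROCESS, 0)
	if err != nil {
		panic(err)
	}
	fmt.Printf("raw priority %d => niceness %d (20 means unset)\n", prio, 20-prio)
}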
-
-// logDebouncer skips writing a log for a particular message if
-// it's been emitted within the given interval duration.
-// It's a shoddy implementation used in one spot that should be replaced at
-// some point.
-type logDebouncer struct {
-	logger   slog.Logger
-	messages map[string]time.Time
-	interval time.Duration
-}
-
-func (l *logDebouncer) Warn(ctx context.Context, msg string, fields ...any) {
-	l.log(ctx, slog.LevelWarn, msg, fields...)
-}
-
-func (l *logDebouncer) Error(ctx context.Context, msg string, fields ...any) {
-	l.log(ctx, slog.LevelError, msg, fields...)
-}
-
-func (l *logDebouncer) log(ctx context.Context, level slog.Level, msg string, fields ...any) {
-	// This (bad) implementation assumes you wouldn't reuse the same msg
-	// for different levels.
-	if last, ok := l.messages[msg]; ok && time.Since(last) < l.interval {
-		return
-	}
-	switch level {
-	case slog.LevelWarn:
-		l.logger.Warn(ctx, msg, fields...)
-	case slog.LevelError:
-		l.logger.Error(ctx, msg, fields...)
-	}
-	l.messages[msg] = time.Now()
-}
-
-func isBenignProcessErr(err error) bool {
-	return err != nil &&
-		(xerrors.Is(err, os.ErrNotExist) ||
-			xerrors.Is(err, os.ErrPermission) ||
-			isNoSuchProcessErr(err))
-}
-
-func isNoSuchProcessErr(err error) bool {
-	return err != nil && strings.Contains(err.Error(), "no such process")
-}
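
The removed isNoSuchProcessErr matched on the error string; on Linux the underlying condition is the ESRCH errno, which stringifies as "no such process". A hedged sketch of the errno-based check one could use instead (the invalid-PID probe below is illustrative only, and this only covers errors that surface the errno directly):

package main

import (
	"errors"
	"fmt"

	"golang.org/x/sys/unix"
)

// isNoSuchProcess tests for the kernel's "no such process" errno directly
// rather than comparing error strings.
func isNoSuchProcess(err error) bool {
	return errors.Is(err, unix.ESRCH)
}

func main() {
	// Probing a PID assumed not to exist makes getpriority(2) return ESRCH.
	_, err := unix.Getpriority(unix.PRIO_PROCESS, 1<<22) // assumed-invalid PID
	fmt.Println(isNoSuchProcess(err), err)
}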