chore: implement oom/ood processing component #16436
First file: the agent API constructor gains a clock, a database handle, and a notification enqueuer, and wires up the new resources monitoring API.

```diff
@@ -17,10 +17,12 @@ import (
 	"cdr.dev/slog"
 	agentproto "github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
 	"github.com/coder/coder/v2/coderd/appearance"
 	"github.com/coder/coder/v2/coderd/database"
 	"github.com/coder/coder/v2/coderd/database/pubsub"
 	"github.com/coder/coder/v2/coderd/externalauth"
 	"github.com/coder/coder/v2/coderd/notifications"
 	"github.com/coder/coder/v2/coderd/prometheusmetrics"
 	"github.com/coder/coder/v2/coderd/tracing"
 	"github.com/coder/coder/v2/coderd/workspacestats"
@@ -29,6 +31,7 @@ import (
 	"github.com/coder/coder/v2/codersdk/agentsdk"
 	"github.com/coder/coder/v2/tailnet"
 	tailnetproto "github.com/coder/coder/v2/tailnet/proto"
 	"github.com/coder/quartz"
 )

 // API implements the DRPC agent API interface from agent/proto. This struct is
@@ -59,7 +62,9 @@ type Options struct {
 	Ctx                   context.Context
 	Log                   slog.Logger
 	Clock                 quartz.Clock
 	Database              database.Store
 	NotificationsEnqueuer notifications.Enqueuer
 	Pubsub                pubsub.Pubsub
 	DerpMapFn             func() *tailcfg.DERPMap
 	TailnetCoordinator    *atomic.Pointer[tailnet.Coordinator]
@@ -82,6 +87,10 @@ type Options struct {
 }

 func New(opts Options) *API {
 	if opts.Clock == nil {
 		opts.Clock = quartz.NewReal()
 	}

 	api := &API{
 		opts: opts,
 		mu:   sync.Mutex{},
@@ -104,9 +113,22 @@ func New(opts Options) *API {
 	}

 	api.ResourcesMonitoringAPI = &ResourcesMonitoringAPI{
 		AgentID:               opts.AgentID,
 		WorkspaceID:           opts.WorkspaceID,
 		Clock:                 opts.Clock,
 		Database:              opts.Database,
 		NotificationsEnqueuer: opts.NotificationsEnqueuer,
 		Debounce:              5 * time.Minute,

 		Config: resourcesmonitor.Config{
 			NumDatapoints:      20,
 			CollectionInterval: 10 * time.Second,

 			Alert: resourcesmonitor.AlertConfig{
 				MinimumNOKsPercent:     20,
 				ConsecutiveNOKsPercent: 50,
 			},
 		},
 	}

 	api.StatsAPI = &StatsAPI{
```
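For intuition on those Alert thresholds: with NumDatapoints at 20 and CollectionInterval at 10 seconds, each evaluation looks at roughly 200 seconds of readings. The sketch below is a toy evaluator, not the real resourcesmonitor.NextState (which also receives the previous state); it assumes either threshold independently flips the monitor to NOK: ConsecutiveNOKsPercent=50 on a sustained run of 10 NOK readings, MinimumNOKsPercent=20 on any 4 NOK readings in the window.

```go
package main

import "fmt"

type State int

const (
	StateOK State = iota
	StateNOK
)

// evaluate is a hypothetical stand-in for resourcesmonitor.NextState,
// under the assumption that either threshold alone triggers an alert.
func evaluate(states []State, minimumNOKsPercent, consecutiveNOKsPercent int) State {
	total := len(states)
	noks, run, maxRun := 0, 0, 0
	for _, s := range states {
		if s == StateNOK {
			noks++
			run++
			if run > maxRun {
				maxRun = run
			}
		} else {
			run = 0
		}
	}
	if maxRun*100 >= consecutiveNOKsPercent*total {
		return StateNOK // sustained run of NOK readings
	}
	if noks*100 >= minimumNOKsPercent*total {
		return StateNOK // enough NOK readings overall
	}
	return StateOK
}

func main() {
	window := make([]State, 20)
	for i := 10; i < 20; i++ {
		window[i] = StateNOK // ten consecutive NOK readings
	}
	fmt.Println(evaluate(window, 20, 50) == StateNOK) // true
}
```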
Second file: the ResourcesMonitoringAPI itself, which carries the new OOM/OOD processing.

```diff
@@ -4,20 +4,35 @@ import (
 	"context"
 	"database/sql"
 	"errors"
 	"fmt"
 	"time"

 	"golang.org/x/xerrors"

 	"cdr.dev/slog"
 	"github.com/google/uuid"

 	"github.com/coder/coder/v2/agent/proto"
 	"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
 	"github.com/coder/coder/v2/coderd/database"
 	"github.com/coder/coder/v2/coderd/database/dbauthz"
 	"github.com/coder/coder/v2/coderd/database/dbtime"
 	"github.com/coder/coder/v2/coderd/notifications"
 	"github.com/coder/quartz"
 )

 type ResourcesMonitoringAPI struct {
 	AgentID     uuid.UUID
 	WorkspaceID uuid.UUID

 	Log   slog.Logger
 	Clock quartz.Clock

 	Database              database.Store
 	NotificationsEnqueuer notifications.Enqueuer

 	Debounce time.Duration
 	Config   resourcesmonitor.Config
 }

 func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
```
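Worth noting why Clock and Debounce are injected rather than hard-coded: with github.com/coder/quartz, tests can substitute a mock clock and step time forward deterministically instead of sleeping. A minimal sketch of that seam (illustrative only, not a test from this PR; assumes quartz's mock-clock API):

```go
package agentapi_test // hypothetical placement

import (
	"testing"
	"time"

	"github.com/coder/quartz"
)

// TestDebounceWindow sketches how an injected quartz.Clock makes the
// five-minute debounce deterministic: the mock clock is advanced
// explicitly rather than waiting in real time.
func TestDebounceWindow(t *testing.T) {
	mClock := quartz.NewMock(t)

	debouncedUntil := mClock.Now().Add(5 * time.Minute)

	// Still inside the debounce window: a repeat alert should be suppressed.
	if !mClock.Now().Before(debouncedUntil) {
		t.Fatal("expected to be within the debounce window")
	}

	// Jump past the window; the next OK->NOK transition may notify again.
	mClock.Advance(5*time.Minute + time.Second)
	if mClock.Now().Before(debouncedUntil) {
		t.Fatal("expected the debounce window to have expired")
	}
}
```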
```diff
@@ -33,8 +48,8 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 	return &proto.GetResourcesMonitoringConfigurationResponse{
 		Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
 			CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
 			NumDatapoints:             a.Config.NumDatapoints,
 		},
 		Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
 			if memoryErr != nil {
```
```diff
@@ -60,8 +75,182 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 	}

 func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
 	var err error

 	if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
 		err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
 	}

 	if volumeErr := a.monitorVolumes(ctx, req.Datapoints); volumeErr != nil {
 		err = errors.Join(err, xerrors.Errorf("monitor volume: %w", volumeErr))
 	}

 	return &proto.PushResourcesMonitoringUsageResponse{}, err
 }
```
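One detail here worth calling out: memory and volume monitoring each run to completion even if the other fails, and errors.Join keeps both causes inspectable via errors.Is. A self-contained sketch of that accumulation pattern, using hypothetical sentinel errors:

```go
package main

import (
	"errors"
	"fmt"

	"golang.org/x/xerrors"
)

// Hypothetical sentinels standing in for failures of each monitor path.
var (
	errMemory = errors.New("memory monitor failed")
	errVolume = errors.New("volume monitor failed")
)

func main() {
	// Mirror the accumulation in PushResourcesMonitoringUsage: each
	// branch joins its wrapped error onto the running error value.
	var err error
	err = errors.Join(err, xerrors.Errorf("monitor memory: %w", errMemory))
	err = errors.Join(err, xerrors.Errorf("monitor volume: %w", errVolume))

	// Both underlying causes remain reachable through the joined error.
	fmt.Println(errors.Is(err, errMemory)) // true
	fmt.Println(errors.Is(err, errVolume)) // true
}
```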
The memory path, new in this diff:

```go
func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
	monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
	if err != nil {
		// It is valid for an agent to not have a memory monitor, so we
		// do not want to treat it as an error.
		if errors.Is(err, sql.ErrNoRows) {
			return nil
		}

		return xerrors.Errorf("fetch memory resource monitor: %w", err)
	}

	if !monitor.Enabled {
		return nil
	}

	usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage, 0, len(datapoints))
	for _, datapoint := range datapoints {
		usageDatapoints = append(usageDatapoints, datapoint.Memory)
	}

	usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)

	oldState := monitor.State
	newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)

	debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)

	//nolint:gocritic // We need to be able to update the resource monitor here.
	err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
		AgentID:        a.AgentID,
		State:          newState,
		UpdatedAt:      dbtime.Time(a.Clock.Now()),
		DebouncedUntil: dbtime.Time(debouncedUntil),
	})
	if err != nil {
		return xerrors.Errorf("update workspace monitor: %w", err)
	}

	if !shouldNotify {
		return nil
	}

	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
	if err != nil {
		return xerrors.Errorf("get workspace by id: %w", err)
	}

	_, err = a.NotificationsEnqueuer.EnqueueWithData(
		// nolint:gocritic // We need to be able to send the notification.
		dbauthz.AsNotifier(ctx),
		workspace.OwnerID,
		notifications.TemplateWorkspaceOutOfMemory,
		map[string]string{
			"workspace": workspace.Name,
			"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
		},
		map[string]any{
			// NOTE(DanielleMaywood):
			// When notifications are enqueued, they are checked to be
			// unique within a single day. This means that if we attempt
			// to send two OOM notifications for the same workspace on
			// the same day, the enqueuer will prevent us from sending
			// a second one. We inject a timestamp to make the
			// notifications appear different enough to circumvent this
			// deduplication logic.
			"timestamp": a.Clock.Now(),
		},
		"workspace-monitor-memory",
	)
	if err != nil {
		return xerrors.Errorf("notify workspace OOM: %w", err)
	}

	return nil
}
```
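The monitor.Debounce call is what keeps repeated alerts quiet. Its implementation isn't shown in this diff; the sketch below is one plausible reading of its contract, assuming a notification fires only on an OK-to-NOK transition that lands outside the previous debounce window:

```go
package main

import (
	"fmt"
	"time"
)

type State int

const (
	StateOK State = iota
	StateNOK
)

// debounce is a hypothetical stand-in for monitor.Debounce: it reports
// whether a notification should fire now, and when the next one may.
func debounce(window time.Duration, now, debouncedUntil time.Time, oldState, newState State) (time.Time, bool) {
	if oldState == StateNOK || newState != StateNOK {
		return debouncedUntil, false // no fresh transition into the alert state
	}
	if now.Before(debouncedUntil) {
		return debouncedUntil, false // still inside the previous window
	}
	return now.Add(window), true // notify and open a new window
}

func main() {
	now := time.Now()
	until, notify := debounce(5*time.Minute, now, time.Time{}, StateOK, StateNOK)
	fmt.Println(notify, until.Sub(now)) // true 5m0s
}
```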
The volumes path, which batches every alerting volume into a single notification:

```go
func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
	if err != nil {
		return xerrors.Errorf("fetch volume resource monitors: %w", err)
	}

	outOfDiskVolumes := make([]map[string]any, 0)

	for _, monitor := range volumeMonitors {
		if !monitor.Enabled {
			continue
		}

		usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage, 0, len(datapoints))
		for _, datapoint := range datapoints {
			var usage *proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage

			for _, volume := range datapoint.Volumes {
				if volume.Volume == monitor.Path {
					usage = volume
					break
				}
			}

			usageDatapoints = append(usageDatapoints, usage)
		}

		usageStates := resourcesmonitor.CalculateVolumeUsageStates(monitor, usageDatapoints)

		oldState := monitor.State
		newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)

		debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)

		if shouldNotify {
			outOfDiskVolumes = append(outOfDiskVolumes, map[string]any{
				"path":      monitor.Path,
				"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
			})
		}

		//nolint:gocritic // We need to be able to update the resource monitor here.
		if err := a.Database.UpdateVolumeResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateVolumeResourceMonitorParams{
			AgentID:        a.AgentID,
			Path:           monitor.Path,
			State:          newState,
			UpdatedAt:      dbtime.Time(a.Clock.Now()),
			DebouncedUntil: dbtime.Time(debouncedUntil),
		}); err != nil {
			return xerrors.Errorf("update workspace monitor: %w", err)
		}
	}

	if len(outOfDiskVolumes) == 0 {
		return nil
	}

	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
	if err != nil {
		return xerrors.Errorf("get workspace by id: %w", err)
	}

	if _, err := a.NotificationsEnqueuer.EnqueueWithData(
		// nolint:gocritic // We need to be able to send the notification.
		dbauthz.AsNotifier(ctx),
		workspace.OwnerID,
		notifications.TemplateWorkspaceOutOfDisk,
		map[string]string{
			"workspace": workspace.Name,
		},
		map[string]any{
			"volumes": outOfDiskVolumes,
			// NOTE(DanielleMaywood):
			// When notifications are enqueued, they are checked to be
			// unique within a single day. This means that if we attempt
			// to send two out-of-disk notifications for the same
			// workspace on the same day, the enqueuer will prevent us
			// from sending a second one. We inject a timestamp to make
			// the notifications appear different enough to circumvent
			// this deduplication logic.
			"timestamp": a.Clock.Now(),
		},
		"workspace-monitor-volumes",
	); err != nil {
		return xerrors.Errorf("notify workspace OOD: %w", err)
	}

	return nil
}
```

A contributor left this review comment on the CalculateVolumeUsageStates call:

> On second thought, given that we're having to do this for both memory & volumes, I think we should seriously consider updating the agent to send back that bool to indicate enabled but failed to collect; that's a 1:1 with your "unknown" logic here. This can be in a follow-up.
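Finally, the NOTE about deduplication deserves an illustration. Assuming the enqueuer derives a day-scoped dedup key that covers the notification payload (a hypothetical reading; the real key derivation isn't shown in this diff), injecting the current timestamp changes the payload and therefore the key, which is why back-to-back OOM/OOD alerts on the same day aren't silently dropped:

```go
package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"time"
)

// dedupKey is a hypothetical illustration of day-scoped notification
// deduplication: two notifications on the same day collide unless
// something in the payload differs.
func dedupKey(userID, templateID, day string, payload map[string]any) string {
	raw, _ := json.Marshal(payload) // map keys marshal in sorted order
	sum := sha256.Sum256(append([]byte(userID+templateID+day), raw...))
	return fmt.Sprintf("%x", sum[:8])
}

func main() {
	day := time.Now().Format("2006-01-02")
	payload := map[string]any{"volumes": []string{"/home/coder"}}

	a := dedupKey("user", "out-of-disk", day, payload)

	// Adding the current timestamp, as the PR does, changes the payload
	// and therefore the key, so the second alert is not deduplicated.
	payload["timestamp"] = time.Now().Format(time.RFC3339Nano)
	b := dedupKey("user", "out-of-disk", day, payload)

	fmt.Println(a != b) // true
}
```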