@@ -4,20 +4,35 @@ import (
 	"context"
 	"database/sql"
 	"errors"
+	"fmt"
+	"time"
 
 	"golang.org/x/xerrors"
 
+	"cdr.dev/slog"
+
 	"github.com/google/uuid"
 
-	"cdr.dev/slog"
 	"github.com/coder/coder/v2/agent/proto"
+	"github.com/coder/coder/v2/coderd/agentapi/resourcesmonitor"
 	"github.com/coder/coder/v2/coderd/database"
+	"github.com/coder/coder/v2/coderd/database/dbauthz"
+	"github.com/coder/coder/v2/coderd/database/dbtime"
+	"github.com/coder/coder/v2/coderd/notifications"
+	"github.com/coder/quartz"
 )
 
 type ResourcesMonitoringAPI struct {
-	AgentID  uuid.UUID
-	Database database.Store
-	Log      slog.Logger
+	AgentID     uuid.UUID
+	WorkspaceID uuid.UUID
+
+	Log                   slog.Logger
+	Clock                 quartz.Clock
+	Database              database.Store
+	NotificationsEnqueuer notifications.Enqueuer
+
+	Debounce time.Duration
+	Config   resourcesmonitor.Config
 }
 
 func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
@@ -33,8 +48,8 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 
 	return &proto.GetResourcesMonitoringConfigurationResponse{
 		Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
-			CollectionIntervalSeconds: 10,
-			NumDatapoints:             20,
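+			// The collection cadence and window size now come from the
+			// resourcesmonitor.Config on the API instead of hardcoded values.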
+			CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
+			NumDatapoints:             a.Config.NumDatapoints,
 		},
 		Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
 			if memoryErr != nil {
@@ -60,8 +75,182 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
 }
 
 func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
-	a.Log.Info(ctx, "resources monitoring usage received",
-		slog.F("request", req))
+	var err error
+
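+	// Evaluate memory and volume monitors independently, joining any errors so
+	// a failure in one does not prevent the other from being processed.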
+	if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
+		err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
+	}
+
+	if volumeErr := a.monitorVolumes(ctx, req.Datapoints); volumeErr != nil {
+		err = errors.Join(err, xerrors.Errorf("monitor volume: %w", volumeErr))
+	}
+
+	return &proto.PushResourcesMonitoringUsageResponse{}, err
+}
+
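+// monitorMemory evaluates the agent's memory monitor (if one exists) against
+// the pushed datapoints, persists the resulting state, and notifies the
+// workspace owner when the debouncer allows it.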
+func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
+	monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil {
+		// It is valid for an agent to not have a memory monitor, so we
+		// do not want to treat it as an error.
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil
+		}
+
+		return xerrors.Errorf("fetch memory resource monitor: %w", err)
+	}
+
+	if !monitor.Enabled {
+		return nil
+	}
+
+	usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_MemoryUsage, 0, len(datapoints))
+	for _, datapoint := range datapoints {
+		usageDatapoints = append(usageDatapoints, datapoint.Memory)
+	}
+
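+	// Classify each datapoint as OK or NOK relative to the monitor's threshold.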
+	usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
+
+	oldState := monitor.State
+	newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
+
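+	// The debouncer decides whether this state transition should fire a
+	// notification, and until when further notifications are suppressed.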
+	debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+
+	//nolint:gocritic // We need to be able to update the resource monitor here.
+	err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
+		AgentID:        a.AgentID,
+		State:          newState,
+		UpdatedAt:      dbtime.Time(a.Clock.Now()),
+		DebouncedUntil: dbtime.Time(debouncedUntil),
+	})
+	if err != nil {
+		return xerrors.Errorf("update memory resource monitor: %w", err)
+	}
+
+	if !shouldNotify {
+		return nil
+	}
+
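+	// Fetch the workspace to resolve its name and owner for the notification.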
+	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
+	if err != nil {
+		return xerrors.Errorf("get workspace by id: %w", err)
+	}
+
+	_, err = a.NotificationsEnqueuer.EnqueueWithData(
+		//nolint:gocritic // We need to be able to send the notification.
+		dbauthz.AsNotifier(ctx),
+		workspace.OwnerID,
+		notifications.TemplateWorkspaceOutOfMemory,
+		map[string]string{
+			"workspace": workspace.Name,
+			"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+		},
+		map[string]any{
+			// NOTE(DanielleMaywood):
+			// When notifications are enqueued, they are checked to be
+			// unique within a single day. This means that if we attempt
+			// to send two OOM notifications for the same workspace on
+			// the same day, the enqueuer will prevent us from sending
+			// a second one. We inject a timestamp to make the
+			// notifications appear different enough to circumvent this
+			// deduplication logic.
+			"timestamp": a.Clock.Now(),
+		},
+		"workspace-monitor-memory",
+	)
+	if err != nil {
+		return xerrors.Errorf("notify workspace OOM: %w", err)
+	}
+
+	return nil
+}
+
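+// monitorVolumes evaluates every enabled volume monitor for the agent,
+// persists each monitor's state, and sends a single notification covering
+// all volumes that should notify.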
+func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
+	volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
+	if err != nil {
+		return xerrors.Errorf("fetch volume resource monitors: %w", err)
+	}
+
+	outOfDiskVolumes := make([]map[string]any, 0)
+
+	for _, monitor := range volumeMonitors {
+		if !monitor.Enabled {
+			continue
+		}
+
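+		// Match each datapoint to this monitor's path. Datapoints that do not
+		// report this volume are appended as nil, leaving the gap visible to
+		// the state calculation.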
+		usageDatapoints := make([]*proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage, 0, len(datapoints))
+		for _, datapoint := range datapoints {
+			var usage *proto.PushResourcesMonitoringUsageRequest_Datapoint_VolumeUsage
+
+			for _, volume := range datapoint.Volumes {
+				if volume.Volume == monitor.Path {
+					usage = volume
+					break
+				}
+			}
+
+			usageDatapoints = append(usageDatapoints, usage)
+		}
+
+		usageStates := resourcesmonitor.CalculateVolumeUsageStates(monitor, usageDatapoints)
+
+		oldState := monitor.State
+		newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)
+
+		debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
+
+		if shouldNotify {
+			outOfDiskVolumes = append(outOfDiskVolumes, map[string]any{
+				"path":      monitor.Path,
+				"threshold": fmt.Sprintf("%d%%", monitor.Threshold),
+			})
+		}
+
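+		// Persist the new state for every monitor, not just the ones that
+		// notify, so the debounce window is tracked across pushes.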
+		//nolint:gocritic // We need to be able to update the resource monitor here.
+		if err := a.Database.UpdateVolumeResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateVolumeResourceMonitorParams{
+			AgentID:        a.AgentID,
+			Path:           monitor.Path,
+			State:          newState,
+			UpdatedAt:      dbtime.Time(a.Clock.Now()),
+			DebouncedUntil: dbtime.Time(debouncedUntil),
+		}); err != nil {
+			return xerrors.Errorf("update volume resource monitor: %w", err)
+		}
+	}
+
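+	// Send one notification covering all offending volumes rather than one
+	// notification per volume.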
+	if len(outOfDiskVolumes) == 0 {
+		return nil
+	}
+
+	workspace, err := a.Database.GetWorkspaceByID(ctx, a.WorkspaceID)
+	if err != nil {
+		return xerrors.Errorf("get workspace by id: %w", err)
+	}
+
+	if _, err := a.NotificationsEnqueuer.EnqueueWithData(
+		//nolint:gocritic // We need to be able to send the notification.
+		dbauthz.AsNotifier(ctx),
+		workspace.OwnerID,
+		notifications.TemplateWorkspaceOutOfDisk,
+		map[string]string{
+			"workspace": workspace.Name,
+		},
+		map[string]any{
+			"volumes": outOfDiskVolumes,
+			// NOTE(DanielleMaywood):
+			// When notifications are enqueued, they are checked to be
+			// unique within a single day. This means that if we attempt
+			// to send two out-of-disk notifications for the same workspace
+			// on the same day, the enqueuer will prevent us from sending
+			// a second one. We inject a timestamp to make the
+			// notifications appear different enough to circumvent this
+			// deduplication logic.
+			"timestamp": a.Clock.Now(),
+		},
+		"workspace-monitor-volumes",
+	); err != nil {
+		return xerrors.Errorf("notify workspace OOD: %w", err)
+	}
 
-	return &proto.PushResourcesMonitoringUsageResponse{}, nil
+	return nil
 }