Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: refactor agent resource monitoring API to avoid excessive calls to DB #20430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
cstyan merged 6 commits into main from callum/workspace-agent-call-volume
Oct 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions coderd/agentapi/api.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -239,6 +239,10 @@ func (a *API) Serve(ctx context.Context, l net.Listener) error {
return xerrors.Errorf("create agent API server: %w", err)
}

if err := a.ResourcesMonitoringAPI.InitMonitors(ctx); err != nil {
return xerrors.Errorf("initialize resource monitoring: %w", err)
}

return server.Serve(ctx, l)
}

Expand Down
87 changes: 52 additions & 35 deletions coderd/agentapi/resources_monitoring.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync"
"time"

"golang.org/x/xerrors"
Expand DownExpand Up@@ -33,42 +34,60 @@ type ResourcesMonitoringAPI struct {

Debounce time.Duration
Config resourcesmonitor.Config

// Cache resource monitors on first call to avoid millions of DB queries per day.
memoryMonitor database.WorkspaceAgentMemoryResourceMonitor
volumeMonitors []database.WorkspaceAgentVolumeResourceMonitor
monitorsLock sync.RWMutex
}

func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
memoryMonitor, memoryErr := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if memoryErr != nil && !errors.Is(memoryErr, sql.ErrNoRows) {
return nil, xerrors.Errorf("failed to fetch memory resource monitor: %w", memoryErr)
// InitMonitors fetches resource monitors from the database and caches them.
// This must be called once after creating a ResourcesMonitoringAPI; the context should be
// the agent per-RPC connection context. If fetching fails with a real error (not sql.ErrNoRows), the
// connection should be torn down.
func (a *ResourcesMonitoringAPI) InitMonitors(ctx context.Context) error {
memMon, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("fetch memory resource monitor: %w", err)
}
// If sql.ErrNoRows, memoryMonitor stays as zero value (CreatedAt.IsZero() = true).
// Otherwise, store the fetched monitor.
if err == nil {
a.memoryMonitor = memMon
}

volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
volMons, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return nil, xerrors.Errorf("failed to fetch volume resource monitors: %w", err)
return xerrors.Errorf("fetch volume resource monitors: %w", err)
}
// 0 length is valid, indicating none configured, since the volume monitors in the DB can be many.
a.volumeMonitors = volMons

return nil
}

func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *proto.GetResourcesMonitoringConfigurationRequest) (*proto.GetResourcesMonitoringConfigurationResponse, error) {
return &proto.GetResourcesMonitoringConfigurationResponse{
Config: &proto.GetResourcesMonitoringConfigurationResponse_Config{
CollectionIntervalSeconds: int32(a.Config.CollectionInterval.Seconds()),
NumDatapoints: a.Config.NumDatapoints,
},
Memory: func() *proto.GetResourcesMonitoringConfigurationResponse_Memory {
if memoryErr != nil {
if a.memoryMonitor.CreatedAt.IsZero() {
return nil
}

return &proto.GetResourcesMonitoringConfigurationResponse_Memory{
Enabled: memoryMonitor.Enabled,
Enabled: a.memoryMonitor.Enabled,
}
}(),
Volumes: func() []*proto.GetResourcesMonitoringConfigurationResponse_Volume {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(volumeMonitors))
for _, monitor := range volumeMonitors {
volumes := make([]*proto.GetResourcesMonitoringConfigurationResponse_Volume, 0, len(a.volumeMonitors))
for _, monitor := range a.volumeMonitors {
volumes = append(volumes, &proto.GetResourcesMonitoringConfigurationResponse_Volume{
Enabled: monitor.Enabled,
Path: monitor.Path,
})
}

return volumes
}(),
}, nil
Expand All@@ -77,6 +96,10 @@ func (a *ResourcesMonitoringAPI) GetResourcesMonitoringConfiguration(ctx context
func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Context, req *proto.PushResourcesMonitoringUsageRequest) (*proto.PushResourcesMonitoringUsageResponse, error) {
var err error

// Lock for the entire push operation since calls are sequential from the agent
a.monitorsLock.Lock()
defer a.monitorsLock.Unlock()

if memoryErr := a.monitorMemory(ctx, req.Datapoints); memoryErr != nil {
err = errors.Join(err, xerrors.Errorf("monitor memory: %w", memoryErr))
}
Expand All@@ -89,18 +112,7 @@ func (a *ResourcesMonitoringAPI) PushResourcesMonitoringUsage(ctx context.Contex
}

func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
monitor, err := a.Database.FetchMemoryResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
// It is valid for an agent to not have a memory monitor, so we
// do not want to treat it as an error.
if errors.Is(err, sql.ErrNoRows) {
return nil
}

return xerrors.Errorf("fetch memory resource monitor: %w", err)
}

if !monitor.Enabled {
if !a.memoryMonitor.Enabled {
return nil
}

Expand All@@ -109,15 +121,15 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
usageDatapoints = append(usageDatapoints, datapoint.Memory)
}

usageStates := resourcesmonitor.CalculateMemoryUsageStates(monitor, usageDatapoints)
usageStates := resourcesmonitor.CalculateMemoryUsageStates(a.memoryMonitor, usageDatapoints)

oldState := monitor.State
oldState := a.memoryMonitor.State
newState := resourcesmonitor.NextState(a.Config, oldState, usageStates)

debouncedUntil, shouldNotify := monitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)
debouncedUntil, shouldNotify := a.memoryMonitor.Debounce(a.Debounce, a.Clock.Now(), oldState, newState)

//nolint:gocritic // We need to be able to update the resource monitor here.
err = a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
err := a.Database.UpdateMemoryResourceMonitor(dbauthz.AsResourceMonitor(ctx), database.UpdateMemoryResourceMonitorParams{
AgentID: a.AgentID,
State: newState,
UpdatedAt: dbtime.Time(a.Clock.Now()),
Expand All@@ -127,6 +139,11 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
return xerrors.Errorf("update workspace monitor: %w", err)
}

// Update cached state
a.memoryMonitor.State = newState
a.memoryMonitor.DebouncedUntil = dbtime.Time(debouncedUntil)
a.memoryMonitor.UpdatedAt = dbtime.Time(a.Clock.Now())

if !shouldNotify {
return nil
}
Expand All@@ -143,7 +160,7 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
notifications.TemplateWorkspaceOutOfMemory,
map[string]string{
"workspace": workspace.Name,
"threshold": fmt.Sprintf("%d%%",monitor.Threshold),
"threshold": fmt.Sprintf("%d%%",a.memoryMonitor.Threshold),
},
map[string]any{
// NOTE(DanielleMaywood):
Expand All@@ -169,14 +186,9 @@ func (a *ResourcesMonitoringAPI) monitorMemory(ctx context.Context, datapoints [
}

func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints []*proto.PushResourcesMonitoringUsageRequest_Datapoint) error {
volumeMonitors, err := a.Database.FetchVolumesResourceMonitorsByAgentID(ctx, a.AgentID)
if err != nil {
return xerrors.Errorf("get or insert volume monitor: %w", err)
}

outOfDiskVolumes := make([]map[string]any, 0)

for _, monitor := range volumeMonitors {
for i, monitor := range a.volumeMonitors {
if !monitor.Enabled {
continue
}
Expand DownExpand Up@@ -219,6 +231,11 @@ func (a *ResourcesMonitoringAPI) monitorVolumes(ctx context.Context, datapoints
}); err != nil {
return xerrors.Errorf("update workspace monitor: %w", err)
}

// Update cached state
a.volumeMonitors[i].State = newState
a.volumeMonitors[i].DebouncedUntil = dbtime.Time(debouncedUntil)
a.volumeMonitors[i].UpdatedAt = dbtime.Time(a.Clock.Now())
}

if len(outOfDiskVolumes) == 0 {
Expand Down
26 changes: 26 additions & 0 deletions coderd/agentapi/resources_monitoring_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -101,6 +101,9 @@ func TestMemoryResourceMonitorDebounce(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: The monitor is given a state that will trigger NOK
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand DownExpand Up@@ -304,6 +307,9 @@ func TestMemoryResourceMonitor(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

clock.Set(collectedAt)
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: datapoints,
Expand DownExpand Up@@ -337,6 +343,8 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
State: database.WorkspaceAgentMonitorStateOK,
Threshold: 80,
})
// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two NOK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Expand DownExpand Up@@ -387,6 +395,9 @@ func TestMemoryResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two OK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand DownExpand Up@@ -466,6 +477,9 @@ func TestVolumeResourceMonitorDebounce(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When:
// - First monitor is in a NOK state
// - Second monitor is in an OK state
Expand DownExpand Up@@ -742,6 +756,9 @@ func TestVolumeResourceMonitor(t *testing.T) {
Threshold: tt.thresholdPercent,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

clock.Set(collectedAt)
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: datapoints,
Expand DownExpand Up@@ -780,6 +797,9 @@ func TestVolumeResourceMonitorMultiple(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: both of them move to a NOK state
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand DownExpand Up@@ -832,6 +852,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two NOK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand DownExpand Up@@ -891,6 +914,9 @@ func TestVolumeResourceMonitorMissingData(t *testing.T) {
Threshold: 80,
})

// Initialize API to fetch and cache the monitors
require.NoError(t, api.InitMonitors(context.Background()))

// When: A datapoint is missing, surrounded by two OK datapoints.
_, err := api.PushResourcesMonitoringUsage(context.Background(), &agentproto.PushResourcesMonitoringUsageRequest{
Datapoints: []*agentproto.PushResourcesMonitoringUsageRequest_Datapoint{
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp