fix: stop incrementing activity on empty agent stats #15204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
f0ssel merged 10 commits into main from f0ssel/last_used_at_inc_2
Oct 25, 2024
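The page carries no PR description, but the title and the diff below indicate that the stats reporter now routes workspace last-used-at updates through a usage tracker and only counts activity when a stats report actually carries connections. A minimal sketch of that guard, with hypothetical names (Stats, UsageTracker, reportStats) standing in for the real coderd call sites:

```go
// Sketch only: hypothetical names, not the real coderd reporter internals.
package main

import (
	"fmt"
	"time"
)

type Stats struct {
	ConnectionCount int64
}

// UsageTracker buffers workspace IDs and later flushes them in one
// BatchUpdateWorkspaceLastUsedAt-style write.
type UsageTracker struct {
	seen map[string]time.Time
}

func (t *UsageTracker) Add(workspaceID string, now time.Time) {
	if t.seen == nil {
		t.seen = make(map[string]time.Time)
	}
	t.seen[workspaceID] = now
}

// reportStats records activity only when the agent actually has connections,
// so an empty stats payload no longer bumps last_used_at.
func reportStats(t *UsageTracker, workspaceID string, st Stats, now time.Time) {
	if st.ConnectionCount == 0 {
		return // empty report: nothing to track
	}
	t.Add(workspaceID, now)
}

func main() {
	var tr UsageTracker
	now := time.Now()
	reportStats(&tr, "ws-1", Stats{ConnectionCount: 0}, now) // ignored
	reportStats(&tr, "ws-1", Stats{ConnectionCount: 2}, now) // tracked
	fmt.Println(len(tr.seen)) // 1
}
```

Consistent with this, the test hunks below replace per-request UpdateWorkspaceLastUsedAt expectations with a single BatchUpdateWorkspaceLastUsedAt call driven by the tracker's flush.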
coderd/agentapi/stats_test.go (60 changes: 44 additions & 16 deletions)
@@ -69,6 +69,11 @@ func TestUpdateStates(t *testing.T) {
}
batcher = &workspacestatstest.StatsBatcher{}
updateAgentMetricsFnCalled = false
tickCh = make(chan time.Time)
flushCh = make(chan int, 1)
wut = workspacestats.NewTracker(dbM,
workspacestats.TrackerWithTickFlush(tickCh, flushCh),
)

req = &agentproto.UpdateStatsRequest{
Stats: &agentproto.Stats{
@@ -108,6 +113,7 @@ func TestUpdateStates(t *testing.T) {
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
UsageTracker: wut,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
@@ -125,28 +131,29 @@ func TestUpdateStates(t *testing.T) {
return now
},
}
defer wut.Close()

// Workspace gets fetched.
dbM.EXPECT().GetWorkspaceByAgentID(gomock.Any(), agent.ID).Return(database.GetWorkspaceByAgentIDRow{
Workspace: workspace,
TemplateName: template.Name,
}, nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)

// We expect an activity bump because ConnectionCount > 0.
dbM.EXPECT().ActivityBumpWorkspace(gomock.Any(), database.ActivityBumpWorkspaceParams{
WorkspaceID: workspace.ID,
NextAutostart: time.Time{}.UTC(),
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
dbM.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{workspace.ID},
LastUsedAt: now,
}).Return(nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
dbM.EXPECT().GetUserByID(gomock.Any(), user.ID).Return(user, nil)

// Ensure that pubsub notifications are sent.
notifyDescription := make(chan []byte)
ps.Subscribe(codersdk.WorkspaceNotifyChannel(workspace.ID), func(_ context.Context, description []byte) {
@@ -161,6 +168,10 @@ func TestUpdateStates(t *testing.T) {
ReportInterval: durationpb.New(10 * time.Second),
}, resp)

tickCh <- now
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

batcher.Mu.Lock()
defer batcher.Mu.Unlock()
require.Equal(t, int64(1), batcher.Called)
@@ -213,6 +224,7 @@ func TestUpdateStates(t *testing.T) {
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
UsageTracker: workspacestats.NewTracker(dbM),
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
// Ignored when nil.
@@ -230,12 +242,6 @@ func TestUpdateStates(t *testing.T) {
TemplateName: template.Name,
}, nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
}).Return(nil)

_, err := api.UpdateStats(context.Background(), req)
require.NoError(t, err)
})
@@ -311,6 +317,11 @@ func TestUpdateStates(t *testing.T) {
}
batcher = &workspacestatstest.StatsBatcher{}
updateAgentMetricsFnCalled = false
tickCh = make(chan time.Time)
flushCh = make(chan int, 1)
wut = workspacestats.NewTracker(dbM,
workspacestats.TrackerWithTickFlush(tickCh, flushCh),
)

req = &agentproto.UpdateStatsRequest{
Stats: &agentproto.Stats{
@@ -330,6 +341,7 @@ func TestUpdateStates(t *testing.T) {
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
UsageTracker: wut,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
@@ -348,6 +360,7 @@ func TestUpdateStates(t *testing.T) {
return now
},
}
defer wut.Close()

// Workspace gets fetched.
dbM.EXPECT().GetWorkspaceByAgentID(gomock.Any(), agent.ID).Return(database.GetWorkspaceByAgentIDRow{
@@ -363,9 +376,9 @@ func TestUpdateStates(t *testing.T) {
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
dbM.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{workspace.ID},
LastUsedAt: now.UTC(),
}).Return(nil)

// User gets fetched to hit the UpdateAgentMetricsFn.
@@ -377,6 +390,10 @@ func TestUpdateStates(t *testing.T) {
ReportInterval: durationpb.New(15 * time.Second),
}, resp)

tickCh <- now
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

require.True(t, updateAgentMetricsFnCalled)
})

@@ -400,6 +417,11 @@ func TestUpdateStates(t *testing.T) {
}
batcher = &workspacestatstest.StatsBatcher{}
updateAgentMetricsFnCalled = false
tickCh = make(chan time.Time)
flushCh = make(chan int, 1)
wut = workspacestats.NewTracker(dbM,
workspacestats.TrackerWithTickFlush(tickCh, flushCh),
)

req = &agentproto.UpdateStatsRequest{
Stats: &agentproto.Stats{
@@ -430,6 +452,7 @@ func TestUpdateStates(t *testing.T) {
},
}
)
defer wut.Close()
api := agentapi.StatsAPI{
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
@@ -439,6 +462,7 @@ func TestUpdateStates(t *testing.T) {
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
UsageTracker: wut,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
@@ -473,8 +497,8 @@ func TestUpdateStates(t *testing.T) {
}).Return(nil)

// Workspace last used at gets bumped.
dbM.EXPECT().UpdateWorkspaceLastUsedAt(gomock.Any(), database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
dbM.EXPECT().BatchUpdateWorkspaceLastUsedAt(gomock.Any(), database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: []uuid.UUID{workspace.ID},
LastUsedAt: now,
}).Return(nil)

@@ -495,6 +519,10 @@ func TestUpdateStates(t *testing.T) {
ReportInterval: durationpb.New(10 * time.Second),
}, resp)

tickCh <- now
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

batcher.Mu.Lock()
defer batcher.Mu.Unlock()
require.EqualValues(t, 1, batcher.Called)
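The pattern repeated in these test hunks: build the tracker with injected channels via workspacestats.NewTracker(dbM, workspacestats.TrackerWithTickFlush(tickCh, flushCh)), wire it in as UsageTracker, then send on tickCh and read the flushed count from flushCh so the batched last-used-at write happens at a deterministic point in the test. A stand-alone sketch of that tick/flush shape, with stand-in types rather than the real Tracker:

```go
// Stand-in tracker; the real one is workspacestats.Tracker with
// TrackerWithTickFlush(tickCh, flushCh).
package main

import (
	"fmt"
	"time"
)

type tracker struct {
	tick    <-chan time.Time // injected by the test instead of a time.Ticker
	flushed chan<- int       // reports how many IDs each flush wrote
	pending map[string]struct{}
}

func (t *tracker) add(id string) {
	if t.pending == nil {
		t.pending = map[string]struct{}{}
	}
	t.pending[id] = struct{}{}
}

func (t *tracker) loop() {
	for range t.tick {
		// The real tracker would issue one batched DB update here.
		t.flushed <- len(t.pending)
		t.pending = map[string]struct{}{}
	}
}

func main() {
	tickCh := make(chan time.Time)
	flushCh := make(chan int, 1)
	tr := &tracker{tick: tickCh, flushed: flushCh}
	go tr.loop()

	tr.add("workspace-1")
	tickCh <- time.Now()   // the test decides exactly when a "tick" happens
	fmt.Println(<-flushCh) // 1: one workspace ID flushed
}
```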
coderd/httpapi/websocket.go (23 changes: 20 additions & 3 deletions)
@@ -2,8 +2,10 @@ package httpapi

import (
"context"
"errors"
"time"

"golang.org/x/xerrors"
"nhooyr.io/websocket"

"cdr.dev/slog"
@@ -31,7 +33,8 @@ func Heartbeat(ctx context.Context, conn *websocket.Conn) {
// Heartbeat loops to ping a WebSocket to keep it alive. It calls `exit` on ping
// failure.
func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *websocket.Conn) {
ticker := time.NewTicker(15 * time.Second)
interval := 15 * time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
@@ -40,12 +43,26 @@ func HeartbeatClose(ctx context.Context, logger slog.Logger, exit func(), conn *
return
case <-ticker.C:
}
err := conn.Ping(ctx)
err := pingWithTimeout(ctx, conn, interval)
if err != nil {
// context.DeadlineExceeded is expected when the client disconnects without sending a close frame
if !errors.Is(err, context.DeadlineExceeded) {
logger.Error(ctx, "failed to heartbeat ping", slog.Error(err))
}
_ = conn.Close(websocket.StatusGoingAway, "Ping failed")
logger.Info(ctx, "failed to heartbeat ping", slog.Error(err))
exit()
return
}
}
}

func pingWithTimeout(ctx context.Context, conn *websocket.Conn, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err := conn.Ping(ctx)
if err != nil {
return xerrors.Errorf("failed to ping: %w", err)
}

return nil
}
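The new pingWithTimeout bounds each heartbeat ping by the ticker interval, so a silent peer cannot block the loop, and the caller treats context.DeadlineExceeded as the expected "client vanished without a close frame" case. A self-contained sketch of the same pattern, using a generic pinger interface instead of nhooyr.io/websocket so it runs on its own:

```go
// Generic pinger stands in for *websocket.Conn so the sketch is self-contained.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type pinger interface {
	Ping(ctx context.Context) error
}

// pingWithTimeout mirrors the helper above: each ping gets its own deadline so
// a dead peer cannot hang the heartbeat loop indefinitely.
func pingWithTimeout(ctx context.Context, p pinger, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := p.Ping(ctx); err != nil {
		return fmt.Errorf("failed to ping: %w", err)
	}
	return nil
}

// slowPinger simulates a client that went away without sending a close frame.
type slowPinger struct{}

func (slowPinger) Ping(ctx context.Context) error {
	select {
	case <-time.After(time.Hour):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	err := pingWithTimeout(context.Background(), slowPinger{}, 50*time.Millisecond)
	// This is the case HeartbeatClose now logs quietly before closing the conn.
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```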
coderd/insights_test.go (6 changes: 2 additions & 4 deletions)
@@ -700,14 +700,13 @@ func TestTemplateInsights_Golden(t *testing.T) {
connectionCount = 0
}
for createdAt.Before(stat.endedAt) {
err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
ConnectionCount: connectionCount,
SessionCountVscode: stat.sessionCountVSCode,
SessionCountJetbrains: stat.sessionCountJetBrains,
SessionCountReconnectingPty: stat.sessionCountReconnectingPTY,
SessionCountSsh: stat.sessionCountSSH,
}, false)
require.NoError(t, err, "want no error inserting agent stats")
createdAt = createdAt.Add(30 * time.Second)
}
}
@@ -1599,14 +1598,13 @@ func TestUserActivityInsights_Golden(t *testing.T) {
connectionCount = 0
}
for createdAt.Before(stat.endedAt) {
err = batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
batcher.Add(createdAt, workspace.agentID, workspace.template.id, workspace.user.(*testUser).sdk.ID, workspace.id, &agentproto.Stats{
ConnectionCount: connectionCount,
SessionCountVscode: stat.sessionCountVSCode,
SessionCountJetbrains: stat.sessionCountJetBrains,
SessionCountReconnectingPty: stat.sessionCountReconnectingPTY,
SessionCountSsh: stat.sessionCountSSH,
}, false)
require.NoError(t, err, "want no error inserting agent stats")
createdAt = createdAt.Add(30 * time.Second)
}
}
coderd/workspaceagentsrpc_test.go (13 changes: 12 additions & 1 deletion)
@@ -3,6 +3,7 @@ package coderd_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -11,6 +12,7 @@ import (
"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbfake"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/testutil"
@@ -20,7 +22,12 @@
func TestWorkspaceAgentReportStats(t *testing.T) {
t.Parallel()

client, db := coderdtest.NewWithDatabase(t, nil)
tickCh := make(chan time.Time)
flushCh := make(chan int, 1)
client, db := coderdtest.NewWithDatabase(t, &coderdtest.Options{
WorkspaceUsageTrackerFlush: flushCh,
WorkspaceUsageTrackerTick: tickCh,
})
user := coderdtest.CreateFirstUser(t, client)
r := dbfake.WorkspaceBuild(t, db, database.Workspace{
OrganizationID: user.OrganizationID,
@@ -53,6 +60,10 @@ func TestWorkspaceAgentReportStats(t *testing.T) {
})
require.NoError(t, err)

tickCh <- dbtime.Now()
count := <-flushCh
require.Equal(t, 1, count, "expected one flush with one id")

newWorkspace, err := client.Workspace(context.Background(), r.Workspace.ID)
require.NoError(t, err)

coderd/workspaceapps/proxy.go (7 changes: 3 additions & 4 deletions)
@@ -593,7 +593,6 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))

report := newStatsReportFromSignedToken(appToken)
s.collectStats(report)
defer func() {
// We must use defer here because ServeHTTP may panic.
report.SessionEndedAt = dbtime.Now()
@@ -614,7 +613,8 @@
// @Success 101
// @Router /workspaceagents/{workspaceagent}/pty [get]
func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, cancel := context.WithCancel(r.Context())
defer cancel()

s.websocketWaitMutex.Lock()
s.websocketWaitGroup.Add(1)
@@ -670,12 +670,11 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
})
return
}
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)

ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.

go httpapi.Heartbeat(ctx, conn)

agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
if err != nil {
log.Debug(ctx, "dial workspace agent", slog.Error(err))
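The proxy.go change gives the PTY handler a cancelable context and hands cancel to httpapi.HeartbeatClose, so a failed ping does not just close the WebSocket but also unwinds everything downstream that watches the context. A rough sketch of that wiring, with a stand-in heartbeat loop rather than the real helper:

```go
// Stand-in heartbeat loop; the real one is httpapi.HeartbeatClose.
package main

import (
	"context"
	"fmt"
	"time"
)

func heartbeatClose(ctx context.Context, ping func(context.Context) error, exit func()) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		if err := ping(ctx); err != nil {
			exit() // cancel the handler's context, not just the WebSocket
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	failingPing := func(context.Context) error { return fmt.Errorf("peer gone") }
	go heartbeatClose(ctx, failingPing, cancel)

	// Everything that watches ctx (agent conn, PTY stream) unwinds here.
	<-ctx.Done()
	fmt.Println("session torn down:", ctx.Err())
}
```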
coderd/workspacestats/batcher.go (5 changes: 2 additions & 3 deletions)
@@ -25,7 +25,7 @@
)

type Batcher interface {
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool) error
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats, usage bool)
Review comment (Member):
praise: This does make the whole interface more convenient to use 👍

}

// DBBatcher holds a buffer of agent stats and periodically flushes them to
@@ -139,7 +139,7 @@ func (b *DBBatcher) Add(
workspaceID uuid.UUID,
st *agentproto.Stats,
usage bool,
) error {
) {
b.mu.Lock()
defer b.mu.Unlock()

@@ -176,7 +176,6 @@ func (b *DBBatcher) Add(
b.flushLever <- struct{}{}
b.flushForced.Store(true)
}
return nil
}

// Run runs the batcher.
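As the review comment notes, dropping the error return makes Batcher.Add simpler to call: Add only appends to an in-memory buffer (and nudges the flush goroutine when the buffer is nearly full), so there is nothing meaningful to fail at the call site; errors belong to the flush path. A stand-alone sketch of that split, with hypothetical types rather than the real DBBatcher:

```go
// Hypothetical types; not the real DBBatcher.
package main

import (
	"context"
	"fmt"
)

type stat struct{ workspaceID string }

type batcher struct {
	buf []stat
}

// Add never fails: it only appends to an in-memory buffer, which is why the
// interface can drop the error return.
func (b *batcher) Add(workspaceID string) {
	b.buf = append(b.buf, stat{workspaceID: workspaceID})
}

// Flush is where database errors would actually surface.
func (b *batcher) Flush(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("flush: %w", err)
	}
	fmt.Printf("wrote %d stats\n", len(b.buf))
	b.buf = b.buf[:0]
	return nil
}

func main() {
	var b batcher
	b.Add("ws-1") // nothing to require.NoError on anymore
	b.Add("ws-2")
	_ = b.Flush(context.Background())
}
```

This is exactly why the test changes in insights_test.go and batcher_internal_test.go drop their require.NoError wrappers around Add.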
coderd/workspacestats/batcher_internal_test.go (6 changes: 3 additions & 3 deletions)
@@ -63,7 +63,7 @@ func TestBatchStats(t *testing.T) {
// Given: a single data point is added for workspace
t2 := t1.Add(time.Second)
t.Logf("inserting 1 stat")
require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false))
b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false)

// When: it becomes time to report stats
// Signal a tick and wait for a flush to complete.
@@ -87,9 +87,9 @@
t.Logf("inserting %d stats", defaultBufferSize)
for i := 0; i < defaultBufferSize; i++ {
if i%2 == 0 {
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false))
b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randStats(t), false)
} else {
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t), false))
b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randStats(t), false)
}
}
}()