Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6301335

Browse files
committed
feat: add task status reporting load generator runner
1 parent8c9b24f commit6301335

File tree

6 files changed

+837
-0
lines changed

6 files changed

+837
-0
lines changed

‎scaletest/taskstatus/client.go‎

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package taskstatus
2+
3+
import (
4+
"context"
5+
6+
"github.com/google/uuid"
7+
8+
"cdr.dev/slog"
9+
"github.com/coder/coder/v2/codersdk"
10+
"github.com/coder/coder/v2/codersdk/agentsdk"
11+
)
12+
13+
// client abstracts the details of using codersdk.Client and agentsdk.Client
14+
// for the taskstatus runner. This interface allows for easier testing by enabling
15+
// mock implementations and provides a cleaner separation of concerns.
16+
//
17+
// The interface is designed to be initialized in two phases:
18+
// 1. Create the client with NewClient(coderClient)
19+
// 2. Configure logging when the io.Writer is available in Run()
20+
typeclientinterface {
21+
// WatchWorkspace watches for updates to a workspace.
22+
WatchWorkspace(ctx context.Context,workspaceID uuid.UUID) (<-chan codersdk.Workspace,error)
23+
24+
// PatchAppStatus updates the status of a workspace app.
25+
PatchAppStatus(ctx context.Context,req agentsdk.PatchAppStatus)error
26+
27+
// initialize sets up the client with the provided logger, which is only available after Run() is called.
28+
initialize(logger slog.Logger)
29+
}
30+
31+
// sdkClient is the concrete implementation of the client interface using
32+
// codersdk.Client and agentsdk.Client.
33+
typesdkClientstruct {
34+
coderClient*codersdk.Client
35+
agentClient*agentsdk.Client
36+
}
37+
38+
// newClient creates a new client implementation using the provided codersdk.Client.
39+
funcnewClient(coderClient*codersdk.Client)client {
40+
return&sdkClient{
41+
coderClient:coderClient,
42+
}
43+
}
44+
45+
func (c*sdkClient)WatchWorkspace(ctx context.Context,workspaceID uuid.UUID) (<-chan codersdk.Workspace,error) {
46+
returnc.coderClient.WatchWorkspace(ctx,workspaceID)
47+
}
48+
49+
func (c*sdkClient)PatchAppStatus(ctx context.Context,req agentsdk.PatchAppStatus)error {
50+
ifc.agentClient==nil {
51+
panic("agentClient not initialized - call initialize first")
52+
}
53+
returnc.agentClient.PatchAppStatus(ctx,req)
54+
}
55+
56+
func (c*sdkClient)initialize(logger slog.Logger) {
57+
// Configure the coder client logging
58+
c.coderClient.SetLogger(logger)
59+
c.coderClient.SetLogBodies(true)
60+
61+
// Create and configure the agent client with the same logging settings
62+
c.agentClient=agentsdk.New(
63+
c.coderClient.URL,
64+
agentsdk.WithFixedToken(c.coderClient.SessionTokenProvider.GetSessionToken()),
65+
codersdk.WithLogger(logger),
66+
codersdk.WithLogBodies(),
67+
)
68+
}
69+
70+
// Ensure sdkClient implements the client interface.
71+
var_client= (*sdkClient)(nil)

‎scaletest/taskstatus/config.go‎

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package taskstatus
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/google/uuid"
8+
"golang.org/x/xerrors"
9+
)
10+
11+
typeConfigstruct {
12+
// AgentID is the workspace agent ID to which to connect.
13+
AgentID uuid.UUID`json:"agent_id"`
14+
15+
// WorkspaceID is the workspace ID to watch.
16+
WorkspaceID uuid.UUID`json:"workspace_id"`
17+
18+
// AppSlug is the slug of the app designated as the AI Agent.
19+
AppSlugstring`json:"app_slug"`
20+
21+
// When the runner has connected to the watch-ws endpoint, it will call Done once on this wait group. Used to
22+
// coordinate multiple runners from the higher layer.
23+
ConnectedWaitGroup*sync.WaitGroup`json:"-"`
24+
25+
// We read on this channel before starting to report task statuses. Used to coordinate multiple runners from the
26+
// higher layer.
27+
StartReporingchanstruct{}`json:"-"`
28+
29+
// Time between reporting task statuses.
30+
ReportStatusPeriod time.Duration`json:"report_status_period"`
31+
32+
// Total time to report task statuses, starting from when we successfully read from the StartReporing channel.
33+
ReportStatusDuration time.Duration`json:"report_status_duration"`
34+
35+
Metrics*Metrics`json:"-"`
36+
MetricLabelValues []string`json:"metric_label_values"`
37+
}
38+
39+
func (c*Config)Validate()error {
40+
ifc.AgentID==uuid.Nil {
41+
returnxerrors.Errorf("validate agent_id: must not be nil")
42+
}
43+
44+
ifc.AppSlug=="" {
45+
returnxerrors.Errorf("validate app_slug: must not be empty")
46+
}
47+
48+
ifc.ConnectedWaitGroup==nil {
49+
returnxerrors.Errorf("validate connected_wait_group: must not be nil")
50+
}
51+
52+
ifc.StartReporing==nil {
53+
returnxerrors.Errorf("validate start_reporting: must not be nil")
54+
}
55+
56+
ifc.ReportStatusPeriod<=0 {
57+
returnxerrors.Errorf("validate report_status_period: must be greater than zero")
58+
}
59+
60+
ifc.ReportStatusDuration<=0 {
61+
returnxerrors.Errorf("validate report_status_duration: must be greater than zero")
62+
}
63+
64+
ifc.Metrics==nil {
65+
returnxerrors.Errorf("validate metrics: must not be nil")
66+
}
67+
68+
returnnil
69+
}

‎scaletest/taskstatus/metrics.go‎

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package taskstatus
2+
3+
import"github.com/prometheus/client_golang/prometheus"
4+
5+
typeMetricsstruct {
6+
TaskStatusToWorkspaceUpdateLatencySeconds prometheus.HistogramVec
7+
MissingStatusUpdatesTotal prometheus.CounterVec
8+
ReportTaskStatusErrorsTotal prometheus.CounterVec
9+
}
10+
11+
funcNewMetrics(reg prometheus.Registerer,labelNames...string)*Metrics {
12+
m:=&Metrics{
13+
TaskStatusToWorkspaceUpdateLatencySeconds:*prometheus.NewHistogramVec(prometheus.HistogramOpts{
14+
Namespace:"coderd",
15+
Subsystem:"scaletest",
16+
Name:"task_status_to_workspace_update_latency_seconds",
17+
Help:"Time in seconds between reporting a task status and receiving the workspace update.",
18+
},labelNames),
19+
MissingStatusUpdatesTotal:*prometheus.NewCounterVec(prometheus.CounterOpts{
20+
Namespace:"coderd",
21+
Subsystem:"scaletest",
22+
Name:"missing_status_updates_total",
23+
Help:"Total number of missing status updates.",
24+
},labelNames),
25+
ReportTaskStatusErrorsTotal:*prometheus.NewCounterVec(prometheus.CounterOpts{
26+
Namespace:"coderd",
27+
Subsystem:"scaletest",
28+
Name:"report_task_status_errors_total",
29+
Help:"Total number of errors when reporting task status.",
30+
},labelNames),
31+
}
32+
reg.MustRegister(m.TaskStatusToWorkspaceUpdateLatencySeconds)
33+
reg.MustRegister(m.MissingStatusUpdatesTotal)
34+
reg.MustRegister(m.ReportTaskStatusErrorsTotal)
35+
returnm
36+
}

‎scaletest/taskstatus/run.go‎

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package taskstatus
2+
3+
import (
4+
"context"
5+
"io"
6+
"strconv"
7+
"strings"
8+
"sync"
9+
"time"
10+
11+
"golang.org/x/xerrors"
12+
13+
"cdr.dev/slog"
14+
"cdr.dev/slog/sloggers/sloghuman"
15+
16+
"github.com/coder/coder/v2/codersdk"
17+
"github.com/coder/coder/v2/codersdk/agentsdk"
18+
"github.com/coder/coder/v2/scaletest/harness"
19+
"github.com/coder/coder/v2/scaletest/loadtestutil"
20+
"github.com/coder/quartz"
21+
)
22+
23+
// statusUpdatePrefix is prepended to the message number in every status
// message the runner reports; parseStatusMessage uses it to recognize those
// messages in workspace updates.
const statusUpdatePrefix = "scaletest status update:"
24+
25+
typeRunnerstruct {
26+
clientclient
27+
cfgConfig
28+
29+
logger slog.Logger
30+
31+
mu sync.Mutex
32+
reportTimesmap[int]time.Time
33+
doneReportingbool
34+
35+
// testing only
36+
clock quartz.Clock
37+
}
38+
39+
var_ harness.Runnable=&Runner{}
40+
41+
// NewRunner creates a new Runner with the provided codersdk.Client and configuration.
42+
funcNewRunner(coderClient*codersdk.Client,cfgConfig)*Runner {
43+
return&Runner{
44+
client:newClient(coderClient),
45+
cfg:cfg,
46+
clock:quartz.NewReal(),
47+
reportTimes:make(map[int]time.Time),
48+
}
49+
}
50+
51+
func (r*Runner)Run(ctx context.Context,namestring,logs io.Writer)error {
52+
logs=loadtestutil.NewSyncWriter(logs)
53+
r.logger=slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
54+
r.client.initialize(r.logger)
55+
56+
// ensure these labels are initialized, so we see the time series right away in prometheus.
57+
r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
58+
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
59+
60+
workspaceUpdatesCtx,cancelWorkspaceUpdates:=context.WithCancel(ctx)
61+
defercancelWorkspaceUpdates()
62+
workspaceUpdatesResult:=make(chanerror,1)
63+
gofunc() {
64+
workspaceUpdatesResult<-r.watchWorkspaceUpdates(workspaceUpdatesCtx)
65+
}()
66+
67+
err:=r.reportTaskStatus(ctx)
68+
iferr!=nil {
69+
returnxerrors.Errorf("report task status: %w",err)
70+
}
71+
72+
err=<-workspaceUpdatesResult
73+
iferr!=nil {
74+
returnxerrors.Errorf("watch workspace: %w",err)
75+
}
76+
returnnil
77+
}
78+
79+
func (r*Runner)watchWorkspaceUpdates(ctx context.Context)error {
80+
updates,err:=r.client.WatchWorkspace(ctx,r.cfg.WorkspaceID)
81+
iferr!=nil {
82+
returnxerrors.Errorf("watch workspace: %w",err)
83+
}
84+
r.cfg.ConnectedWaitGroup.Done()
85+
deferfunc() {
86+
r.mu.Lock()
87+
deferr.mu.Unlock()
88+
r.cfg.Metrics.MissingStatusUpdatesTotal.
89+
WithLabelValues(r.cfg.MetricLabelValues...).
90+
Add(float64(len(r.reportTimes)))
91+
}()
92+
for {
93+
select {
94+
case<-ctx.Done():
95+
returnctx.Err()
96+
caseworkspace:=<-updates:
97+
ifworkspace.LatestAppStatus==nil {
98+
continue
99+
}
100+
msgNo,ok:=parseStatusMessage(workspace.LatestAppStatus.Message)
101+
if!ok {
102+
continue
103+
}
104+
105+
r.mu.Lock()
106+
reportTime,ok:=r.reportTimes[msgNo]
107+
delete(r.reportTimes,msgNo)
108+
allDone:=r.doneReporting&&len(r.reportTimes)==0
109+
r.mu.Unlock()
110+
111+
if!ok {
112+
returnxerrors.Errorf("report time not found for message %d",msgNo)
113+
}
114+
latency:=r.clock.Since(reportTime,"watchWorkspaceUpdates")
115+
r.cfg.Metrics.TaskStatusToWorkspaceUpdateLatencySeconds.
116+
WithLabelValues(r.cfg.MetricLabelValues...).
117+
Observe(latency.Seconds())
118+
ifallDone {
119+
returnnil
120+
}
121+
}
122+
}
123+
}
124+
125+
func (r*Runner)reportTaskStatus(ctx context.Context)error {
126+
deferfunc() {
127+
r.mu.Lock()
128+
deferr.mu.Unlock()
129+
r.doneReporting=true
130+
}()
131+
132+
select {
133+
case<-ctx.Done():
134+
returnctx.Err()
135+
case<-r.cfg.StartReporing:
136+
r.logger.Info(ctx,"starting to report task status")
137+
}
138+
startedReporting:=r.clock.Now("reportTaskStatus","startedReporting")
139+
msgNo:=0
140+
141+
done:=xerrors.New("done reporting task status")// sentinal error
142+
waiter:=r.clock.TickerFunc(ctx,r.cfg.ReportStatusPeriod,func()error {
143+
r.mu.Lock()
144+
now:=r.clock.Now("reportTaskStatus","tick")
145+
r.reportTimes[msgNo]=now
146+
// It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates goroutine
147+
// needs a update to wake up and check if we're done. We could introduce a secondary signaling channel, but
148+
// it adds a lot of complexity and will be hard to test. We expect the tick period to be much smaller than the
149+
// report status duration, so one extra tick is not a big deal.
150+
ifnow.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
151+
r.doneReporting=true
152+
}
153+
r.mu.Unlock()
154+
155+
err:=r.client.PatchAppStatus(ctx, agentsdk.PatchAppStatus{
156+
AppSlug:r.cfg.AppSlug,
157+
Message:statusUpdatePrefix+strconv.Itoa(msgNo),
158+
State:codersdk.WorkspaceAppStatusStateWorking,
159+
URI:"https://example.com/example-status/",
160+
})
161+
iferr!=nil {
162+
r.logger.Error(ctx,"failed to report task status",slog.Error(err))
163+
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
164+
}
165+
msgNo++
166+
// note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that sets
167+
// it.
168+
ifr.doneReporting {
169+
returndone// causes the ticker to exit due to the sentinel error
170+
}
171+
returnnil
172+
},"reportTaskStatus")
173+
err:=waiter.Wait()
174+
ifxerrors.Is(err,done) {
175+
returnnil
176+
}
177+
returnerr
178+
}
179+
180+
funcparseStatusMessage(messagestring) (int,bool) {
181+
if!strings.HasPrefix(message,statusUpdatePrefix) {
182+
return0,false
183+
}
184+
message=strings.TrimPrefix(message,statusUpdatePrefix)
185+
msgNo,err:=strconv.Atoi(message)
186+
iferr!=nil {
187+
return0,false
188+
}
189+
returnmsgNo,true
190+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp