feat: add task status reporting load generator runner #20538


Open

spikecurtis wants to merge 1 commit into main from spike/internal-913-taskstatus-runner
71 changes: 71 additions & 0 deletions scaletest/taskstatus/client.go
@@ -0,0 +1,71 @@
package taskstatus

import (
"context"

"github.com/google/uuid"

"cdr.dev/slog"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)

// client abstracts the details of using codersdk.Client and agentsdk.Client
// for the taskstatus runner. This interface allows for easier testing by enabling
// mock implementations and provides a cleaner separation of concerns.
//
// The interface is designed to be initialized in two phases:
// 1. Create the client with NewClient(coderClient)
// 2. Configure logging when the io.Writer is available in Run()
type client interface {
	// WatchWorkspace watches for updates to a workspace.
	WatchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error)

	// PatchAppStatus updates the status of a workspace app.
	PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error

	// initialize sets up the client with the provided logger, which is only available after Run() is called.
	initialize(logger slog.Logger)
}

// sdkClient is the concrete implementation of the client interface using
// codersdk.Client and agentsdk.Client.
type sdkClient struct {
	coderClient *codersdk.Client
	agentClient *agentsdk.Client
}

// newClient creates a new client implementation using the provided codersdk.Client.
func newClient(coderClient *codersdk.Client) client {
	return &sdkClient{
		coderClient: coderClient,
	}
}

func (c *sdkClient) WatchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
	return c.coderClient.WatchWorkspace(ctx, workspaceID)
}

func (c *sdkClient) PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
	if c.agentClient == nil {
		panic("agentClient not initialized - call initialize first")
	}
	return c.agentClient.PatchAppStatus(ctx, req)
}

func (c *sdkClient) initialize(logger slog.Logger) {
	// Configure the coder client logging
	c.coderClient.SetLogger(logger)
	c.coderClient.SetLogBodies(true)

	// Create and configure the agent client with the same logging settings
	c.agentClient = agentsdk.New(
		c.coderClient.URL,
		agentsdk.WithFixedToken(c.coderClient.SessionTokenProvider.GetSessionToken()),
		codersdk.WithLogger(logger),
		codersdk.WithLogBodies(),
	)
}

// Ensure sdkClient implements the client interface.
var _ client = (*sdkClient)(nil)
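The two-phase design above (construct with newClient, then initialize once Run() supplies the log writer) is also what makes the runner testable: any type satisfying client can be swapped in. A minimal sketch of a test double, assuming the same imports as client.go (fakeClient is hypothetical and not part of this PR):

type fakeClient struct {
	updates  chan codersdk.Workspace
	statuses []agentsdk.PatchAppStatus
}

func (f *fakeClient) WatchWorkspace(_ context.Context, _ uuid.UUID) (<-chan codersdk.Workspace, error) {
	// Hand the test-controlled channel straight to the runner.
	return f.updates, nil
}

func (f *fakeClient) PatchAppStatus(_ context.Context, req agentsdk.PatchAppStatus) error {
	// Record reported statuses so a test can assert on them.
	f.statuses = append(f.statuses, req)
	return nil
}

func (f *fakeClient) initialize(slog.Logger) {}

A test can then push codersdk.Workspace values into updates to simulate watch events and inspect statuses for what the runner reported.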
69 changes: 69 additions & 0 deletions scaletest/taskstatus/config.go
@@ -0,0 +1,69 @@
package taskstatus

import (
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/xerrors"
)

type Config struct {
// AgentID is the workspace agent ID to which to connect.
AgentID uuid.UUID `json:"agent_id"`

Reviewer: Looks unused since task statuses are reported per-workspace.

Author: Good catch. It's actually worse than this, though. The API route to report status only works with an Agent Token; a regular API token won't work. So we need some way to either run this in the workspace or otherwise get hold of the key.

Reviewer: I don't recall the details of how external workspaces work - could we just register ourselves as an external workspace?

// WorkspaceID is the workspace ID to watch.
WorkspaceID uuid.UUID `json:"workspace_id"`

// AppSlug is the slug of the app designated as the AI Agent.
AppSlug string `json:"app_slug"`

// When the runner has connected to the watch-ws endpoint, it will call Done once on this wait group. Used to
// coordinate multiple runners from the higher layer.
ConnectedWaitGroup *sync.WaitGroup `json:"-"`

// We read on this channel before starting to report task statuses. Used to coordinate multiple runners from the
// higher layer.
StartReporting chan struct{} `json:"-"`

// Time between reporting task statuses.
ReportStatusPeriod time.Duration `json:"report_status_period"`

// Total time to report task statuses, starting from when we successfully read from the StartReporting channel.
ReportStatusDuration time.Duration `json:"report_status_duration"`

Metrics *Metrics `json:"-"`
MetricLabelValues []string `json:"metric_label_values"`
}

func (c *Config) Validate() error {

Reviewer: Looks unused.

if c.AgentID == uuid.Nil {

Reviewer suggested change:
-	if c.AgentID == uuid.Nil {
+	if c.WorkspaceID == uuid.Nil {

return xerrors.Errorf("validate agent_id: must not be nil")
}

if c.AppSlug == "" {
return xerrors.Errorf("validate app_slug: must not be empty")
}

if c.ConnectedWaitGroup == nil {
return xerrors.Errorf("validate connected_wait_group: must not be nil")
}

if c.StartReporting == nil {
return xerrors.Errorf("validate start_reporting: must not be nil")
}

if c.ReportStatusPeriod <= 0 {
return xerrors.Errorf("validate report_status_period: must be greater than zero")
}

if c.ReportStatusDuration <= 0 {
return xerrors.Errorf("validate report_status_duration: must be greater than zero")
}

if c.Metrics == nil {
return xerrors.Errorf("validate metrics: must not be nil")
}

return nil
}
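The ConnectedWaitGroup/StartReporting pair implies a specific call pattern in the layer that owns multiple runners: wait until every runner's watch websocket is connected, hold for a baseline, then release all runners at once. A rough sketch of that wiring, assuming numRunners, workspaceID, and metrics are supplied by the caller (none of this wiring appears in the PR itself):

var connected sync.WaitGroup
start := make(chan struct{})
connected.Add(numRunners)

cfg := Config{
	WorkspaceID:          workspaceID,
	AppSlug:              "ai-agent", // illustrative slug
	ConnectedWaitGroup:   &connected,
	StartReporting:       start,
	ReportStatusPeriod:   time.Second,
	ReportStatusDuration: time.Minute,
	Metrics:              metrics,
}
// ... build numRunners runners from cfg and start their Run() goroutines ...

connected.Wait() // every watch websocket is up; baseline can be taken
close(start)     // closing the channel unblocks every runner's read at once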
36 changes: 36 additions & 0 deletions scaletest/taskstatus/metrics.go
@@ -0,0 +1,36 @@
package taskstatus

import"github.com/prometheus/client_golang/prometheus"

typeMetricsstruct {
TaskStatusToWorkspaceUpdateLatencySeconds prometheus.HistogramVec
MissingStatusUpdatesTotal prometheus.CounterVec
ReportTaskStatusErrorsTotal prometheus.CounterVec
}

funcNewMetrics(reg prometheus.Registerer,labelNames...string)*Metrics {
m:=&Metrics{
TaskStatusToWorkspaceUpdateLatencySeconds:*prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace:"coderd",
Subsystem:"scaletest",
Name:"task_status_to_workspace_update_latency_seconds",
Help:"Time in seconds between reporting a task status and receiving the workspace update.",
},labelNames),
MissingStatusUpdatesTotal:*prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace:"coderd",
Subsystem:"scaletest",
Name:"missing_status_updates_total",
Help:"Total number of missing status updates.",
},labelNames),
ReportTaskStatusErrorsTotal:*prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace:"coderd",
Subsystem:"scaletest",
Name:"report_task_status_errors_total",
Help:"Total number of errors when reporting task status.",
},labelNames),
}
reg.MustRegister(m.TaskStatusToWorkspaceUpdateLatencySeconds)
reg.MustRegister(m.MissingStatusUpdatesTotal)
reg.MustRegister(m.ReportTaskStatusErrorsTotal)
returnm
}
190 changes: 190 additions & 0 deletions scaletest/taskstatus/run.go
@@ -0,0 +1,190 @@
package taskstatus

import (
"context"
"io"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/xerrors"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/scaletest/harness"
"github.com/coder/coder/v2/scaletest/loadtestutil"
"github.com/coder/quartz"
)

const statusUpdatePrefix = "scaletest status update:"

type Runner struct {
client client
cfg Config

logger slog.Logger

mu sync.Mutex
reportTimes map[int]time.Time
doneReporting bool

// testing only
clock quartz.Clock
}

var _ harness.Runnable = &Runner{}

// NewRunner creates a new Runner with the provided codersdk.Client and configuration.
func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner {
return &Runner{
client: newClient(coderClient),
cfg: cfg,
clock: quartz.NewReal(),
reportTimes: make(map[int]time.Time),
}
}

func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
logs = loadtestutil.NewSyncWriter(logs)
r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
r.client.initialize(r.logger)

// ensure these labels are initialized, so we see the time series right away in prometheus.
r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)

workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx)
defer cancelWorkspaceUpdates()
workspaceUpdatesResult := make(chan error, 1)
go func() {
workspaceUpdatesResult <- r.watchWorkspaceUpdates(workspaceUpdatesCtx)
}()

err := r.reportTaskStatus(ctx)
if err != nil {
return xerrors.Errorf("report task status: %w", err)
}

err = <-workspaceUpdatesResult
if err != nil {
return xerrors.Errorf("watch workspace: %w", err)
}
return nil
}

func (r *Runner) watchWorkspaceUpdates(ctx context.Context) error {
updates, err := r.client.WatchWorkspace(ctx, r.cfg.WorkspaceID)
if err != nil {
return xerrors.Errorf("watch workspace: %w", err)
}
r.cfg.ConnectedWaitGroup.Done()
defer func() {
r.mu.Lock()
defer r.mu.Unlock()
r.cfg.Metrics.MissingStatusUpdatesTotal.
WithLabelValues(r.cfg.MetricLabelValues...).
Add(float64(len(r.reportTimes)))
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case workspace := <-updates:
if workspace.LatestAppStatus == nil {
continue
}
msgNo, ok := parseStatusMessage(workspace.LatestAppStatus.Message)
if !ok {
continue
}

r.mu.Lock()
reportTime, ok := r.reportTimes[msgNo]
delete(r.reportTimes, msgNo)
allDone := r.doneReporting && len(r.reportTimes) == 0
r.mu.Unlock()

if !ok {
return xerrors.Errorf("report time not found for message %d", msgNo)
}
latency := r.clock.Since(reportTime, "watchWorkspaceUpdates")
r.cfg.Metrics.TaskStatusToWorkspaceUpdateLatencySeconds.
WithLabelValues(r.cfg.MetricLabelValues...).
Observe(latency.Seconds())
if allDone {
return nil
}
}
}
}

func (r *Runner) reportTaskStatus(ctx context.Context) error {
defer func() {
r.mu.Lock()
defer r.mu.Unlock()
r.doneReporting = true
}()

select {
case <-ctx.Done():
return ctx.Err()
case <-r.cfg.StartReporting:
r.logger.Info(ctx, "starting to report task status")
Comment on lines +133 to +136

Reviewer: If watchWorkspaceUpdates always calls Done on the waitgroup, even if it fails, we wouldn't need this timeout, right? Then you could get rid of the StartReporting channel and just have this function wait on the waitgroup.

Author: The Run Sequence calls for waiting to establish a baseline after all update websockets are connected, before we start sending status reports. That's the purpose of this StartReporting channel: it allows the calling code to wait for the baseline, then start all the status reports.

}
startedReporting := r.clock.Now("reportTaskStatus", "startedReporting")
msgNo := 0

done := xerrors.New("done reporting task status") // sentinel error
waiter := r.clock.TickerFunc(ctx, r.cfg.ReportStatusPeriod, func() error {
r.mu.Lock()
now := r.clock.Now("reportTaskStatus", "tick")
r.reportTimes[msgNo] = now
// It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates goroutine
// needs an update to wake up and check if we're done. We could introduce a secondary signaling channel, but
// it adds a lot of complexity and will be hard to test. We expect the tick period to be much smaller than the
// report status duration, so one extra tick is not a big deal.
if now.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
r.doneReporting = true
}
r.mu.Unlock()

err := r.client.PatchAppStatus(ctx, agentsdk.PatchAppStatus{
AppSlug: r.cfg.AppSlug,
Message: statusUpdatePrefix + strconv.Itoa(msgNo),
State: codersdk.WorkspaceAppStatusStateWorking,
URI: "https://example.com/example-status/",
})
if err != nil {
r.logger.Error(ctx, "failed to report task status", slog.Error(err))
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
}
msgNo++
// note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that sets
// it.
if r.doneReporting {
return done // causes the ticker to exit due to the sentinel error
}
return nil
}, "reportTaskStatus")
err := waiter.Wait()
if xerrors.Is(err, done) {
return nil
}
return err
}

func parseStatusMessage(message string) (int, bool) {
if !strings.HasPrefix(message, statusUpdatePrefix) {
return 0, false
}
message = strings.TrimPrefix(message, statusUpdatePrefix)
msgNo, err := strconv.Atoi(message)
if err != nil {
return 0, false
}
return msgNo, true
}
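To make the round trip concrete: reportTaskStatus writes messages of the form statusUpdatePrefix + strconv.Itoa(msgNo), and parseStatusMessage recovers the sequence number on the watch side, ignoring anything else:

msg := statusUpdatePrefix + strconv.Itoa(42) // "scaletest status update:42"
msgNo, ok := parseStatusMessage(msg)         // msgNo == 42, ok == true
_, ok = parseStatusMessage("unrelated text") // ok == false, update is skipped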