feat: add task status reporting load generator runner #20538
base: main
@@ -0,0 +1,71 @@

```go
package taskstatus

import (
	"context"

	"github.com/google/uuid"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

// client abstracts the details of using codersdk.Client and agentsdk.Client
// for the taskstatus runner. This interface allows for easier testing by enabling
// mock implementations and provides a cleaner separation of concerns.
//
// The interface is designed to be initialized in two phases:
//  1. Create the client with newClient(coderClient).
//  2. Configure logging when the io.Writer is available in Run().
type client interface {
	// WatchWorkspace watches for updates to a workspace.
	WatchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error)
	// PatchAppStatus updates the status of a workspace app.
	PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error
	// initialize sets up the client with the provided logger, which is only available after Run() is called.
	initialize(logger slog.Logger)
}

// sdkClient is the concrete implementation of the client interface using
// codersdk.Client and agentsdk.Client.
type sdkClient struct {
	coderClient *codersdk.Client
	agentClient *agentsdk.Client
}

// newClient creates a new client implementation using the provided codersdk.Client.
func newClient(coderClient *codersdk.Client) client {
	return &sdkClient{
		coderClient: coderClient,
	}
}

func (c *sdkClient) WatchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
	return c.coderClient.WatchWorkspace(ctx, workspaceID)
}

func (c *sdkClient) PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
	if c.agentClient == nil {
		panic("agentClient not initialized - call initialize first")
	}
	return c.agentClient.PatchAppStatus(ctx, req)
}

func (c *sdkClient) initialize(logger slog.Logger) {
	// Configure the coder client logging.
	c.coderClient.SetLogger(logger)
	c.coderClient.SetLogBodies(true)

	// Create and configure the agent client with the same logging settings.
	c.agentClient = agentsdk.New(
		c.coderClient.URL,
		agentsdk.WithFixedToken(c.coderClient.SessionTokenProvider.GetSessionToken()),
		codersdk.WithLogger(logger),
		codersdk.WithLogBodies(),
	)
}

// Ensure sdkClient implements the client interface.
var _ client = (*sdkClient)(nil)
```
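Because `client` is a small unexported interface, tests can substitute a fake instead of standing up a real coderd, as the doc comment above suggests. The sketch below is one possible shape for such a fake; the type name, channel buffering, and helpers are assumptions for illustration, not part of this PR.

```go
// Hypothetical test double for the client interface; would live in a
// _test.go file in package taskstatus.
package taskstatus

import (
	"context"

	"github.com/google/uuid"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

// fakeClient records PatchAppStatus calls and lets the test feed workspace updates.
type fakeClient struct {
	updates  chan codersdk.Workspace      // the test pushes workspace updates here
	statuses chan agentsdk.PatchAppStatus // the test observes reported statuses here
}

func newFakeClient() *fakeClient {
	return &fakeClient{
		updates:  make(chan codersdk.Workspace, 16),
		statuses: make(chan agentsdk.PatchAppStatus, 16),
	}
}

func (f *fakeClient) WatchWorkspace(_ context.Context, _ uuid.UUID) (<-chan codersdk.Workspace, error) {
	return f.updates, nil
}

func (f *fakeClient) PatchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
	select {
	case f.statuses <- req:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// initialize has nothing to configure for the fake.
func (f *fakeClient) initialize(slog.Logger) {}

// Compile-time check that the fake satisfies the interface.
var _ client = (*fakeClient)(nil)
```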
@@ -0,0 +1,69 @@

```go
package taskstatus

import (
	"sync"
	"time"

	"github.com/google/uuid"

	"golang.org/x/xerrors"
)

type Config struct {
	// AgentID is the workspace agent ID to which to connect.
	AgentID uuid.UUID `json:"agent_id"`
	// WorkspaceID is the workspace ID to watch.
	WorkspaceID uuid.UUID `json:"workspace_id"`
	// AppSlug is the slug of the app designated as the AI Agent.
	AppSlug string `json:"app_slug"`
	// When the runner has connected to the watch-ws endpoint, it will call Done once on this wait group. Used to
	// coordinate multiple runners from the higher layer.
	ConnectedWaitGroup *sync.WaitGroup `json:"-"`
	// We read on this channel before starting to report task statuses. Used to coordinate multiple runners from the
	// higher layer.
	StartReporting chan struct{} `json:"-"`
	// Time between reporting task statuses.
	ReportStatusPeriod time.Duration `json:"report_status_period"`
	// Total time to report task statuses, starting from when we successfully read from the StartReporting channel.
	ReportStatusDuration time.Duration `json:"report_status_duration"`

	Metrics           *Metrics `json:"-"`
	MetricLabelValues []string `json:"metric_label_values"`
}

func (c *Config) Validate() error {
	if c.AgentID == uuid.Nil {
		return xerrors.Errorf("validate agent_id: must not be nil")
	}
	if c.AppSlug == "" {
		return xerrors.Errorf("validate app_slug: must not be empty")
	}
	if c.ConnectedWaitGroup == nil {
		return xerrors.Errorf("validate connected_wait_group: must not be nil")
	}
	if c.StartReporting == nil {
		return xerrors.Errorf("validate start_reporting: must not be nil")
	}
	if c.ReportStatusPeriod <= 0 {
		return xerrors.Errorf("validate report_status_period: must be greater than zero")
	}
	if c.ReportStatusDuration <= 0 {
		return xerrors.Errorf("validate report_status_duration: must be greater than zero")
	}
	if c.Metrics == nil {
		return xerrors.Errorf("validate metrics: must not be nil")
	}
	return nil
}
```

Review comments on this file:

- On `AgentID`: "Looks unused since task statuses are reported per-workspace."
  - "Good catch. It's actually worse than this though. The API route to report status only works with an Agent Token; a regular API token won't work. So, we need some way to either run this in the workspace or otherwise get ahold of the key."
  - "I don't recall the details of how external workspaces work - could we just register ourselves as an external workspace?"
- On `Validate()`: "Looks unused."
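ConnectedWaitGroup and StartReporting exist so a higher layer can hold every runner at the same barrier: each runner marks the wait group once its watch connection is up, then blocks until the shared channel is released. Below is a rough sketch of that orchestration, assuming the package lives at scaletest/taskstatus and that one channel is shared by all runners; none of this orchestration code is part of the PR.

```go
// Hypothetical orchestration sketch, not part of this PR.
package loadgen

import (
	"sync"
	"time"

	"github.com/google/uuid"

	"github.com/coder/coder/v2/scaletest/taskstatus"
)

// buildConfigs prepares one Config per workspace so that all runners connect
// first and then start reporting at the same moment.
func buildConfigs(workspaceIDs, agentIDs []uuid.UUID, metrics *taskstatus.Metrics) ([]taskstatus.Config, *sync.WaitGroup, chan struct{}) {
	var connected sync.WaitGroup
	startReporting := make(chan struct{})

	cfgs := make([]taskstatus.Config, 0, len(workspaceIDs))
	for i, wsID := range workspaceIDs {
		connected.Add(1)
		cfgs = append(cfgs, taskstatus.Config{
			AgentID:              agentIDs[i],
			WorkspaceID:          wsID,
			AppSlug:              "ai-agent", // assumed slug
			ConnectedWaitGroup:   &connected,
			StartReporting:       startReporting,
			ReportStatusPeriod:   5 * time.Second,
			ReportStatusDuration: 5 * time.Minute,
			Metrics:              metrics,
			MetricLabelValues:    []string{wsID.String()},
		})
	}
	return cfgs, &connected, startReporting
}

// Usage, after starting one Runner per config in its own goroutine:
//
//	connected.Wait()      // all watch websockets are connected
//	close(startReporting) // release every runner to start reporting at once
```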
@@ -0,0 +1,36 @@

```go
package taskstatus

import "github.com/prometheus/client_golang/prometheus"

type Metrics struct {
	TaskStatusToWorkspaceUpdateLatencySeconds prometheus.HistogramVec
	MissingStatusUpdatesTotal                 prometheus.CounterVec
	ReportTaskStatusErrorsTotal               prometheus.CounterVec
}

func NewMetrics(reg prometheus.Registerer, labelNames ...string) *Metrics {
	m := &Metrics{
		TaskStatusToWorkspaceUpdateLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "coderd",
			Subsystem: "scaletest",
			Name:      "task_status_to_workspace_update_latency_seconds",
			Help:      "Time in seconds between reporting a task status and receiving the workspace update.",
		}, labelNames),
		MissingStatusUpdatesTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "coderd",
			Subsystem: "scaletest",
			Name:      "missing_status_updates_total",
			Help:      "Total number of missing status updates.",
		}, labelNames),
		ReportTaskStatusErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: "coderd",
			Subsystem: "scaletest",
			Name:      "report_task_status_errors_total",
			Help:      "Total number of errors when reporting task status.",
		}, labelNames),
	}
	reg.MustRegister(m.TaskStatusToWorkspaceUpdateLatencySeconds)
	reg.MustRegister(m.MissingStatusUpdatesTotal)
	reg.MustRegister(m.ReportTaskStatusErrorsTotal)
	return m
}
```
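The label names passed to NewMetrics and the MetricLabelValues on each runner's Config have to agree in count and order, since the runner calls WithLabelValues with them on every observation. A minimal wiring sketch, assuming a single "workspace" label and the scaletest/taskstatus import path (both assumptions, not confirmed by this PR):

```go
// Illustrative wiring only.
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/coder/coder/v2/scaletest/taskstatus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Label names are fixed here; every runner must pass values in the same
	// order via Config.MetricLabelValues (e.g. []string{"my-workspace"}).
	metrics := taskstatus.NewMetrics(reg, "workspace")
	_ = metrics // passed into each runner's Config

	// Expose the scaletest metrics for scraping.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}
```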
@@ -0,0 +1,190 @@

```go
package taskstatus

import (
	"context"
	"io"
	"strconv"
	"strings"
	"sync"
	"time"

	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/sloghuman"

	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
	"github.com/coder/coder/v2/scaletest/harness"
	"github.com/coder/coder/v2/scaletest/loadtestutil"
	"github.com/coder/quartz"
)

const statusUpdatePrefix = "scaletest status update:"

type Runner struct {
	client client
	cfg    Config
	logger slog.Logger

	mu            sync.Mutex
	reportTimes   map[int]time.Time
	doneReporting bool

	// testing only
	clock quartz.Clock
}

var _ harness.Runnable = &Runner{}

// NewRunner creates a new Runner with the provided codersdk.Client and configuration.
func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner {
	return &Runner{
		client:      newClient(coderClient),
		cfg:         cfg,
		clock:       quartz.NewReal(),
		reportTimes: make(map[int]time.Time),
	}
}

func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
	logs = loadtestutil.NewSyncWriter(logs)
	r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
	r.client.initialize(r.logger)

	// Ensure these labels are initialized, so we see the time series right away in Prometheus.
	r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
	r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)

	workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx)
	defer cancelWorkspaceUpdates()
	workspaceUpdatesResult := make(chan error, 1)
	go func() {
		workspaceUpdatesResult <- r.watchWorkspaceUpdates(workspaceUpdatesCtx)
	}()

	err := r.reportTaskStatus(ctx)
	if err != nil {
		return xerrors.Errorf("report task status: %w", err)
	}

	err = <-workspaceUpdatesResult
	if err != nil {
		return xerrors.Errorf("watch workspace: %w", err)
	}
	return nil
}

func (r *Runner) watchWorkspaceUpdates(ctx context.Context) error {
	updates, err := r.client.WatchWorkspace(ctx, r.cfg.WorkspaceID)
	if err != nil {
		return xerrors.Errorf("watch workspace: %w", err)
	}
	r.cfg.ConnectedWaitGroup.Done()
	defer func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.cfg.Metrics.MissingStatusUpdatesTotal.
			WithLabelValues(r.cfg.MetricLabelValues...).
			Add(float64(len(r.reportTimes)))
	}()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case workspace := <-updates:
			if workspace.LatestAppStatus == nil {
				continue
			}
			msgNo, ok := parseStatusMessage(workspace.LatestAppStatus.Message)
			if !ok {
				continue
			}
			r.mu.Lock()
			reportTime, ok := r.reportTimes[msgNo]
			delete(r.reportTimes, msgNo)
			allDone := r.doneReporting && len(r.reportTimes) == 0
			r.mu.Unlock()
			if !ok {
				return xerrors.Errorf("report time not found for message %d", msgNo)
			}
			latency := r.clock.Since(reportTime, "watchWorkspaceUpdates")
			r.cfg.Metrics.TaskStatusToWorkspaceUpdateLatencySeconds.
				WithLabelValues(r.cfg.MetricLabelValues...).
				Observe(latency.Seconds())
			if allDone {
				return nil
			}
		}
	}
}

func (r *Runner) reportTaskStatus(ctx context.Context) error {
	defer func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.doneReporting = true
	}()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-r.cfg.StartReporting:
		r.logger.Info(ctx, "starting to report task status")
	}
	startedReporting := r.clock.Now("reportTaskStatus", "startedReporting")
	msgNo := 0
	done := xerrors.New("done reporting task status") // sentinel error
	waiter := r.clock.TickerFunc(ctx, r.cfg.ReportStatusPeriod, func() error {
		r.mu.Lock()
		now := r.clock.Now("reportTaskStatus", "tick")
		r.reportTimes[msgNo] = now
		// It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates
		// goroutine needs an update to wake up and check whether we're done. We could introduce a secondary
		// signaling channel, but it adds a lot of complexity and will be hard to test. We expect the tick period
		// to be much smaller than the report status duration, so one extra tick is not a big deal.
		if now.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
			r.doneReporting = true
		}
		r.mu.Unlock()
		err := r.client.PatchAppStatus(ctx, agentsdk.PatchAppStatus{
			AppSlug: r.cfg.AppSlug,
			Message: statusUpdatePrefix + strconv.Itoa(msgNo),
			State:   codersdk.WorkspaceAppStatusStateWorking,
			URI:     "https://example.com/example-status/",
		})
		if err != nil {
			r.logger.Error(ctx, "failed to report task status", slog.Error(err))
			r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
		}
		msgNo++
		// Note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that
		// sets it.
		if r.doneReporting {
			return done // causes the ticker to exit due to the sentinel error
		}
		return nil
	}, "reportTaskStatus")
	err := waiter.Wait()
	if xerrors.Is(err, done) {
		return nil
	}
	return err
}

func parseStatusMessage(message string) (int, bool) {
	if !strings.HasPrefix(message, statusUpdatePrefix) {
		return 0, false
	}
	message = strings.TrimPrefix(message, statusUpdatePrefix)
	msgNo, err := strconv.Atoi(message)
	if err != nil {
		return 0, false
	}
	return msgNo, true
}
```

Review comment on lines +133 to +136 (the `StartReporting` select), from a thread whose opening comment did not survive extraction:

- "The Run Sequence calls for waiting to establish a baseline after all updates websockets are connected before we start sending status reports. That's the purpose of this."
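Correlation between a report and its workspace update relies only on the message text: the runner writes statusUpdatePrefix followed by a sequence number, and parseStatusMessage rejects anything else, so unrelated app statuses are skipped. A standalone illustration of that round trip; the function is copied here for readability, while the real one lives in this package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const statusUpdatePrefix = "scaletest status update:"

// parseStatusMessage mirrors the package function: it accepts only messages
// produced by this runner and extracts the sequence number.
func parseStatusMessage(message string) (int, bool) {
	if !strings.HasPrefix(message, statusUpdatePrefix) {
		return 0, false
	}
	msgNo, err := strconv.Atoi(strings.TrimPrefix(message, statusUpdatePrefix))
	if err != nil {
		return 0, false
	}
	return msgNo, true
}

func main() {
	// What the runner sends on each tick.
	msg := statusUpdatePrefix + strconv.Itoa(42) // "scaletest status update:42"

	// What the watch loop does with an incoming app status message.
	if n, ok := parseStatusMessage(msg); ok {
		fmt.Println("matched report number", n) // matched report number 42
	}

	// Statuses written by anything else are skipped, so the runner tolerates
	// other apps updating their status on the same workspace.
	_, ok := parseStatusMessage("agent finished task")
	fmt.Println(ok) // false
}
```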