Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit05f8f67

Browse files
authored
feat(scaletest): add runner for notifications delivery (#20091)
Relates tocoder/internal#910This PR adds a scaletest runner that simulates users receiving notifications through WebSocket connections.An instance of this notification runner does the following:1. Creates a user (optionally with specific roles like owner).2. Connects to /api/v2/notifications/inbox/watch via WebSocket to receive notifications in real-time.3. Waits for all other concurrently executing runners (per the DialBarrier WaitGroup) to also connect their websockets.4. For receiving users: Watches the WebSocket for expected notifications and records delivery latency for each notification type.5. For regular users: Maintains WebSocket connections to simulate concurrent load while receiving users wait for notifications.6. Waits on the ReceivingWatchBarrier to coordinate between receiving and regular users.7. Cleans up the created user after the test completes.Exposes three prometheus metrics:1. notification_delivery_latency_seconds - HistogramVec. Labels = {username, notification_type}2. notification_delivery_errors_total - CounterVec. Labels = {username, action}3. notification_delivery_missed_total - CounterVec. Labels = {username}The runner measures end-to-end notification latency from when a notification-triggering event occurs (e.g., user creation/deletion) to when the notification is received by a WebSocket client.
1 parent156f985 commit05f8f67

File tree

4 files changed

+613
-0
lines changed

4 files changed

+613
-0
lines changed

‎scaletest/notifications/config.go‎

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package notifications
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"golang.org/x/xerrors"
8+
9+
"github.com/google/uuid"
10+
11+
"github.com/coder/coder/v2/scaletest/createusers"
12+
)
13+
14+
typeConfigstruct {
15+
// User is the configuration for the user to create.
16+
User createusers.Config`json:"user"`
17+
18+
// Roles are the roles to assign to the user.
19+
Roles []string`json:"roles"`
20+
21+
// NotificationTimeout is how long to wait for notifications after triggering.
22+
NotificationTimeout time.Duration`json:"notification_timeout"`
23+
24+
// DialTimeout is how long to wait for websocket connection.
25+
DialTimeout time.Duration`json:"dial_timeout"`
26+
27+
// ExpectedNotifications maps notification template IDs to channels
28+
// that receive the trigger time for each notification.
29+
ExpectedNotificationsmap[uuid.UUID]chan time.Time`json:"-"`
30+
31+
Metrics*Metrics`json:"-"`
32+
33+
// DialBarrier ensures all runners are connected before notifications are triggered.
34+
DialBarrier*sync.WaitGroup`json:"-"`
35+
36+
// ReceivingWatchBarrier is the barrier for receiving users. Regular users wait on this to disconnect after receiving users complete.
37+
ReceivingWatchBarrier*sync.WaitGroup`json:"-"`
38+
}
39+
40+
func (cConfig)Validate()error {
41+
// The runner always needs an org; ensure we propagate it into the user config.
42+
ifc.User.OrganizationID==uuid.Nil {
43+
returnxerrors.New("user organization_id must be set")
44+
}
45+
46+
iferr:=c.User.Validate();err!=nil {
47+
returnxerrors.Errorf("user config: %w",err)
48+
}
49+
50+
ifc.DialBarrier==nil {
51+
returnxerrors.New("dial barrier must be set")
52+
}
53+
54+
ifc.ReceivingWatchBarrier==nil {
55+
returnxerrors.New("receiving_watch_barrier must be set")
56+
}
57+
58+
ifc.NotificationTimeout<=0 {
59+
returnxerrors.New("notification_timeout must be greater than 0")
60+
}
61+
62+
ifc.DialTimeout<=0 {
63+
returnxerrors.New("dial_timeout must be greater than 0")
64+
}
65+
66+
ifc.Metrics==nil {
67+
returnxerrors.New("metrics must be set")
68+
}
69+
70+
returnnil
71+
}

‎scaletest/notifications/metrics.go‎

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package notifications
2+
3+
import (
4+
"time"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
typeMetricsstruct {
10+
notificationLatency*prometheus.HistogramVec
11+
notificationErrors*prometheus.CounterVec
12+
missedNotifications*prometheus.CounterVec
13+
}
14+
15+
funcNewMetrics(reg prometheus.Registerer)*Metrics {
16+
ifreg==nil {
17+
reg=prometheus.DefaultRegisterer
18+
}
19+
20+
latency:=prometheus.NewHistogramVec(prometheus.HistogramOpts{
21+
Namespace:"coderd",
22+
Subsystem:"scaletest",
23+
Name:"notification_delivery_latency_seconds",
24+
Help:"Time between notification-creating action and receipt of notification by client",
25+
}, []string{"username","notification_type"})
26+
errors:=prometheus.NewCounterVec(prometheus.CounterOpts{
27+
Namespace:"coderd",
28+
Subsystem:"scaletest",
29+
Name:"notification_delivery_errors_total",
30+
Help:"Total number of notification delivery errors",
31+
}, []string{"username","action"})
32+
missed:=prometheus.NewCounterVec(prometheus.CounterOpts{
33+
Namespace:"coderd",
34+
Subsystem:"scaletest",
35+
Name:"notification_delivery_missed_total",
36+
Help:"Total number of missed notifications",
37+
}, []string{"username"})
38+
39+
reg.MustRegister(latency,errors,missed)
40+
41+
return&Metrics{
42+
notificationLatency:latency,
43+
notificationErrors:errors,
44+
missedNotifications:missed,
45+
}
46+
}
47+
48+
func (m*Metrics)RecordLatency(latency time.Duration,username,notificationTypestring) {
49+
m.notificationLatency.WithLabelValues(username,notificationType).Observe(latency.Seconds())
50+
}
51+
52+
func (m*Metrics)AddError(username,actionstring) {
53+
m.notificationErrors.WithLabelValues(username,action).Inc()
54+
}
55+
56+
func (m*Metrics)RecordMissed(usernamestring) {
57+
m.missedNotifications.WithLabelValues(username).Inc()
58+
}

‎scaletest/notifications/run.go‎

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"golang.org/x/xerrors"
13+
14+
"cdr.dev/slog"
15+
"cdr.dev/slog/sloggers/sloghuman"
16+
17+
"github.com/coder/coder/v2/coderd/tracing"
18+
"github.com/coder/coder/v2/codersdk"
19+
"github.com/coder/coder/v2/scaletest/createusers"
20+
"github.com/coder/coder/v2/scaletest/harness"
21+
"github.com/coder/coder/v2/scaletest/loadtestutil"
22+
"github.com/coder/websocket"
23+
)
24+
25+
typeRunnerstruct {
26+
client*codersdk.Client
27+
cfgConfig
28+
29+
createUserRunner*createusers.Runner
30+
31+
// notificationLatencies stores the latency for each notification type
32+
notificationLatenciesmap[uuid.UUID]time.Duration
33+
}
34+
35+
funcNewRunner(client*codersdk.Client,cfgConfig)*Runner {
36+
return&Runner{
37+
client:client,
38+
cfg:cfg,
39+
notificationLatencies:make(map[uuid.UUID]time.Duration),
40+
}
41+
}
42+
43+
var (
44+
_ harness.Runnable=&Runner{}
45+
_ harness.Cleanable=&Runner{}
46+
_ harness.Collectable=&Runner{}
47+
)
48+
49+
func (r*Runner)Run(ctx context.Context,idstring,logs io.Writer)error {
50+
ctx,span:=tracing.StartSpan(ctx)
51+
deferspan.End()
52+
53+
reachedBarrier:=false
54+
deferfunc() {
55+
if!reachedBarrier {
56+
r.cfg.DialBarrier.Done()
57+
}
58+
}()
59+
60+
reachedReceivingWatchBarrier:=false
61+
deferfunc() {
62+
iflen(r.cfg.ExpectedNotifications)>0&&!reachedReceivingWatchBarrier {
63+
r.cfg.ReceivingWatchBarrier.Done()
64+
}
65+
}()
66+
67+
logs=loadtestutil.NewSyncWriter(logs)
68+
logger:=slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
69+
r.client.SetLogger(logger)
70+
r.client.SetLogBodies(true)
71+
72+
r.createUserRunner=createusers.NewRunner(r.client,r.cfg.User)
73+
newUserAndToken,err:=r.createUserRunner.RunReturningUser(ctx,id,logs)
74+
iferr!=nil {
75+
r.cfg.Metrics.AddError("","create_user")
76+
returnxerrors.Errorf("create user: %w",err)
77+
}
78+
newUser:=newUserAndToken.User
79+
newUserClient:=codersdk.New(r.client.URL,
80+
codersdk.WithSessionToken(newUserAndToken.SessionToken),
81+
codersdk.WithLogger(logger),
82+
codersdk.WithLogBodies())
83+
84+
logger.Info(ctx,"runner user created",slog.F("username",newUser.Username),slog.F("user_id",newUser.ID.String()))
85+
86+
iflen(r.cfg.Roles)>0 {
87+
logger.Info(ctx,"assigning roles to user",slog.F("roles",r.cfg.Roles))
88+
89+
_,err:=r.client.UpdateUserRoles(ctx,newUser.ID.String(), codersdk.UpdateRoles{
90+
Roles:r.cfg.Roles,
91+
})
92+
iferr!=nil {
93+
r.cfg.Metrics.AddError(newUser.Username,"assign_roles")
94+
returnxerrors.Errorf("assign roles: %w",err)
95+
}
96+
}
97+
98+
logger.Info(ctx,"notification runner is ready")
99+
100+
dialCtx,cancel:=context.WithTimeout(ctx,r.cfg.DialTimeout)
101+
defercancel()
102+
103+
logger.Info(ctx,"connecting to notification websocket")
104+
conn,err:=r.dialNotificationWebsocket(dialCtx,newUserClient,newUser,logger)
105+
iferr!=nil {
106+
returnxerrors.Errorf("dial notification websocket: %w",err)
107+
}
108+
deferconn.Close(websocket.StatusNormalClosure,"done")
109+
logger.Info(ctx,"connected to notification websocket")
110+
111+
reachedBarrier=true
112+
r.cfg.DialBarrier.Done()
113+
r.cfg.DialBarrier.Wait()
114+
115+
iflen(r.cfg.ExpectedNotifications)==0 {
116+
logger.Info(ctx,"maintaining websocket connection, waiting for receiving users to complete")
117+
118+
// Wait for receiving users to complete
119+
done:=make(chanstruct{})
120+
gofunc() {
121+
r.cfg.ReceivingWatchBarrier.Wait()
122+
close(done)
123+
}()
124+
125+
select {
126+
case<-done:
127+
logger.Info(ctx,"receiving users complete, closing connection")
128+
case<-ctx.Done():
129+
logger.Info(ctx,"context canceled, closing connection")
130+
}
131+
returnnil
132+
}
133+
134+
logger.Info(ctx,"waiting for notifications",slog.F("timeout",r.cfg.NotificationTimeout))
135+
136+
watchCtx,cancel:=context.WithTimeout(ctx,r.cfg.NotificationTimeout)
137+
defercancel()
138+
139+
iferr:=r.watchNotifications(watchCtx,conn,newUser,logger,r.cfg.ExpectedNotifications);err!=nil {
140+
returnxerrors.Errorf("notification watch failed: %w",err)
141+
}
142+
143+
reachedReceivingWatchBarrier=true
144+
r.cfg.ReceivingWatchBarrier.Done()
145+
146+
returnnil
147+
}
148+
149+
func (r*Runner)Cleanup(ctx context.Context,idstring,logs io.Writer)error {
150+
ifr.createUserRunner!=nil {
151+
_,_=fmt.Fprintln(logs,"Cleaning up user...")
152+
iferr:=r.createUserRunner.Cleanup(ctx,id,logs);err!=nil {
153+
returnxerrors.Errorf("cleanup user: %w",err)
154+
}
155+
}
156+
157+
returnnil
158+
}
159+
160+
constNotificationDeliveryLatencyMetric="notification_delivery_latency_seconds"
161+
162+
func (r*Runner)GetMetrics()map[string]any {
163+
returnmap[string]any{
164+
NotificationDeliveryLatencyMetric:r.notificationLatencies,
165+
}
166+
}
167+
168+
func (r*Runner)dialNotificationWebsocket(ctx context.Context,client*codersdk.Client,user codersdk.User,logger slog.Logger) (*websocket.Conn,error) {
169+
u,err:=client.URL.Parse("/api/v2/notifications/inbox/watch")
170+
iferr!=nil {
171+
logger.Error(ctx,"parse notification URL",slog.Error(err))
172+
r.cfg.Metrics.AddError(user.Username,"parse_url")
173+
returnnil,xerrors.Errorf("parse notification URL: %w",err)
174+
}
175+
176+
conn,resp,err:=websocket.Dial(ctx,u.String(),&websocket.DialOptions{
177+
HTTPHeader: http.Header{
178+
"Coder-Session-Token": []string{client.SessionToken()},
179+
},
180+
})
181+
iferr!=nil {
182+
ifresp!=nil {
183+
deferresp.Body.Close()
184+
ifresp.StatusCode!=http.StatusSwitchingProtocols {
185+
err=codersdk.ReadBodyAsError(resp)
186+
}
187+
}
188+
logger.Error(ctx,"dial notification websocket",slog.Error(err))
189+
r.cfg.Metrics.AddError(user.Username,"dial")
190+
returnnil,xerrors.Errorf("dial notification websocket: %w",err)
191+
}
192+
193+
returnconn,nil
194+
}
195+
196+
// watchNotifications reads notifications from the websocket and returns error or nil
197+
// once all expected notifications are received.
198+
func (r*Runner)watchNotifications(ctx context.Context,conn*websocket.Conn,user codersdk.User,logger slog.Logger,expectedNotificationsmap[uuid.UUID]chan time.Time)error {
199+
logger.Info(ctx,"waiting for notifications",
200+
slog.F("username",user.Username),
201+
slog.F("expected_count",len(expectedNotifications)))
202+
203+
receivedNotifications:=make(map[uuid.UUID]struct{})
204+
205+
for {
206+
select {
207+
case<-ctx.Done():
208+
returnxerrors.Errorf("context canceled while waiting for notifications: %w",ctx.Err())
209+
default:
210+
}
211+
212+
iflen(receivedNotifications)==len(expectedNotifications) {
213+
logger.Info(ctx,"received all expected notifications")
214+
returnnil
215+
}
216+
217+
notif,err:=readNotification(ctx,conn)
218+
iferr!=nil {
219+
logger.Error(ctx,"read notification",slog.Error(err))
220+
r.cfg.Metrics.AddError(user.Username,"read_notification")
221+
returnxerrors.Errorf("read notification: %w",err)
222+
}
223+
224+
templateID:=notif.Notification.TemplateID
225+
iftriggerTimeChan,exists:=expectedNotifications[templateID];exists {
226+
if_,exists:=receivedNotifications[templateID];!exists {
227+
receiptTime:=time.Now()
228+
select {
229+
casetriggerTime:=<-triggerTimeChan:
230+
latency:=receiptTime.Sub(triggerTime)
231+
r.notificationLatencies[templateID]=latency
232+
r.cfg.Metrics.RecordLatency(latency,user.Username,templateID.String())
233+
receivedNotifications[templateID]=struct{}{}
234+
235+
logger.Info(ctx,"received expected notification",
236+
slog.F("template_id",templateID),
237+
slog.F("title",notif.Notification.Title),
238+
slog.F("latency",latency))
239+
case<-ctx.Done():
240+
returnxerrors.Errorf("context canceled while waiting for trigger time: %w",ctx.Err())
241+
}
242+
}
243+
}else {
244+
logger.Debug(ctx,"received notification not being tested",
245+
slog.F("template_id",templateID),
246+
slog.F("title",notif.Notification.Title))
247+
}
248+
}
249+
}
250+
251+
funcreadNotification(ctx context.Context,conn*websocket.Conn) (codersdk.GetInboxNotificationResponse,error) {
252+
_,message,err:=conn.Read(ctx)
253+
iferr!=nil {
254+
return codersdk.GetInboxNotificationResponse{},err
255+
}
256+
257+
varnotif codersdk.GetInboxNotificationResponse
258+
iferr:=json.Unmarshal(message,&notif);err!=nil {
259+
return codersdk.GetInboxNotificationResponse{},xerrors.Errorf("unmarshal notification: %w",err)
260+
}
261+
262+
returnnotif,nil
263+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp