Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1e2df4d

Browse files
committed
Add notification runner to test delivery latency
1 parent98262d8 commit1e2df4d

File tree

4 files changed

+530
-0
lines changed

4 files changed

+530
-0
lines changed

‎scaletest/notifications/config.go‎

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package notifications
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"golang.org/x/xerrors"
8+
9+
"github.com/google/uuid"
10+
11+
"github.com/coder/coder/v2/scaletest/createusers"
12+
)
13+
14+
typeConfigstruct {
15+
// User is the configuration for the user to create.
16+
User createusers.Config`json:"user"`
17+
18+
// IsOwner indicates if this user should be assigned Owner role.
19+
// If true, the user will receive notifications.
20+
IsOwnerbool`json:"is_owner"`
21+
22+
// NotificationTimeout is how long to wait for notifications after triggering.
23+
NotificationTimeout time.Duration`json:"notification_timeout"`
24+
25+
// DialTimeout is how long to wait for websocket connection.
26+
DialTimeout time.Duration`json:"dial_timeout"`
27+
28+
Metrics*Metrics`json:"-"`
29+
30+
// DialBarrier ensures all runners are connected before notifications are triggered.
31+
DialBarrier*sync.WaitGroup`json:"-"`
32+
}
33+
34+
func (cConfig)Validate()error {
35+
// The runner always needs an org; ensure we propagate it into the user config.
36+
ifc.User.OrganizationID==uuid.Nil {
37+
returnxerrors.New("user organization_id must be set")
38+
}
39+
40+
iferr:=c.User.Validate();err!=nil {
41+
returnxerrors.Errorf("user config: %w",err)
42+
}
43+
44+
ifc.DialBarrier==nil {
45+
returnxerrors.New("dial barrier must be set")
46+
}
47+
48+
ifc.NotificationTimeout<=0 {
49+
returnxerrors.New("notification_timeout must be greater than 0")
50+
}
51+
52+
ifc.DialTimeout<=0 {
53+
returnxerrors.New("dial_timeout must be greater than 0")
54+
}
55+
56+
ifc.Metrics==nil {
57+
returnxerrors.New("metrics must be set")
58+
}
59+
60+
returnnil
61+
}

‎scaletest/notifications/metrics.go‎

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package notifications
2+
3+
import (
4+
"time"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
typeMetricsstruct {
10+
notificationLatency*prometheus.HistogramVec
11+
notificationErrors*prometheus.CounterVec
12+
missedNotifications*prometheus.CounterVec
13+
}
14+
15+
funcNewMetrics(reg prometheus.Registerer)*Metrics {
16+
ifreg==nil {
17+
reg=prometheus.DefaultRegisterer
18+
}
19+
20+
latency:=prometheus.NewHistogramVec(prometheus.HistogramOpts{
21+
Namespace:"coderd",
22+
Subsystem:"scaletest",
23+
Name:"notification_delivery_latency_seconds",
24+
Help:"Time between test trigger and receipt of notification by client",
25+
}, []string{"username","notification_type"})
26+
errors:=prometheus.NewCounterVec(prometheus.CounterOpts{
27+
Namespace:"coderd",
28+
Subsystem:"scaletest",
29+
Name:"notification_delivery_errors_total",
30+
Help:"Total number of notification delivery errors",
31+
}, []string{"username","action"})
32+
missed:=prometheus.NewCounterVec(prometheus.CounterOpts{
33+
Namespace:"coderd",
34+
Subsystem:"scaletest",
35+
Name:"notification_delivery_missed_total",
36+
Help:"Total number of missed notifications",
37+
}, []string{"username"})
38+
39+
reg.MustRegister(latency,errors,missed)
40+
41+
return&Metrics{
42+
notificationLatency:latency,
43+
notificationErrors:errors,
44+
missedNotifications:missed,
45+
}
46+
}
47+
48+
func (m*Metrics)RecordLatency(latency time.Duration,username,notificationTypestring) {
49+
m.notificationLatency.WithLabelValues(username,notificationType).Observe(latency.Seconds())
50+
}
51+
52+
func (m*Metrics)AddError(username,actionstring) {
53+
m.notificationErrors.WithLabelValues(username,action).Inc()
54+
}
55+
56+
func (m*Metrics)RecordMissed(usernamestring) {
57+
m.missedNotifications.WithLabelValues(username).Inc()
58+
}

‎scaletest/notifications/run.go‎

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"time"
10+
11+
"golang.org/x/xerrors"
12+
13+
"cdr.dev/slog"
14+
"cdr.dev/slog/sloggers/sloghuman"
15+
16+
"github.com/coder/coder/v2/coderd/notifications"
17+
"github.com/coder/coder/v2/coderd/tracing"
18+
"github.com/coder/coder/v2/codersdk"
19+
"github.com/coder/coder/v2/scaletest/createusers"
20+
"github.com/coder/coder/v2/scaletest/harness"
21+
"github.com/coder/coder/v2/scaletest/loadtestutil"
22+
"github.com/coder/websocket"
23+
)
24+
25+
typeRunnerstruct {
26+
client*codersdk.Client
27+
cfgConfig
28+
29+
createUserRunner*createusers.Runner
30+
31+
userCreatedNotificationLatency time.Duration
32+
userDeletedNotificationLatency time.Duration
33+
}
34+
35+
funcNewRunner(client*codersdk.Client,cfgConfig)*Runner {
36+
return&Runner{
37+
client:client,
38+
cfg:cfg,
39+
}
40+
}
41+
42+
var (
43+
_ harness.Runnable=&Runner{}
44+
_ harness.Cleanable=&Runner{}
45+
_ harness.Collectable=&Runner{}
46+
)
47+
48+
func (r*Runner)Run(ctx context.Context,idstring,logs io.Writer)error {
49+
ctx,span:=tracing.StartSpan(ctx)
50+
deferspan.End()
51+
52+
reachedBarrier:=false
53+
deferfunc() {
54+
if!reachedBarrier {
55+
r.cfg.DialBarrier.Done()
56+
}
57+
}()
58+
59+
logs=loadtestutil.NewSyncWriter(logs)
60+
logger:=slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug)
61+
r.client.SetLogger(logger)
62+
r.client.SetLogBodies(true)
63+
64+
r.createUserRunner=createusers.NewRunner(r.client,r.cfg.User)
65+
newUserAndToken,err:=r.createUserRunner.RunReturningUser(ctx,id,logs)
66+
iferr!=nil {
67+
r.cfg.Metrics.AddError("","create_user")
68+
returnxerrors.Errorf("create user: %w",err)
69+
}
70+
newUser:=newUserAndToken.User
71+
newUserClient:=codersdk.New(r.client.URL,
72+
codersdk.WithSessionToken(newUserAndToken.SessionToken),
73+
codersdk.WithLogger(logger),
74+
codersdk.WithLogBodies())
75+
76+
logger.Info(ctx,fmt.Sprintf("user %q created",newUser.Username),slog.F("id",newUser.ID.String()))
77+
78+
ifr.cfg.IsOwner {
79+
logger.Info(ctx,"assigning Owner role to user")
80+
81+
_,err:=r.client.UpdateUserRoles(ctx,newUser.ID.String(), codersdk.UpdateRoles{
82+
Roles: []string{codersdk.RoleOwner},
83+
})
84+
iferr!=nil {
85+
r.cfg.Metrics.AddError(newUser.Username,"assign_owner_role")
86+
returnxerrors.Errorf("assign owner role: %w",err)
87+
}
88+
}
89+
90+
logger.Info(ctx,"notification runner is ready")
91+
92+
// We don't need to wait for notifications since we're not an owner
93+
if!r.cfg.IsOwner {
94+
reachedBarrier=true
95+
r.cfg.DialBarrier.Done()
96+
returnnil
97+
}
98+
99+
dialCtx,cancel:=context.WithTimeout(ctx,r.cfg.DialTimeout)
100+
defercancel()
101+
102+
logger.Info(ctx,"connecting to notification websocket")
103+
conn,err:=r.dialNotificationWebsocket(dialCtx,newUserClient,newUser,logger)
104+
iferr!=nil {
105+
returnxerrors.Errorf("dial notification websocket: %w",err)
106+
}
107+
deferconn.Close(websocket.StatusNormalClosure,"done")
108+
logger.Info(ctx,"connected to notification websocket")
109+
110+
reachedBarrier=true
111+
r.cfg.DialBarrier.Done()
112+
r.cfg.DialBarrier.Wait()
113+
114+
logger.Info(ctx,fmt.Sprintf("waiting up to %v for notifications...",r.cfg.NotificationTimeout))
115+
116+
watchCtx,cancel:=context.WithTimeout(ctx,r.cfg.NotificationTimeout)
117+
defercancel()
118+
119+
iferr:=r.watchNotifications(watchCtx,conn,newUser,logger);err!=nil {
120+
returnxerrors.Errorf("notification watch failed: %w",err)
121+
}
122+
123+
returnnil
124+
}
125+
126+
func (r*Runner)Cleanup(ctx context.Context,idstring,logs io.Writer)error {
127+
ifr.createUserRunner!=nil {
128+
_,_=fmt.Fprintln(logs,"Cleaning up user...")
129+
iferr:=r.createUserRunner.Cleanup(ctx,id,logs);err!=nil {
130+
returnxerrors.Errorf("cleanup user: %w",err)
131+
}
132+
}
133+
134+
returnnil
135+
}
136+
137+
const (
138+
UserCreatedNotificationLatencyMetric="user_created_notification_latency_seconds"
139+
UserDeletedNotificationLatencyMetric="user_deleted_notification_latency_seconds"
140+
)
141+
142+
func (r*Runner)GetMetrics()map[string]any {
143+
metrics:=map[string]any{}
144+
145+
ifr.userCreatedNotificationLatency>0 {
146+
metrics[UserCreatedNotificationLatencyMetric]=r.userCreatedNotificationLatency.Seconds()
147+
}
148+
149+
ifr.userDeletedNotificationLatency>0 {
150+
metrics[UserDeletedNotificationLatencyMetric]=r.userDeletedNotificationLatency.Seconds()
151+
}
152+
153+
returnmetrics
154+
}
155+
156+
func (r*Runner)dialNotificationWebsocket(ctx context.Context,client*codersdk.Client,user codersdk.User,logger slog.Logger) (*websocket.Conn,error) {
157+
u,err:=client.URL.Parse("/api/v2/notifications/inbox/watch")
158+
iferr!=nil {
159+
logger.Error(ctx,"parse notification URL",slog.Error(err))
160+
r.cfg.Metrics.AddError(user.Username,"parse_url")
161+
returnnil,xerrors.Errorf("parse notification URL: %w",err)
162+
}
163+
164+
conn,resp,err:=websocket.Dial(ctx,u.String(),&websocket.DialOptions{
165+
HTTPHeader: http.Header{
166+
"Coder-Session-Token": []string{client.SessionToken()},
167+
},
168+
})
169+
iferr!=nil {
170+
ifresp!=nil {
171+
deferresp.Body.Close()
172+
ifresp.StatusCode!=http.StatusSwitchingProtocols {
173+
err=codersdk.ReadBodyAsError(resp)
174+
}
175+
}
176+
logger.Error(ctx,"dial notification websocket",slog.Error(err))
177+
r.cfg.Metrics.AddError(user.Username,"dial")
178+
returnnil,xerrors.Errorf("dial notification websocket: %w",err)
179+
}
180+
181+
returnconn,nil
182+
}
183+
184+
// watchNotifications reads notifications from the websockert and returns error or nil
185+
// once both expected notifications are received.
186+
func (r*Runner)watchNotifications(ctx context.Context,conn*websocket.Conn,user codersdk.User,logger slog.Logger)error {
187+
notificationStartTime:=time.Now()
188+
logger.Info(ctx,"waiting for notifications",slog.F("username",user.Username))
189+
190+
receivedCreated:=false
191+
receivedDeleted:=false
192+
193+
// Read notifications until we have both expected types
194+
for!receivedCreated||!receivedDeleted {
195+
notif,err:=r.readNotification(ctx,conn)
196+
iferr!=nil {
197+
logger.Error(ctx,"read notification",slog.Error(err))
198+
r.cfg.Metrics.AddError(user.Username,"read_notification")
199+
returnxerrors.Errorf("read notification: %w",err)
200+
}
201+
202+
switchnotif.Notification.TemplateID {
203+
casenotifications.TemplateUserAccountCreated:
204+
if!receivedCreated {
205+
r.userCreatedNotificationLatency=time.Since(notificationStartTime)
206+
r.cfg.Metrics.RecordLatency(r.userCreatedNotificationLatency,user.Username,"user_created")
207+
receivedCreated=true
208+
logger.Info(ctx,"received user created notification")
209+
}
210+
casenotifications.TemplateUserAccountDeleted:
211+
if!receivedDeleted {
212+
r.userDeletedNotificationLatency=time.Since(notificationStartTime)
213+
r.cfg.Metrics.RecordLatency(r.userDeletedNotificationLatency,user.Username,"user_deleted")
214+
receivedDeleted=true
215+
logger.Info(ctx,"received user deleted notification")
216+
}
217+
default:
218+
logger.Warn(ctx,"received unexpected notification type",
219+
slog.F("template_id",notif.Notification.TemplateID),
220+
slog.F("title",notif.Notification.Title))
221+
}
222+
}
223+
logger.Info(ctx,"received both notifications successfully")
224+
returnnil
225+
}
226+
227+
func (*Runner)readNotification(ctx context.Context,conn*websocket.Conn) (codersdk.GetInboxNotificationResponse,error) {
228+
_,message,err:=conn.Read(ctx)
229+
iferr!=nil {
230+
return codersdk.GetInboxNotificationResponse{},err
231+
}
232+
233+
varnotif codersdk.GetInboxNotificationResponse
234+
iferr:=json.Unmarshal(message,&notif);err!=nil {
235+
return codersdk.GetInboxNotificationResponse{},xerrors.Errorf("unmarshal notification: %w",err)
236+
}
237+
238+
returnnotif,nil
239+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp