Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd10c640

Browse files
committed
feat(scaletest): extend notifications runner with smtp support
1 parent20790fc commitd10c640

File tree

5 files changed

+401
-44
lines changed

5 files changed

+401
-44
lines changed

‎cli/exp_scaletest.go‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
19301930
notificationTimeout time.Duration
19311931
dialTimeout time.Duration
19321932
noCleanupbool
1933+
smtpApiUrlstring
19331934

19341935
tracingFlags=&scaletestTracingFlags{}
19351936

@@ -1976,6 +1977,12 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
19761977
returnxerrors.Errorf("--owner-user-percentage must be between 0 and 100")
19771978
}
19781979

1980+
ifsmtpApiUrl!="" {
1981+
if!strings.HasPrefix(smtpApiUrl,"http://")&&!strings.HasPrefix(smtpApiUrl,"https://") {
1982+
returnxerrors.Errorf("smtp_api_url must start with http:// or https://")
1983+
}
1984+
}
1985+
19791986
ownerUserCount:=int64(float64(userCount)*ownerUserPercentage/100)
19801987
ifownerUserCount==0&&ownerUserPercentage>0 {
19811988
ownerUserCount=1
@@ -2040,6 +2047,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
20402047
ReceivingWatchBarrier:ownerWatchBarrier,
20412048
ExpectedNotifications:expectedNotifications,
20422049
Metrics:metrics,
2050+
SMTPApiUrl:smtpApiUrl,
20432051
}
20442052
iferr:=config.Validate();err!=nil {
20452053
returnxerrors.Errorf("validate config: %w",err)
@@ -2057,6 +2065,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
20572065
DialBarrier:dialBarrier,
20582066
ReceivingWatchBarrier:ownerWatchBarrier,
20592067
Metrics:metrics,
2068+
SMTPApiUrl:smtpApiUrl,
20602069
}
20612070
iferr:=config.Validate();err!=nil {
20622071
returnxerrors.Errorf("validate config: %w",err)
@@ -2166,6 +2175,12 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
21662175
Description:"Do not clean up resources after the test completes.",
21672176
Value:serpent.BoolOf(&noCleanup),
21682177
},
2178+
{
2179+
Flag:"smtp-api-url",
2180+
Env:"CODER_SCALETEST_SMTP_API_URL",
2181+
Description:"SMTP mock HTTP API address.",
2182+
Value:serpent.StringOf(&smtpApiUrl),
2183+
},
21692184
}
21702185

21712186
tracingFlags.attach(&cmd.Options)

‎scaletest/notifications/config.go‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ type Config struct {
3535

3636
// ReceivingWatchBarrier is the barrier for receiving users. Regular users wait on this to disconnect after receiving users complete.
3737
ReceivingWatchBarrier*sync.WaitGroup`json:"-"`
38+
39+
// SMTPApiUrl is the URL of the SMTP mock HTTP API
40+
SMTPApiUrlstring`json:"smtp_api_url"`
3841
}
3942

4043
func (cConfig)Validate()error {

‎scaletest/notifications/metrics.go‎

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ import (
66
"github.com/prometheus/client_golang/prometheus"
77
)
88

9+
typeNotificationTypestring
10+
11+
const (
12+
NotificationTypeWebsocketNotificationType="websocket"
13+
NotificationTypeSMTPNotificationType="smtp"
14+
)
15+
916
typeMetricsstruct {
1017
notificationLatency*prometheus.HistogramVec
1118
notificationErrors*prometheus.CounterVec
@@ -22,7 +29,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
2229
Subsystem:"scaletest",
2330
Name:"notification_delivery_latency_seconds",
2431
Help:"Time between notification-creating action and receipt of notification by client",
25-
}, []string{"username","notification_type"})
32+
}, []string{"username","notification_id","notification_type"})
2633
errors:=prometheus.NewCounterVec(prometheus.CounterOpts{
2734
Namespace:"coderd",
2835
Subsystem:"scaletest",
@@ -45,8 +52,8 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
4552
}
4653
}
4754

48-
func (m*Metrics)RecordLatency(latency time.Duration,username,notificationTypestring) {
49-
m.notificationLatency.WithLabelValues(username,notificationType).Observe(latency.Seconds())
55+
func (m*Metrics)RecordLatency(latency time.Duration,username,notificationIDstring,notificationTypeNotificationType) {
56+
m.notificationLatency.WithLabelValues(username,notificationID,string(notificationType)).Observe(latency.Seconds())
5057
}
5158

5259
func (m*Metrics)AddError(username,actionstring) {

‎scaletest/notifications/run.go‎

Lines changed: 150 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9+
"sync"
910
"time"
1011

1112
"github.com/google/uuid"
13+
"golang.org/x/sync/errgroup"
1214
"golang.org/x/xerrors"
1315

1416
"cdr.dev/slog"
@@ -19,6 +21,7 @@ import (
1921
"github.com/coder/coder/v2/scaletest/createusers"
2022
"github.com/coder/coder/v2/scaletest/harness"
2123
"github.com/coder/coder/v2/scaletest/loadtestutil"
24+
"github.com/coder/coder/v2/scaletest/smtpmock"
2225
"github.com/coder/websocket"
2326
)
2427

@@ -28,15 +31,21 @@ type Runner struct {
2831

2932
createUserRunner*createusers.Runner
3033

31-
// notificationLatencies stores the latency for each notification type
32-
notificationLatenciesmap[uuid.UUID]time.Duration
34+
// websocketLatencies stores the latency for websocket notifications
35+
websocketLatenciesmap[uuid.UUID]time.Duration
36+
websocketLatenciesMu sync.RWMutex
37+
38+
// smtpLatencies stores the latency for SMTP notifications
39+
smtpLatenciesmap[uuid.UUID]time.Duration
40+
smtpLatenciesMu sync.RWMutex
3341
}
3442

3543
funcNewRunner(client*codersdk.Client,cfgConfig)*Runner {
3644
return&Runner{
37-
client:client,
38-
cfg:cfg,
39-
notificationLatencies:make(map[uuid.UUID]time.Duration),
45+
client:client,
46+
cfg:cfg,
47+
websocketLatencies:make(map[uuid.UUID]time.Duration),
48+
smtpLatencies:make(map[uuid.UUID]time.Duration),
4049
}
4150
}
4251

@@ -136,7 +145,20 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
136145
watchCtx,cancel:=context.WithTimeout(ctx,r.cfg.NotificationTimeout)
137146
defercancel()
138147

139-
iferr:=r.watchNotifications(watchCtx,conn,newUser,logger,r.cfg.ExpectedNotifications);err!=nil {
148+
eg,egCtx:=errgroup.WithContext(watchCtx)
149+
150+
eg.Go(func()error {
151+
returnr.watchNotifications(egCtx,conn,newUser,logger,r.cfg.ExpectedNotifications)
152+
})
153+
154+
ifr.cfg.SMTPApiUrl!="" {
155+
logger.Info(ctx,"running SMTP notification watcher")
156+
eg.Go(func()error {
157+
returnr.watchNotificationsSMTP(egCtx,newUser,logger,r.cfg.ExpectedNotifications)
158+
})
159+
}
160+
161+
iferr:=eg.Wait();err!=nil {
140162
returnxerrors.Errorf("notification watch failed: %w",err)
141163
}
142164

@@ -157,11 +179,29 @@ func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error {
157179
returnnil
158180
}
159181

160-
constNotificationDeliveryLatencyMetric="notification_delivery_latency_seconds"
182+
const (
183+
WebsocketNotificationLatencyMetric="notification_websocket_latency_seconds"
184+
SMTPNotificationLatencyMetric="notification_smtp_latency_seconds"
185+
)
161186

162187
func (r*Runner)GetMetrics()map[string]any {
188+
r.websocketLatenciesMu.RLock()
189+
websocketLatencies:=make(map[uuid.UUID]time.Duration,len(r.websocketLatencies))
190+
forid,latency:=ranger.websocketLatencies {
191+
websocketLatencies[id]=latency
192+
}
193+
r.websocketLatenciesMu.RUnlock()
194+
195+
r.smtpLatenciesMu.RLock()
196+
smtpLatencies:=make(map[uuid.UUID]time.Duration,len(r.smtpLatencies))
197+
forid,latency:=ranger.smtpLatencies {
198+
smtpLatencies[id]=latency
199+
}
200+
r.smtpLatenciesMu.RUnlock()
201+
163202
returnmap[string]any{
164-
NotificationDeliveryLatencyMetric:r.notificationLatencies,
203+
WebsocketNotificationLatencyMetric:websocketLatencies,
204+
SMTPNotificationLatencyMetric:smtpLatencies,
165205
}
166206
}
167207

@@ -228,8 +268,10 @@ func (r *Runner) watchNotifications(ctx context.Context, conn *websocket.Conn, u
228268
select {
229269
casetriggerTime:=<-triggerTimeChan:
230270
latency:=receiptTime.Sub(triggerTime)
231-
r.notificationLatencies[templateID]=latency
232-
r.cfg.Metrics.RecordLatency(latency,user.Username,templateID.String())
271+
r.websocketLatenciesMu.Lock()
272+
r.websocketLatencies[templateID]=latency
273+
r.websocketLatenciesMu.Unlock()
274+
r.cfg.Metrics.RecordLatency(latency,user.Username,templateID.String(),NotificationTypeWebsocket)
233275
receivedNotifications[templateID]=struct{}{}
234276

235277
logger.Info(ctx,"received expected notification",
@@ -248,6 +290,104 @@ func (r *Runner) watchNotifications(ctx context.Context, conn *websocket.Conn, u
248290
}
249291
}
250292

293+
constsmtpPollInterval=2*time.Second
294+
295+
// watchNotificationsSMTP polls the SMTP HTTP API for notifications and returns error or nil
296+
// once all expected notifications are received.
297+
func (r*Runner)watchNotificationsSMTP(ctx context.Context,user codersdk.User,logger slog.Logger,expectedNotificationsmap[uuid.UUID]chan time.Time)error {
298+
logger.Info(ctx,"polling SMTP API for notifications",
299+
slog.F("username",user.Username),
300+
slog.F("email",user.Email),
301+
slog.F("expected_count",len(expectedNotifications)),
302+
)
303+
304+
receivedNotifications:=make(map[uuid.UUID]struct{})
305+
ticker:=time.NewTicker(smtpPollInterval)
306+
deferticker.Stop()
307+
308+
apiURL:=fmt.Sprintf("%s/messages?email=%s",r.cfg.SMTPApiUrl,user.Email)
309+
httpClient:=&http.Client{
310+
Timeout:10*time.Second,
311+
}
312+
313+
for {
314+
select {
315+
case<-ctx.Done():
316+
returnxerrors.Errorf("context canceled while waiting for notifications: %w",ctx.Err())
317+
case<-ticker.C:
318+
iflen(receivedNotifications)==len(expectedNotifications) {
319+
logger.Info(ctx,"received all expected notifications via SMTP")
320+
returnnil
321+
}
322+
323+
req,err:=http.NewRequestWithContext(ctx,http.MethodGet,apiURL,nil)
324+
iferr!=nil {
325+
logger.Error(ctx,"create SMTP API request",slog.Error(err))
326+
r.cfg.Metrics.AddError(user.Username,"smtp_create_request")
327+
returnxerrors.Errorf("create SMTP API request: %w",err)
328+
}
329+
330+
resp,err:=httpClient.Do(req)
331+
iferr!=nil {
332+
logger.Error(ctx,"poll smtp api for notifications",slog.Error(err))
333+
r.cfg.Metrics.AddError(user.Username,"smtp_poll")
334+
returnxerrors.Errorf("poll smtp api: %w",err)
335+
}
336+
337+
ifresp.StatusCode!=http.StatusOK {
338+
resp.Body.Close()
339+
logger.Error(ctx,"smtp api returned non-200 status",slog.F("status",resp.StatusCode))
340+
r.cfg.Metrics.AddError(user.Username,"smtp_bad_status")
341+
returnxerrors.Errorf("smtp api returned status %d",resp.StatusCode)
342+
}
343+
344+
varsummaries []smtpmock.EmailSummary
345+
iferr:=json.NewDecoder(resp.Body).Decode(&summaries);err!=nil {
346+
resp.Body.Close()
347+
logger.Error(ctx,"decode smtp api response",slog.Error(err))
348+
r.cfg.Metrics.AddError(user.Username,"smtp_decode")
349+
returnxerrors.Errorf("decode smtp api response: %w",err)
350+
}
351+
resp.Body.Close()
352+
353+
// Process each email summary
354+
for_,summary:=rangesummaries {
355+
notificationID:=summary.NotificationID
356+
ifnotificationID==uuid.Nil {
357+
continue
358+
}
359+
360+
iftriggerTimeChan,exists:=expectedNotifications[notificationID];exists {
361+
if_,received:=receivedNotifications[notificationID];!received {
362+
receiptTime:=summary.Date
363+
ifreceiptTime.IsZero() {
364+
receiptTime=time.Now()
365+
}
366+
367+
select {
368+
casetriggerTime:=<-triggerTimeChan:
369+
latency:=receiptTime.Sub(triggerTime)
370+
r.smtpLatenciesMu.Lock()
371+
r.smtpLatencies[notificationID]=latency
372+
r.smtpLatenciesMu.Unlock()
373+
r.cfg.Metrics.RecordLatency(latency,user.Username,notificationID.String(),NotificationTypeSMTP)
374+
receivedNotifications[notificationID]=struct{}{}
375+
376+
logger.Info(ctx,"received expected notification via SMTP",
377+
slog.F("notification_id",notificationID),
378+
slog.F("subject",summary.Subject),
379+
slog.F("latency",latency))
380+
case<-ctx.Done():
381+
returnxerrors.Errorf("context canceled while waiting for trigger time: %w",ctx.Err())
382+
default:
383+
}
384+
}
385+
}
386+
}
387+
}
388+
}
389+
}
390+
251391
funcreadNotification(ctx context.Context,conn*websocket.Conn) (codersdk.GetInboxNotificationResponse,error) {
252392
_,message,err:=conn.Read(ctx)
253393
iferr!=nil {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp