Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat(scaletest): extend notifications runner with smtp support#20222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
kacpersaw wants to merge3 commits intokacpersaw/scaletest-smtp-mock-server
base:kacpersaw/scaletest-smtp-mock-server
Choose a base branch
Loading
fromkacpersaw/scaletest-notifications-smtp
Open
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletionscli/exp_scaletest.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1929,6 +1929,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
notificationTimeout time.Duration
dialTimeout time.Duration
noCleanup bool
smtpAPIURL string

tracingFlags = &scaletestTracingFlags{}

Expand DownExpand Up@@ -1975,6 +1976,12 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
return xerrors.Errorf("--owner-user-percentage must be between 0 and 100")
}

if smtpAPIURL != "" {
if !strings.HasPrefix(smtpAPIURL, "http://") && !strings.HasPrefix(smtpAPIURL, "https://") {
return xerrors.Errorf("smtp_api_url must start with http:// or https://")
}
}

ownerUserCount := int64(float64(userCount) * ownerUserPercentage / 100)
if ownerUserCount == 0 && ownerUserPercentage > 0 {
ownerUserCount = 1
Expand DownExpand Up@@ -2039,6 +2046,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
ReceivingWatchBarrier: ownerWatchBarrier,
ExpectedNotifications: expectedNotifications,
Metrics: metrics,
SMTPApiURL: smtpAPIURL,
}
if err := config.Validate(); err != nil {
return xerrors.Errorf("validate config: %w", err)
Expand All@@ -2056,6 +2064,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
DialBarrier: dialBarrier,
ReceivingWatchBarrier: ownerWatchBarrier,
Metrics: metrics,
SMTPApiURL: smtpAPIURL,
}
if err := config.Validate(); err != nil {
return xerrors.Errorf("validate config: %w", err)
Expand DownExpand Up@@ -2165,6 +2174,12 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
Description: "Do not clean up resources after the test completes.",
Value: serpent.BoolOf(&noCleanup),
},
{
Flag: "smtp-api-url",
Env: "CODER_SCALETEST_SMTP_API_URL",
Description: "SMTP mock HTTP API address.",
Value: serpent.StringOf(&smtpAPIURL),
},
}

tracingFlags.attach(&cmd.Options)
Expand Down
3 changes: 3 additions & 0 deletionsscaletest/notifications/config.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -35,6 +35,9 @@ type Config struct {

// ReceivingWatchBarrier is the barrier for receiving users. Regular users wait on this to disconnect after receiving users complete.
ReceivingWatchBarrier *sync.WaitGroup `json:"-"`

// SMTPApiUrl is the URL of the SMTP mock HTTP API
SMTPApiURL string `json:"smtp_api_url"`
}

func (c Config) Validate() error {
Expand Down
13 changes: 10 additions & 3 deletionsscaletest/notifications/metrics.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,6 +6,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

type NotificationType string

const (
NotificationTypeWebsocket NotificationType = "websocket"
NotificationTypeSMTP NotificationType = "smtp"
)

type Metrics struct {
notificationLatency *prometheus.HistogramVec
notificationErrors *prometheus.CounterVec
Expand All@@ -22,7 +29,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Subsystem: "scaletest",
Name: "notification_delivery_latency_seconds",
Help: "Time between notification-creating action and receipt of notification by client",
}, []string{"username", "notification_type"})
}, []string{"username", "notification_id", "notification_type"})
errors := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "scaletest",
Expand All@@ -45,8 +52,8 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
}
}

func (m *Metrics) RecordLatency(latency time.Duration, username,notificationType string) {
m.notificationLatency.WithLabelValues(username, notificationType).Observe(latency.Seconds())
func (m *Metrics) RecordLatency(latency time.Duration, username,notificationID string, notificationType NotificationType) {
m.notificationLatency.WithLabelValues(username,notificationID, string(notificationType)).Observe(latency.Seconds())
}

func (m *Metrics) AddError(username, action string) {
Expand Down
160 changes: 150 additions & 10 deletionsscaletest/notifications/run.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,9 +6,11 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand All@@ -19,6 +21,7 @@ import (
"github.com/coder/coder/v2/scaletest/createusers"
"github.com/coder/coder/v2/scaletest/harness"
"github.com/coder/coder/v2/scaletest/loadtestutil"
"github.com/coder/coder/v2/scaletest/smtpmock"
"github.com/coder/websocket"
)

Expand All@@ -28,15 +31,21 @@ type Runner struct {

createUserRunner *createusers.Runner

// notificationLatencies stores the latency for each notification type
notificationLatencies map[uuid.UUID]time.Duration
// websocketLatencies stores the latency for websocket notifications
websocketLatencies map[uuid.UUID]time.Duration
websocketLatenciesMu sync.RWMutex

// smtpLatencies stores the latency for SMTP notifications
smtpLatencies map[uuid.UUID]time.Duration
smtpLatenciesMu sync.RWMutex
}

func NewRunner(client *codersdk.Client, cfg Config) *Runner {
return &Runner{
client: client,
cfg: cfg,
notificationLatencies: make(map[uuid.UUID]time.Duration),
client: client,
cfg: cfg,
websocketLatencies: make(map[uuid.UUID]time.Duration),
smtpLatencies: make(map[uuid.UUID]time.Duration),
}
}

Expand DownExpand Up@@ -136,7 +145,20 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
watchCtx, cancel := context.WithTimeout(ctx, r.cfg.NotificationTimeout)
defer cancel()

if err := r.watchNotifications(watchCtx, conn, newUser, logger, r.cfg.ExpectedNotifications); err != nil {
eg, egCtx := errgroup.WithContext(watchCtx)

eg.Go(func() error {
return r.watchNotifications(egCtx, conn, newUser, logger, r.cfg.ExpectedNotifications)
})

if r.cfg.SMTPApiURL != "" {
logger.Info(ctx, "running SMTP notification watcher")
eg.Go(func() error {
return r.watchNotificationsSMTP(egCtx, newUser, logger, r.cfg.ExpectedNotifications)
})
}

if err := eg.Wait(); err != nil {
return xerrors.Errorf("notification watch failed: %w", err)
}

Expand All@@ -157,11 +179,29 @@ func (r *Runner) Cleanup(ctx context.Context, id string, logs io.Writer) error {
return nil
}

const NotificationDeliveryLatencyMetric = "notification_delivery_latency_seconds"
const (
WebsocketNotificationLatencyMetric = "notification_websocket_latency_seconds"
SMTPNotificationLatencyMetric = "notification_smtp_latency_seconds"
)

func (r *Runner) GetMetrics() map[string]any {
r.websocketLatenciesMu.RLock()
websocketLatencies := make(map[uuid.UUID]time.Duration, len(r.websocketLatencies))
for id, latency := range r.websocketLatencies {
websocketLatencies[id] = latency
}
r.websocketLatenciesMu.RUnlock()

r.smtpLatenciesMu.RLock()
smtpLatencies := make(map[uuid.UUID]time.Duration, len(r.smtpLatencies))
for id, latency := range r.smtpLatencies {
smtpLatencies[id] = latency
}
r.smtpLatenciesMu.RUnlock()

return map[string]any{
NotificationDeliveryLatencyMetric: r.notificationLatencies,
WebsocketNotificationLatencyMetric: websocketLatencies,
SMTPNotificationLatencyMetric: smtpLatencies,
}
}

Expand DownExpand Up@@ -228,8 +268,10 @@ func (r *Runner) watchNotifications(ctx context.Context, conn *websocket.Conn, u
select {
case triggerTime := <-triggerTimeChan:
latency := receiptTime.Sub(triggerTime)
r.notificationLatencies[templateID] = latency
r.cfg.Metrics.RecordLatency(latency, user.Username, templateID.String())
r.websocketLatenciesMu.Lock()
r.websocketLatencies[templateID] = latency
r.websocketLatenciesMu.Unlock()
r.cfg.Metrics.RecordLatency(latency, user.Username, templateID.String(), NotificationTypeWebsocket)
receivedNotifications[templateID] = struct{}{}

logger.Info(ctx, "received expected notification",
Expand All@@ -248,6 +290,104 @@ func (r *Runner) watchNotifications(ctx context.Context, conn *websocket.Conn, u
}
}

const smtpPollInterval = 2 * time.Second

// watchNotificationsSMTP polls the SMTP HTTP API for notifications and returns error or nil
// once all expected notifications are received.
func (r *Runner) watchNotificationsSMTP(ctx context.Context, user codersdk.User, logger slog.Logger, expectedNotifications map[uuid.UUID]chan time.Time) error {
logger.Info(ctx, "polling SMTP API for notifications",
slog.F("username", user.Username),
slog.F("email", user.Email),
slog.F("expected_count", len(expectedNotifications)),
)

receivedNotifications := make(map[uuid.UUID]struct{})
ticker := time.NewTicker(smtpPollInterval)
defer ticker.Stop()

apiURL := fmt.Sprintf("%s/messages?email=%s", r.cfg.SMTPApiURL, user.Email)
httpClient := &http.Client{
Timeout: 10 * time.Second,
}

for {
select {
case <-ctx.Done():
return xerrors.Errorf("context canceled while waiting for notifications: %w", ctx.Err())
case <-ticker.C:
if len(receivedNotifications) == len(expectedNotifications) {
logger.Info(ctx, "received all expected notifications via SMTP")
return nil
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, apiURL, nil)
if err != nil {
logger.Error(ctx, "create SMTP API request", slog.Error(err))
r.cfg.Metrics.AddError(user.Username, "smtp_create_request")
return xerrors.Errorf("create SMTP API request: %w", err)
}

resp, err := httpClient.Do(req)
if err != nil {
logger.Error(ctx, "poll smtp api for notifications", slog.Error(err))
r.cfg.Metrics.AddError(user.Username, "smtp_poll")
return xerrors.Errorf("poll smtp api: %w", err)
}

if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
logger.Error(ctx, "smtp api returned non-200 status", slog.F("status", resp.StatusCode))
r.cfg.Metrics.AddError(user.Username, "smtp_bad_status")
return xerrors.Errorf("smtp api returned status %d", resp.StatusCode)
}

var summaries []smtpmock.EmailSummary
if err := json.NewDecoder(resp.Body).Decode(&summaries); err != nil {
_ = resp.Body.Close()
logger.Error(ctx, "decode smtp api response", slog.Error(err))
r.cfg.Metrics.AddError(user.Username, "smtp_decode")
return xerrors.Errorf("decode smtp api response: %w", err)
}
_ = resp.Body.Close()

// Process each email summary
for _, summary := range summaries {
notificationID := summary.NotificationTemplateID
if notificationID == uuid.Nil {
continue
}

if triggerTimeChan, exists := expectedNotifications[notificationID]; exists {
if _, received := receivedNotifications[notificationID]; !received {
receiptTime := summary.Date
if receiptTime.IsZero() {
receiptTime = time.Now()
}

select {
case triggerTime := <-triggerTimeChan:
latency := receiptTime.Sub(triggerTime)
r.smtpLatenciesMu.Lock()
r.smtpLatencies[notificationID] = latency
r.smtpLatenciesMu.Unlock()
r.cfg.Metrics.RecordLatency(latency, user.Username, notificationID.String(), NotificationTypeSMTP)
receivedNotifications[notificationID] = struct{}{}

logger.Info(ctx, "received expected notification via SMTP",
slog.F("notification_id", notificationID),
slog.F("subject", summary.Subject),
slog.F("latency", latency))
case <-ctx.Done():
return xerrors.Errorf("context canceled while waiting for trigger time: %w", ctx.Err())
default:
}
}
}
}
}
}
}

func readNotification(ctx context.Context, conn *websocket.Conn) (codersdk.GetInboxNotificationResponse, error) {
_, message, err := conn.Read(ctx)
if err != nil {
Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp