Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbfd3cda

Browse files
committed
add dual barrier support for owner and regular users
1 parentc5938da commitbfd3cda

File tree

3 files changed

+85
-21
lines changed

3 files changed

+85
-21
lines changed

‎scaletest/notifications/config.go‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Config struct {
2929

3030
// DialBarrier ensures all runners are connected before notifications are triggered.
3131
DialBarrier*sync.WaitGroup`json:"-"`
32+
33+
// OwnerDialBarrier is the barrier for owner users. Regular users wait on this to disconnect after owner users complete.
34+
OwnerDialBarrier*sync.WaitGroup`json:"-"`
3235
}
3336

3437
func (cConfig)Validate()error {
@@ -45,6 +48,10 @@ func (c Config) Validate() error {
4548
returnxerrors.New("dial barrier must be set")
4649
}
4750

51+
if!c.IsOwner&&c.OwnerDialBarrier==nil {
52+
returnxerrors.New("owner_dial_barrier must be set for regular users")
53+
}
54+
4855
ifc.NotificationTimeout<=0 {
4956
returnxerrors.New("notification_timeout must be greater than 0")
5057
}

‎scaletest/notifications/run.go‎

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,6 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
8989

9090
logger.Info(ctx,"notification runner is ready")
9191

92-
// We don't need to wait for notifications since we're not an owner
93-
if!r.cfg.IsOwner {
94-
reachedBarrier=true
95-
r.cfg.DialBarrier.Done()
96-
returnnil
97-
}
98-
9992
dialCtx,cancel:=context.WithTimeout(ctx,r.cfg.DialTimeout)
10093
defercancel()
10194

@@ -111,6 +104,25 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
111104
r.cfg.DialBarrier.Done()
112105
r.cfg.DialBarrier.Wait()
113106

107+
if!r.cfg.IsOwner {
108+
logger.Info(ctx,"maintaining websocket connection, waiting for owner users to complete")
109+
110+
// Wait for owners to complete
111+
done:=make(chanstruct{})
112+
gofunc() {
113+
r.cfg.OwnerDialBarrier.Wait()
114+
close(done)
115+
}()
116+
117+
select {
118+
case<-done:
119+
logger.Info(ctx,"owner users complete, closing connection")
120+
case<-ctx.Done():
121+
logger.Info(ctx,"context canceled, closing connection")
122+
}
123+
returnnil
124+
}
125+
114126
logger.Info(ctx,"waiting for notifications",slog.F("timeout",r.cfg.NotificationTimeout))
115127

116128
watchCtx,cancel:=context.WithTimeout(ctx,r.cfg.NotificationTimeout)

‎scaletest/notifications/run_test.go‎

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,21 +66,24 @@ func TestRun(t *testing.T) {
6666
require.NoError(t,err)
6767

6868
client:=coderdtest.New(t,&coderdtest.Options{
69-
Database:db,
70-
Pubsub:ps,
71-
NotificationsEnqueuer:enqueuer,
69+
Database:db,
70+
Pubsub:ps,
71+
NotificationsEnqueuer:enqueuer,
7272
})
7373
firstUser:=coderdtest.CreateFirstUser(t,client)
7474

7575
constnumOwners=2
76-
barrier:=new(sync.WaitGroup)
77-
barrier.Add(numOwners+1)
76+
constnumRegularUsers=2
77+
ownerBarrier:=new(sync.WaitGroup)
78+
regularBarrier:=new(sync.WaitGroup)
79+
ownerBarrier.Add(numOwners)
80+
regularBarrier.Add(numRegularUsers)
7881
metrics:=notifications.NewMetrics(prometheus.NewRegistry())
7982

8083
eg,runCtx:=errgroup.WithContext(ctx)
8184

8285
// Start owner runners who will receive notifications
83-
runners:=make([]*notifications.Runner,0,numOwners)
86+
ownerRunners:=make([]*notifications.Runner,0,numOwners)
8487
fori:=rangenumOwners {
8588
runnerCfg:= notifications.Config{
8689
User: createusers.Config{
@@ -90,22 +93,47 @@ func TestRun(t *testing.T) {
9093
NotificationTimeout:testutil.WaitLong,
9194
DialTimeout:testutil.WaitLong,
9295
Metrics:metrics,
93-
DialBarrier:barrier,
96+
DialBarrier:ownerBarrier,
9497
}
9598
err:=runnerCfg.Validate()
9699
require.NoError(t,err)
97100

98101
runner:=notifications.NewRunner(client,runnerCfg)
99-
runners=append(runners,runner)
102+
ownerRunners=append(ownerRunners,runner)
100103
eg.Go(func()error {
101104
returnrunner.Run(runCtx,"owner-"+strconv.Itoa(i),io.Discard)
102105
})
103106
}
104107

108+
// Start regular user runners who will maintain websocket connections
109+
regularRunners:=make([]*notifications.Runner,0,numRegularUsers)
110+
fori:=rangenumRegularUsers {
111+
runnerCfg:= notifications.Config{
112+
User: createusers.Config{
113+
OrganizationID:firstUser.OrganizationID,
114+
},
115+
IsOwner:false,
116+
NotificationTimeout:testutil.WaitLong,
117+
DialTimeout:testutil.WaitLong,
118+
Metrics:metrics,
119+
DialBarrier:regularBarrier,
120+
OwnerDialBarrier:ownerBarrier,
121+
}
122+
err:=runnerCfg.Validate()
123+
require.NoError(t,err)
124+
125+
runner:=notifications.NewRunner(client,runnerCfg)
126+
regularRunners=append(regularRunners,runner)
127+
eg.Go(func()error {
128+
returnrunner.Run(runCtx,"regular-"+strconv.Itoa(i),io.Discard)
129+
})
130+
}
131+
105132
// Trigger notifications by creating and deleting a user
106133
eg.Go(func()error {
107-
barrier.Done()
108-
barrier.Wait()
134+
// Wait for all runners to connect
135+
ownerBarrier.Wait()
136+
regularBarrier.Wait()
109137

110138
newUser,err:=client.CreateUserWithOrgs(runCtx, codersdk.CreateUserRequestWithOrgs{
111139
OrganizationIDs: []uuid.UUID{firstUser.OrganizationID},
@@ -128,20 +156,37 @@ func TestRun(t *testing.T) {
128156
require.NoError(t,err,"runner execution should complete successfully")
129157

130158
cleanupEg,cleanupCtx:=errgroup.WithContext(ctx)
131-
fori,runner:=rangerunners {
159+
fori,runner:=rangeownerRunners {
160+
cleanupEg.Go(func()error {
161+
returnrunner.Cleanup(cleanupCtx,"owner-"+strconv.Itoa(i),io.Discard)
162+
})
163+
}
164+
fori,runner:=rangeregularRunners {
132165
cleanupEg.Go(func()error {
133-
returnrunner.Cleanup(cleanupCtx,strconv.Itoa(i),io.Discard)
166+
returnrunner.Cleanup(cleanupCtx,"regular-"+strconv.Itoa(i),io.Discard)
134167
})
135168
}
136169
err=cleanupEg.Wait()
137170
require.NoError(t,err)
138171

139-
// Verify that each runner received both notifications and recorded metrics
140-
for_,runner:=rangerunners {
172+
users,err:=client.Users(ctx, codersdk.UsersRequest{})
173+
require.NoError(t,err)
174+
require.Len(t,users.Users,1)
175+
require.Equal(t,firstUser.UserID,users.Users[0].ID)
176+
177+
// Verify that owner runners received both notifications and recorded metrics
178+
for_,runner:=rangeownerRunners {
141179
runnerMetrics:=runner.GetMetrics()
142180
require.Contains(t,runnerMetrics,notifications.UserCreatedNotificationLatencyMetric)
143181
require.Contains(t,runnerMetrics,notifications.UserDeletedNotificationLatencyMetric)
144182
}
183+
184+
// Verify that regular runners don't have notification metrics
185+
for_,runner:=rangeregularRunners {
186+
runnerMetrics:=runner.GetMetrics()
187+
require.NotContains(t,runnerMetrics,notifications.UserCreatedNotificationLatencyMetric)
188+
require.NotContains(t,runnerMetrics,notifications.UserDeletedNotificationLatencyMetric)
189+
}
145190
}
146191

147192
funcdefaultNotificationsConfig(method database.NotificationMethod) codersdk.NotificationsConfig {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp