@@ -1778,8 +1778,10 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
1778
1778
1779
1779
_ ,_ = fmt .Fprintln (inv .Stderr ,"Creating users..." )
1780
1780
1781
- dialBarrier := & sync.WaitGroup {}
1782
- dialBarrier .Add (int (userCount ))
1781
+ ownerDialBarrier := & sync.WaitGroup {}
1782
+ regularDialBarrier := & sync.WaitGroup {}
1783
+ ownerDialBarrier .Add (int (ownerUserCount ))
1784
+ regularDialBarrier .Add (int (regularUserCount ))
1783
1785
1784
1786
configs := make ([]notifications.Config ,0 ,userCount )
1785
1787
for range ownerUserCount {
@@ -1790,7 +1792,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
1790
1792
IsOwner :true ,
1791
1793
NotificationTimeout :notificationTimeout ,
1792
1794
DialTimeout :dialTimeout ,
1793
- DialBarrier :dialBarrier ,
1795
+ DialBarrier :ownerDialBarrier ,
1794
1796
Metrics :metrics ,
1795
1797
}
1796
1798
if err := config .Validate ();err != nil {
@@ -1806,7 +1808,8 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
1806
1808
IsOwner :false ,
1807
1809
NotificationTimeout :notificationTimeout ,
1808
1810
DialTimeout :dialTimeout ,
1809
- DialBarrier :dialBarrier ,
1811
+ DialBarrier :regularDialBarrier ,
1812
+ OwnerDialBarrier :ownerDialBarrier ,
1810
1813
Metrics :metrics ,
1811
1814
}
1812
1815
if err := config .Validate ();err != nil {
@@ -1816,31 +1819,55 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
1816
1819
}
1817
1820
1818
1821
go func () {
1819
- logger .Info (ctx ,"waiting forall users to connect" )
1822
+ logger .Info (ctx ,"waiting forowner users to connect" )
1820
1823
1821
- //Create timeout context forthe entire trigger operation
1822
- waitCtx ,cancel := context .WithTimeout (ctx ,dialTimeout + 30 * time .Second )
1824
+ //Wait forowner users to connect
1825
+ ownerWaitCtx ,cancel := context .WithTimeout (ctx ,dialTimeout + 30 * time .Second )
1823
1826
defer cancel ()
1824
1827
1825
- // Wait for all runners to reach the barrier
1826
- done := make (chan struct {})
1828
+ ownerDone := make (chan struct {})
1827
1829
go func () {
1828
- dialBarrier .Wait ()
1829
- close (done )
1830
+ ownerDialBarrier .Wait ()
1831
+ close (ownerDone )
1830
1832
}()
1831
1833
1832
1834
select {
1833
- case <- done :
1834
- logger .Info (ctx ,"all users connected" )
1835
- case <- waitCtx .Done ():
1836
- if waitCtx .Err ()== context .DeadlineExceeded {
1837
- logger .Error (ctx ,"timeout waiting forall users to connect" )
1835
+ case <- ownerDone :
1836
+ logger .Info (ctx ,"allowner users connected" )
1837
+ case <- ownerWaitCtx .Done ():
1838
+ if ownerWaitCtx .Err ()== context .DeadlineExceeded {
1839
+ logger .Error (ctx ,"timeout waiting forowner users to connect" )
1838
1840
}else {
1839
- logger .Info (ctx ,"context canceled while waiting for users" )
1841
+ logger .Info (ctx ,"context canceled while waiting forowner users" )
1840
1842
}
1841
1843
return
1842
1844
}
1843
1845
1846
+ // Wait for regular users to connect
1847
+ logger .Info (ctx ,"waiting for regular users to connect" )
1848
+ regularWaitCtx ,cancel := context .WithTimeout (ctx ,dialTimeout + 30 * time .Second )
1849
+ defer cancel ()
1850
+
1851
+ regularDone := make (chan struct {})
1852
+ go func () {
1853
+ regularDialBarrier .Wait ()
1854
+ close (regularDone )
1855
+ }()
1856
+
1857
+ select {
1858
+ case <- regularDone :
1859
+ logger .Info (ctx ,"all regular users connected" )
1860
+ case <- regularWaitCtx .Done ():
1861
+ if regularWaitCtx .Err ()== context .DeadlineExceeded {
1862
+ logger .Error (ctx ,"timeout waiting for regular users to connect" )
1863
+ }else {
1864
+ logger .Info (ctx ,"context canceled while waiting for regular users" )
1865
+ }
1866
+ return
1867
+ }
1868
+
1869
+ logger .Info (ctx ,"all users connected, triggering notifications" )
1870
+
1844
1871
const (
1845
1872
triggerUsername = "scaletest-trigger-user"
1846
1873
triggerEmail = "scaletest-trigger@example.com"