Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitef5cb9a

Browse files
committed
feat(cli/exp): add app testing to scaletest workspace-traffic
1 parent5bfbf9f commitef5cb9a

File tree

5 files changed

+291
-22
lines changed

5 files changed

+291
-22
lines changed

‎cli/exp_scaletest.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/prometheus/client_golang/prometheus"
2222
"github.com/prometheus/client_golang/prometheus/promhttp"
2323
"go.opentelemetry.io/otel/trace"
24+
"golang.org/x/exp/slices"
2425
"golang.org/x/xerrors"
2526

2627
"cdr.dev/slog"
@@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
859860
tickInterval time.Duration
860861
bytesPerTickint64
861862
sshbool
863+
appstring
862864
templatestring
863865

864866
client=&codersdk.Client{}
@@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
911913
}
912914
}
913915

916+
appHost,err:=client.AppHost(ctx)
917+
iferr!=nil {
918+
returnxerrors.Errorf("get app host: %w",err)
919+
}
920+
914921
workspaces,err:=getScaletestWorkspaces(inv.Context(),client,template)
915922
iferr!=nil {
916923
returnerr
@@ -949,6 +956,8 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
949956
agentNamestring
950957
name="workspace-traffic"
951958
id=strconv.Itoa(idx)
959+
apps []codersdk.WorkspaceApp
960+
appConfig workspacetraffic.AppConfig
952961
)
953962

954963
for_,res:=rangews.LatestBuild.Resources {
@@ -957,13 +966,34 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
957966
}
958967
agentID=res.Agents[0].ID
959968
agentName=res.Agents[0].Name
969+
apps=res.Agents[0].Apps
960970
}
961971

962972
ifagentID==uuid.Nil {
963973
_,_=fmt.Fprintf(inv.Stderr,"WARN: skipping workspace %s: no agent\n",ws.Name)
964974
continue
965975
}
966976

977+
ifapp!="" {
978+
i:=slices.IndexFunc(apps,func(a codersdk.WorkspaceApp)bool {returna.Slug==app })
979+
ifi==-1 {
980+
returnxerrors.Errorf("app %q not found in workspace %q",app,ws.Name)
981+
}
982+
983+
appConfig= workspacetraffic.AppConfig{
984+
Name:apps[i].Slug,
985+
}
986+
ifapps[i].Subdomain {
987+
ifappHost.Host=="" {
988+
returnxerrors.Errorf("app %q is a subdomain app but no app host is configured",app)
989+
}
990+
991+
appConfig.URL=fmt.Sprintf("%s://%s",client.URL.Scheme,strings.Replace(appHost.Host,"*",apps[i].SubdomainName,1))
992+
}else {
993+
appConfig.URL=fmt.Sprintf("%s/@%s/%s.%s/apps/%s",client.URL.String(),ws.OwnerName,ws.Name,agentName,apps[i].Slug)
994+
}
995+
}
996+
967997
// Setup our workspace agent connection.
968998
config:= workspacetraffic.Config{
969999
AgentID:agentID,
@@ -974,6 +1004,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
9741004
WriteMetrics:metrics.WriteMetrics(ws.OwnerName,ws.Name,agentName),
9751005
SSH:ssh,
9761006
Echo:ssh,
1007+
App:appConfig,
9771008
}
9781009

9791010
iferr:=config.Validate();err!=nil {
@@ -1046,9 +1077,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
10461077
Flag:"ssh",
10471078
Env:"CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
10481079
Default:"",
1049-
Description:"Send traffic over SSH.",
1080+
Description:"Send traffic over SSH, cannot be used with --app.",
10501081
Value:clibase.BoolOf(&ssh),
10511082
},
1083+
{
1084+
Flag:"app",
1085+
Env:"CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
1086+
Default:"",
1087+
Description:"Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
1088+
Value:clibase.StringOf(&app),
1089+
},
10521090
}
10531091

10541092
tracingFlags.attach(&cmd.Options)

‎scaletest/workspacetraffic/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type Config struct {
3131
// to true will double the amount of data read from the agent for
3232
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
3333
Echobool`json:"echo"`
34+
35+
AppAppConfig`json:"app"`
3436
}
3537

3638
func (cConfig)Validate()error {
@@ -50,5 +52,14 @@ func (c Config) Validate() error {
5052
returnxerrors.Errorf("validate tick_interval: must be greater than zero")
5153
}
5254

55+
ifc.SSH&&c.App.Name!="" {
56+
returnxerrors.Errorf("validate ssh: must be false when app is used")
57+
}
58+
5359
returnnil
5460
}
61+
62+
typeAppConfigstruct {
63+
Namestring`json:"name"`
64+
URLstring`json:"url"`
65+
}

‎scaletest/workspacetraffic/conn.go

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import (
55
"encoding/json"
66
"errors"
77
"io"
8+
"net"
9+
"net/http"
810
"sync"
911
"time"
1012

13+
"nhooyr.io/websocket"
14+
1115
"github.com/coder/coder/v2/codersdk"
1216

1317
"github.com/google/uuid"
@@ -27,9 +31,9 @@ const (
2731
//
2832
// failed to write frame: WebSocket closed: received close frame: status = StatusMessageTooBig and reason = "read limited at 32769 bytes"
2933
//
30-
// Since we can't control fragmentation/buffer sizes, wekeep it simple and
31-
//match the conservative payload size used by agent/reconnectingpty (1024).
32-
rptyJSONMaxDataSize=1024
34+
// Since we can't control fragmentation/buffer sizes, weuse a conservative
35+
//value. Derived from 1024 * 9 * 3 = <28KB.
36+
rptyJSONMaxDataSize=1024*9
3337
)
3438

3539
funcconnectRPTY(ctx context.Context,client*codersdk.Client,agentID,reconnect uuid.UUID,cmdstring) (*countReadWriteCloser,error) {
@@ -260,3 +264,114 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
260264
func (w*wrappedSSHConn)Write(p []byte) (nint,errerror) {
261265
returnw.stdin.Write(p)
262266
}
267+
268+
funcappClientConn(ctx context.Context,client*codersdk.Client,urlstring) (*countReadWriteCloser,error) {
269+
headers:= http.Header{}
270+
tokenHeader:=codersdk.SessionTokenHeader
271+
ifclient.SessionTokenHeader!="" {
272+
tokenHeader=client.SessionTokenHeader
273+
}
274+
headers.Set(tokenHeader,client.SessionToken())
275+
276+
//nolint:bodyclose // The websocket conn manages the body.
277+
conn,_,err:=websocket.Dial(ctx,url,&websocket.DialOptions{
278+
HTTPClient:client.HTTPClient,
279+
HTTPHeader:headers,
280+
})
281+
iferr!=nil {
282+
returnnil,xerrors.Errorf("websocket dial: %w",err)
283+
}
284+
285+
netConn:=websocketNetConn(conn,websocket.MessageBinary)
286+
287+
// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
288+
crw:=&countReadWriteCloser{rwc:netConn}
289+
returncrw,nil
290+
}
291+
292+
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
293+
// is called if a read or write error is encountered.
294+
typewsNetConnstruct {
295+
net.Conn
296+
297+
writeMu sync.Mutex
298+
readMu sync.Mutex
299+
300+
cancel context.CancelFunc
301+
closeMu sync.Mutex
302+
closedbool
303+
}
304+
305+
func (c*wsNetConn)Read(b []byte) (nint,errerror) {
306+
c.readMu.Lock()
307+
deferc.readMu.Unlock()
308+
ifc.isClosed() {
309+
return0,io.EOF
310+
}
311+
n,err=c.Conn.Read(b)
312+
iferr!=nil {
313+
ifc.isClosed() {
314+
returnn,io.EOF
315+
}
316+
returnn,err
317+
}
318+
returnn,nil
319+
}
320+
321+
func (c*wsNetConn)Write(b []byte) (nint,errerror) {
322+
c.writeMu.Lock()
323+
deferc.writeMu.Unlock()
324+
ifc.isClosed() {
325+
return0,io.EOF
326+
}
327+
328+
forlen(b)>0 {
329+
bb:=b
330+
iflen(bb)>rptyJSONMaxDataSize {
331+
bb=b[:rptyJSONMaxDataSize]
332+
}
333+
b=b[len(bb):]
334+
nn,err:=c.Conn.Write(bb)
335+
n+=nn
336+
iferr!=nil {
337+
ifc.isClosed() {
338+
returnn,io.EOF
339+
}
340+
returnn,err
341+
}
342+
}
343+
returnn,nil
344+
}
345+
346+
func (c*wsNetConn)isClosed()bool {
347+
c.closeMu.Lock()
348+
deferc.closeMu.Unlock()
349+
returnc.closed
350+
}
351+
352+
func (c*wsNetConn)Close()error {
353+
c.closeMu.Lock()
354+
closed:=c.closed
355+
c.closed=true
356+
c.closeMu.Unlock()
357+
358+
ifclosed {
359+
returnnil
360+
}
361+
362+
c.cancel()
363+
364+
c.readMu.Lock()
365+
deferc.readMu.Unlock()
366+
c.writeMu.Lock()
367+
deferc.writeMu.Unlock()
368+
369+
_=c.Conn.Close()
370+
returnnil
371+
}
372+
373+
funcwebsocketNetConn(conn*websocket.Conn,msgType websocket.MessageType) net.Conn {
374+
ctx,cancel:=context.WithCancel(context.Background())
375+
nc:=websocket.NetConn(ctx,conn,msgType)
376+
return&wsNetConn{cancel:cancel,Conn:nc}
377+
}

‎scaletest/workspacetraffic/run.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
9191
command:=fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none",output,bytesPerTick)
9292

9393
varconn*countReadWriteCloser
94-
ifr.cfg.SSH {
94+
switch {
95+
caser.cfg.App.Name!="":
96+
logger.Info(ctx,"sending traffic to workspace app",slog.F("app",r.cfg.App.Name))
97+
conn,err=appClientConn(ctx,r.client,r.cfg.App.URL)
98+
iferr!=nil {
99+
logger.Error(ctx,"connect to workspace app",slog.Error(err))
100+
returnxerrors.Errorf("connect to workspace app: %w",err)
101+
}
102+
103+
caser.cfg.SSH:
95104
logger.Info(ctx,"connecting to workspace agent",slog.F("method","ssh"))
96105
// If echo is enabled, disable PTY to avoid double echo and
97106
// reduce CPU usage.
@@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
101110
logger.Error(ctx,"connect to workspace agent via ssh",slog.Error(err))
102111
returnxerrors.Errorf("connect to workspace via ssh: %w",err)
103112
}
104-
}else {
113+
114+
default:
105115
logger.Info(ctx,"connecting to workspace agent",slog.F("method","reconnectingpty"))
106116
conn,err=connectRPTY(ctx,r.client,agentID,reconnect,command)
107117
iferr!=nil {
@@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
114124
closeConn:=func()error {
115125
closeOnce.Do(func() {
116126
closeErr=conn.Close()
117-
iferr!=nil {
118-
logger.Error(ctx,"close agent connection",slog.Error(err))
127+
ifcloseErr!=nil {
128+
logger.Error(ctx,"close agent connection",slog.Error(closeErr))
119129
}
120130
})
121131
returncloseErr
@@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
142152

143153
// Read until connection is closed.
144154
gofunc() {
145-
rch:=rch// Shadowed for reassignment.
146155
logger.Debug(ctx,"reading from agent")
147156
rch<-drain(conn)
148157
logger.Debug(ctx,"done reading from agent")
@@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
151160

152161
// Write random data to the conn every tick.
153162
gofunc() {
154-
wch:=wch// Shadowed for reassignment.
155163
logger.Debug(ctx,"writing to agent")
156164
wch<-writeRandomData(conn,bytesPerTick,tick.C)
157165
logger.Debug(ctx,"done writing to agent")
@@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
160168

161169
varwaitCloseTimeoutCh<-chanstruct{}
162170
deadlineCtxCh:=deadlineCtx.Done()
171+
wchRef,rchRef:=wch,rch
163172
for {
164-
ifwch==nil&&rch==nil {
173+
ifwchRef==nil&&rchRef==nil {
165174
returnnil
166175
}
167176

168177
select {
169178
case<-waitCloseTimeoutCh:
170179
logger.Warn(ctx,"timed out waiting for read/write to complete",
171-
slog.F("write_done",wch==nil),
172-
slog.F("read_done",rch==nil),
180+
slog.F("write_done",wchRef==nil),
181+
slog.F("read_done",rchRef==nil),
173182
)
174183
returnxerrors.Errorf("timed out waiting for read/write to complete: %w",ctx.Err())
175184
case<-deadlineCtxCh:
@@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
181190
waitCtx,cancel:=context.WithTimeout(context.Background(),waitCloseTimeout)
182191
defercancel()//nolint:revive // Only called once.
183192
waitCloseTimeoutCh=waitCtx.Done()
184-
caseerr=<-wch:
193+
caseerr=<-wchRef:
185194
iferr!=nil {
186195
returnxerrors.Errorf("write to agent: %w",err)
187196
}
188-
wch=nil
189-
caseerr=<-rch:
197+
wchRef=nil
198+
caseerr=<-rchRef:
190199
iferr!=nil {
191200
returnxerrors.Errorf("read from agent: %w",err)
192201
}
193-
rch=nil
202+
rchRef=nil
194203
}
195204
}
196205
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp