Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit73e6bbf

Browse files
authored
feat(cli/exp): add app testing to scaletest workspace-traffic (#11633)
1 parent1f63a11 commit73e6bbf

File tree

5 files changed

+313
-34
lines changed

5 files changed

+313
-34
lines changed

‎cli/exp_scaletest.go

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/prometheus/client_golang/prometheus"
2222
"github.com/prometheus/client_golang/prometheus/promhttp"
2323
"go.opentelemetry.io/otel/trace"
24+
"golang.org/x/exp/slices"
2425
"golang.org/x/xerrors"
2526

2627
"cdr.dev/slog"
@@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
859860
tickInterval time.Duration
860861
bytesPerTickint64
861862
sshbool
863+
appstring
862864
templatestring
863865

864866
client=&codersdk.Client{}
@@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
911913
}
912914
}
913915

916+
appHost,err:=client.AppHost(ctx)
917+
iferr!=nil {
918+
returnxerrors.Errorf("get app host: %w",err)
919+
}
920+
914921
workspaces,err:=getScaletestWorkspaces(inv.Context(),client,template)
915922
iferr!=nil {
916923
returnerr
@@ -945,35 +952,39 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
945952
th:=harness.NewTestHarness(strategy.toStrategy(),cleanupStrategy.toStrategy())
946953
foridx,ws:=rangeworkspaces {
947954
var (
948-
agentID uuid.UUID
949-
agentNamestring
950-
name="workspace-traffic"
951-
id=strconv.Itoa(idx)
955+
agent codersdk.WorkspaceAgent
956+
name="workspace-traffic"
957+
id=strconv.Itoa(idx)
952958
)
953959

954960
for_,res:=rangews.LatestBuild.Resources {
955961
iflen(res.Agents)==0 {
956962
continue
957963
}
958-
agentID=res.Agents[0].ID
959-
agentName=res.Agents[0].Name
964+
agent=res.Agents[0]
960965
}
961966

962-
ifagentID==uuid.Nil {
967+
ifagent.ID==uuid.Nil {
963968
_,_=fmt.Fprintf(inv.Stderr,"WARN: skipping workspace %s: no agent\n",ws.Name)
964969
continue
965970
}
966971

972+
appConfig,err:=createWorkspaceAppConfig(client,appHost.Host,app,ws,agent)
973+
iferr!=nil {
974+
returnxerrors.Errorf("configure workspace app: %w",err)
975+
}
976+
967977
// Setup our workspace agent connection.
968978
config:= workspacetraffic.Config{
969-
AgentID:agentID,
979+
AgentID:agent.ID,
970980
BytesPerTick:bytesPerTick,
971981
Duration:strategy.timeout,
972982
TickInterval:tickInterval,
973-
ReadMetrics:metrics.ReadMetrics(ws.OwnerName,ws.Name,agentName),
974-
WriteMetrics:metrics.WriteMetrics(ws.OwnerName,ws.Name,agentName),
983+
ReadMetrics:metrics.ReadMetrics(ws.OwnerName,ws.Name,agent.Name),
984+
WriteMetrics:metrics.WriteMetrics(ws.OwnerName,ws.Name,agent.Name),
975985
SSH:ssh,
976986
Echo:ssh,
987+
App:appConfig,
977988
}
978989

979990
iferr:=config.Validate();err!=nil {
@@ -1046,9 +1057,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
10461057
Flag:"ssh",
10471058
Env:"CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
10481059
Default:"",
1049-
Description:"Send traffic over SSH.",
1060+
Description:"Send traffic over SSH, cannot be used with --app.",
10501061
Value:clibase.BoolOf(&ssh),
10511062
},
1063+
{
1064+
Flag:"app",
1065+
Env:"CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
1066+
Default:"",
1067+
Description:"Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
1068+
Value:clibase.StringOf(&app),
1069+
},
10521070
}
10531071

10541072
tracingFlags.attach(&cmd.Options)
@@ -1411,3 +1429,29 @@ func parseTemplate(ctx context.Context, client *codersdk.Client, organizationIDs
14111429

14121430
returntpl,nil
14131431
}
1432+
1433+
funccreateWorkspaceAppConfig(client*codersdk.Client,appHost,appstring,workspace codersdk.Workspace,agent codersdk.WorkspaceAgent) (workspacetraffic.AppConfig,error) {
1434+
ifapp=="" {
1435+
return workspacetraffic.AppConfig{},nil
1436+
}
1437+
1438+
i:=slices.IndexFunc(agent.Apps,func(a codersdk.WorkspaceApp)bool {returna.Slug==app })
1439+
ifi==-1 {
1440+
return workspacetraffic.AppConfig{},xerrors.Errorf("app %q not found in workspace %q",app,workspace.Name)
1441+
}
1442+
1443+
c:= workspacetraffic.AppConfig{
1444+
Name:agent.Apps[i].Slug,
1445+
}
1446+
ifagent.Apps[i].Subdomain {
1447+
ifappHost=="" {
1448+
return workspacetraffic.AppConfig{},xerrors.Errorf("app %q is a subdomain app but no app host is configured",app)
1449+
}
1450+
1451+
c.URL=fmt.Sprintf("%s://%s",client.URL.Scheme,strings.Replace(appHost,"*",agent.Apps[i].SubdomainName,1))
1452+
}else {
1453+
c.URL=fmt.Sprintf("%s/@%s/%s.%s/apps/%s",client.URL.String(),workspace.OwnerName,workspace.Name,agent.Name,agent.Apps[i].Slug)
1454+
}
1455+
1456+
returnc,nil
1457+
}

‎scaletest/workspacetraffic/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type Config struct {
3131
// to true will double the amount of data read from the agent for
3232
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
3333
Echobool`json:"echo"`
34+
35+
AppAppConfig`json:"app"`
3436
}
3537

3638
func (cConfig)Validate()error {
@@ -50,5 +52,14 @@ func (c Config) Validate() error {
5052
returnxerrors.Errorf("validate tick_interval: must be greater than zero")
5153
}
5254

55+
ifc.SSH&&c.App.Name!="" {
56+
returnxerrors.Errorf("validate ssh: must be false when app is used")
57+
}
58+
5359
returnnil
5460
}
61+
62+
typeAppConfigstruct {
63+
Namestring`json:"name"`
64+
URLstring`json:"url"`
65+
}

‎scaletest/workspacetraffic/conn.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import (
55
"encoding/json"
66
"errors"
77
"io"
8+
"net"
9+
"net/http"
810
"sync"
911
"time"
1012

13+
"nhooyr.io/websocket"
14+
1115
"github.com/coder/coder/v2/codersdk"
1216

1317
"github.com/google/uuid"
@@ -260,3 +264,118 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
260264
func (w*wrappedSSHConn)Write(p []byte) (nint,errerror) {
261265
returnw.stdin.Write(p)
262266
}
267+
268+
funcappClientConn(ctx context.Context,client*codersdk.Client,urlstring) (*countReadWriteCloser,error) {
269+
headers:= http.Header{}
270+
tokenHeader:=codersdk.SessionTokenHeader
271+
ifclient.SessionTokenHeader!="" {
272+
tokenHeader=client.SessionTokenHeader
273+
}
274+
headers.Set(tokenHeader,client.SessionToken())
275+
276+
//nolint:bodyclose // The websocket conn manages the body.
277+
conn,_,err:=websocket.Dial(ctx,url,&websocket.DialOptions{
278+
HTTPClient:client.HTTPClient,
279+
HTTPHeader:headers,
280+
})
281+
iferr!=nil {
282+
returnnil,xerrors.Errorf("websocket dial: %w",err)
283+
}
284+
285+
netConn:=websocketNetConn(conn,websocket.MessageBinary)
286+
287+
// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
288+
crw:=&countReadWriteCloser{rwc:netConn}
289+
returncrw,nil
290+
}
291+
292+
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
293+
// is called if a read or write error is encountered.
294+
typewsNetConnstruct {
295+
net.Conn
296+
297+
writeMu sync.Mutex
298+
readMu sync.Mutex
299+
300+
cancel context.CancelFunc
301+
closeMu sync.Mutex
302+
closedbool
303+
}
304+
305+
func (c*wsNetConn)Read(b []byte) (nint,errerror) {
306+
c.readMu.Lock()
307+
deferc.readMu.Unlock()
308+
ifc.isClosed() {
309+
return0,io.EOF
310+
}
311+
n,err=c.Conn.Read(b)
312+
iferr!=nil {
313+
ifc.isClosed() {
314+
returnn,io.EOF
315+
}
316+
returnn,err
317+
}
318+
returnn,nil
319+
}
320+
321+
func (c*wsNetConn)Write(b []byte) (nint,errerror) {
322+
c.writeMu.Lock()
323+
deferc.writeMu.Unlock()
324+
ifc.isClosed() {
325+
return0,io.EOF
326+
}
327+
328+
forlen(b)>0 {
329+
bb:=b
330+
iflen(bb)>rptyJSONMaxDataSize {
331+
bb=b[:rptyJSONMaxDataSize]
332+
}
333+
b=b[len(bb):]
334+
nn,err:=c.Conn.Write(bb)
335+
n+=nn
336+
iferr!=nil {
337+
ifc.isClosed() {
338+
returnn,io.EOF
339+
}
340+
returnn,err
341+
}
342+
}
343+
returnn,nil
344+
}
345+
346+
func (c*wsNetConn)isClosed()bool {
347+
c.closeMu.Lock()
348+
deferc.closeMu.Unlock()
349+
returnc.closed
350+
}
351+
352+
func (c*wsNetConn)Close()error {
353+
c.closeMu.Lock()
354+
closed:=c.closed
355+
c.closed=true
356+
c.closeMu.Unlock()
357+
358+
ifclosed {
359+
returnnil
360+
}
361+
362+
// Cancel before acquiring locks to speed up teardown.
363+
c.cancel()
364+
365+
c.readMu.Lock()
366+
deferc.readMu.Unlock()
367+
c.writeMu.Lock()
368+
deferc.writeMu.Unlock()
369+
370+
_=c.Conn.Close()
371+
returnnil
372+
}
373+
374+
funcwebsocketNetConn(conn*websocket.Conn,msgType websocket.MessageType) net.Conn {
375+
// Since `websocket.NetConn` binds to a context for the lifetime of the
376+
// connection, we need to create a new context that can be canceled when
377+
// the connection is closed.
378+
ctx,cancel:=context.WithCancel(context.Background())
379+
nc:=websocket.NetConn(ctx,conn,msgType)
380+
return&wsNetConn{cancel:cancel,Conn:nc}
381+
}

‎scaletest/workspacetraffic/run.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
9191
command:=fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none",output,bytesPerTick)
9292

9393
varconn*countReadWriteCloser
94-
ifr.cfg.SSH {
94+
switch {
95+
caser.cfg.App.Name!="":
96+
logger.Info(ctx,"sending traffic to workspace app",slog.F("app",r.cfg.App.Name))
97+
conn,err=appClientConn(ctx,r.client,r.cfg.App.URL)
98+
iferr!=nil {
99+
logger.Error(ctx,"connect to workspace app",slog.Error(err))
100+
returnxerrors.Errorf("connect to workspace app: %w",err)
101+
}
102+
103+
caser.cfg.SSH:
95104
logger.Info(ctx,"connecting to workspace agent",slog.F("method","ssh"))
96105
// If echo is enabled, disable PTY to avoid double echo and
97106
// reduce CPU usage.
@@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
101110
logger.Error(ctx,"connect to workspace agent via ssh",slog.Error(err))
102111
returnxerrors.Errorf("connect to workspace via ssh: %w",err)
103112
}
104-
}else {
113+
114+
default:
105115
logger.Info(ctx,"connecting to workspace agent",slog.F("method","reconnectingpty"))
106116
conn,err=connectRPTY(ctx,r.client,agentID,reconnect,command)
107117
iferr!=nil {
@@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
114124
closeConn:=func()error {
115125
closeOnce.Do(func() {
116126
closeErr=conn.Close()
117-
iferr!=nil {
118-
logger.Error(ctx,"close agent connection",slog.Error(err))
127+
ifcloseErr!=nil {
128+
logger.Error(ctx,"close agent connection",slog.Error(closeErr))
119129
}
120130
})
121131
returncloseErr
@@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
142152

143153
// Read until connection is closed.
144154
gofunc() {
145-
rch:=rch// Shadowed for reassignment.
146155
logger.Debug(ctx,"reading from agent")
147156
rch<-drain(conn)
148157
logger.Debug(ctx,"done reading from agent")
@@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
151160

152161
// Write random data to the conn every tick.
153162
gofunc() {
154-
wch:=wch// Shadowed for reassignment.
155163
logger.Debug(ctx,"writing to agent")
156164
wch<-writeRandomData(conn,bytesPerTick,tick.C)
157165
logger.Debug(ctx,"done writing to agent")
@@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
160168

161169
varwaitCloseTimeoutCh<-chanstruct{}
162170
deadlineCtxCh:=deadlineCtx.Done()
171+
wchRef,rchRef:=wch,rch
163172
for {
164-
ifwch==nil&&rch==nil {
173+
ifwchRef==nil&&rchRef==nil {
165174
returnnil
166175
}
167176

168177
select {
169178
case<-waitCloseTimeoutCh:
170179
logger.Warn(ctx,"timed out waiting for read/write to complete",
171-
slog.F("write_done",wch==nil),
172-
slog.F("read_done",rch==nil),
180+
slog.F("write_done",wchRef==nil),
181+
slog.F("read_done",rchRef==nil),
173182
)
174183
returnxerrors.Errorf("timed out waiting for read/write to complete: %w",ctx.Err())
175184
case<-deadlineCtxCh:
@@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
181190
waitCtx,cancel:=context.WithTimeout(context.Background(),waitCloseTimeout)
182191
defercancel()//nolint:revive // Only called once.
183192
waitCloseTimeoutCh=waitCtx.Done()
184-
caseerr=<-wch:
193+
caseerr=<-wchRef:
185194
iferr!=nil {
186195
returnxerrors.Errorf("write to agent: %w",err)
187196
}
188-
wch=nil
189-
caseerr=<-rch:
197+
wchRef=nil
198+
caseerr=<-rchRef:
190199
iferr!=nil {
191200
returnxerrors.Errorf("read from agent: %w",err)
192201
}
193-
rch=nil
202+
rchRef=nil
194203
}
195204
}
196205
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp