- Notifications
You must be signed in to change notification settings - Fork929
feat(cli/exp): add app testing to scaletest workspace-traffic#11633
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes from1 commit
ef5cb9a
fd6b18d
02c14e6
20e93b9
f52b492
f6e399c
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
- Loading branch information
Uh oh!
There was an error while loading.Please reload this page.
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -21,6 +21,7 @@ import ( | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
"go.opentelemetry.io/otel/trace" | ||
"golang.org/x/exp/slices" | ||
"golang.org/x/xerrors" | ||
"cdr.dev/slog" | ||
@@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
tickInterval time.Duration | ||
bytesPerTick int64 | ||
ssh bool | ||
app string | ||
template string | ||
client = &codersdk.Client{} | ||
@@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
} | ||
} | ||
appHost, err := client.AppHost(ctx) | ||
if err != nil { | ||
return xerrors.Errorf("get app host: %w", err) | ||
} | ||
workspaces, err := getScaletestWorkspaces(inv.Context(), client, template) | ||
if err != nil { | ||
return err | ||
@@ -949,6 +956,8 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
agentName string | ||
name = "workspace-traffic" | ||
id = strconv.Itoa(idx) | ||
apps []codersdk.WorkspaceApp | ||
appConfig workspacetraffic.AppConfig | ||
) | ||
for _, res := range ws.LatestBuild.Resources { | ||
@@ -957,13 +966,34 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
} | ||
agentID = res.Agents[0].ID | ||
agentName = res.Agents[0].Name | ||
apps = res.Agents[0].Apps | ||
} | ||
if agentID == uuid.Nil { | ||
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name) | ||
continue | ||
} | ||
if app != "" { | ||
mafredri marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
i := slices.IndexFunc(apps, func(a codersdk.WorkspaceApp) bool { return a.Slug == app }) | ||
if i == -1 { | ||
return xerrors.Errorf("app %q not found in workspace %q", app, ws.Name) | ||
} | ||
appConfig = workspacetraffic.AppConfig{ | ||
Name: apps[i].Slug, | ||
} | ||
if apps[i].Subdomain { | ||
if appHost.Host == "" { | ||
return xerrors.Errorf("app %q is a subdomain app but no app host is configured", app) | ||
} | ||
appConfig.URL = fmt.Sprintf("%s://%s", client.URL.Scheme, strings.Replace(appHost.Host, "*", apps[i].SubdomainName, 1)) | ||
} else { | ||
appConfig.URL = fmt.Sprintf("%s/@%s/%s.%s/apps/%s", client.URL.String(), ws.OwnerName, ws.Name, agentName, apps[i].Slug) | ||
} | ||
} | ||
// Setup our workspace agent connection. | ||
config := workspacetraffic.Config{ | ||
AgentID: agentID, | ||
@@ -974,6 +1004,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name, agentName), | ||
SSH: ssh, | ||
Echo: ssh, | ||
App: appConfig, | ||
mafredri marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
} | ||
if err := config.Validate(); err != nil { | ||
@@ -1046,9 +1077,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd { | ||
Flag: "ssh", | ||
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH", | ||
Default: "", | ||
Description: "Send traffic over SSH, cannot be used with --app.", | ||
Value: clibase.BoolOf(&ssh), | ||
}, | ||
{ | ||
Flag: "app", | ||
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_APP", | ||
Default: "", | ||
Description: "Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.", | ||
Value: clibase.StringOf(&app), | ||
}, | ||
} | ||
tracingFlags.attach(&cmd.Options) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -5,9 +5,13 @@ import ( | ||
"encoding/json" | ||
"errors" | ||
"io" | ||
"net" | ||
"net/http" | ||
"sync" | ||
"time" | ||
"nhooyr.io/websocket" | ||
"github.com/coder/coder/v2/codersdk" | ||
"github.com/google/uuid" | ||
@@ -27,9 +31,9 @@ const ( | ||
// | ||
// failed to write frame: WebSocket closed: received close frame: status = StatusMessageTooBig and reason = "read limited at 32769 bytes" | ||
// | ||
// Since we can't control fragmentation/buffer sizes, weuse a conservative | ||
//value. Derived from 1024 * 9 * 3 = <28KB. | ||
rptyJSONMaxDataSize = 1024 * 9 | ||
) | ||
func connectRPTY(ctx context.Context, client *codersdk.Client, agentID, reconnect uuid.UUID, cmd string) (*countReadWriteCloser, error) { | ||
@@ -260,3 +264,114 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) { | ||
func (w *wrappedSSHConn) Write(p []byte) (n int, err error) { | ||
return w.stdin.Write(p) | ||
} | ||
func appClientConn(ctx context.Context, client *codersdk.Client, url string) (*countReadWriteCloser, error) { | ||
headers := http.Header{} | ||
tokenHeader := codersdk.SessionTokenHeader | ||
if client.SessionTokenHeader != "" { | ||
tokenHeader = client.SessionTokenHeader | ||
} | ||
headers.Set(tokenHeader, client.SessionToken()) | ||
//nolint:bodyclose // The websocket conn manages the body. | ||
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{ | ||
HTTPClient: client.HTTPClient, | ||
HTTPHeader: headers, | ||
}) | ||
if err != nil { | ||
return nil, xerrors.Errorf("websocket dial: %w", err) | ||
} | ||
netConn := websocketNetConn(conn, websocket.MessageBinary) | ||
// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd. | ||
crw := &countReadWriteCloser{rwc: netConn} | ||
return crw, nil | ||
} | ||
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func | ||
// is called if a read or write error is encountered. | ||
type wsNetConn struct { | ||
net.Conn | ||
writeMu sync.Mutex | ||
readMu sync.Mutex | ||
cancel context.CancelFunc | ||
closeMu sync.Mutex | ||
closed bool | ||
} | ||
func (c *wsNetConn) Read(b []byte) (n int, err error) { | ||
c.readMu.Lock() | ||
defer c.readMu.Unlock() | ||
if c.isClosed() { | ||
return 0, io.EOF | ||
} | ||
n, err = c.Conn.Read(b) | ||
if err != nil { | ||
if c.isClosed() { | ||
return n, io.EOF | ||
} | ||
return n, err | ||
} | ||
return n, nil | ||
} | ||
func (c *wsNetConn) Write(b []byte) (n int, err error) { | ||
c.writeMu.Lock() | ||
defer c.writeMu.Unlock() | ||
if c.isClosed() { | ||
return 0, io.EOF | ||
} | ||
for len(b) > 0 { | ||
bb := b | ||
if len(bb) > rptyJSONMaxDataSize { | ||
bb = b[:rptyJSONMaxDataSize] | ||
} | ||
b = b[len(bb):] | ||
nn, err := c.Conn.Write(bb) | ||
n += nn | ||
if err != nil { | ||
if c.isClosed() { | ||
return n, io.EOF | ||
} | ||
return n, err | ||
} | ||
} | ||
return n, nil | ||
} | ||
func (c *wsNetConn) isClosed() bool { | ||
c.closeMu.Lock() | ||
defer c.closeMu.Unlock() | ||
return c.closed | ||
} | ||
func (c *wsNetConn) Close() error { | ||
c.closeMu.Lock() | ||
closed := c.closed | ||
c.closed = true | ||
c.closeMu.Unlock() | ||
if closed { | ||
return nil | ||
} | ||
c.cancel() | ||
c.readMu.Lock() | ||
defer c.readMu.Unlock() | ||
c.writeMu.Lock() | ||
defer c.writeMu.Unlock() | ||
_ = c.Conn.Close() | ||
return nil | ||
} | ||
func websocketNetConn(conn *websocket.Conn, msgType websocket.MessageType) net.Conn { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
mafredri marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
nc := websocket.NetConn(ctx, conn, msgType) | ||
return &wsNetConn{cancel: cancel, Conn: nc} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
command := fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none", output, bytesPerTick) | ||
var conn *countReadWriteCloser | ||
switch { | ||
case r.cfg.App.Name != "": | ||
logger.Info(ctx, "sending traffic to workspace app", slog.F("app", r.cfg.App.Name)) | ||
conn, err = appClientConn(ctx, r.client, r.cfg.App.URL) | ||
if err != nil { | ||
logger.Error(ctx, "connect to workspace app", slog.Error(err)) | ||
return xerrors.Errorf("connect to workspace app: %w", err) | ||
} | ||
case r.cfg.SSH: | ||
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "ssh")) | ||
// If echo is enabled, disable PTY to avoid double echo and | ||
// reduce CPU usage. | ||
@@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
logger.Error(ctx, "connect to workspace agent via ssh", slog.Error(err)) | ||
return xerrors.Errorf("connect to workspace via ssh: %w", err) | ||
} | ||
default: | ||
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "reconnectingpty")) | ||
conn, err = connectRPTY(ctx, r.client, agentID, reconnect, command) | ||
if err != nil { | ||
@@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
closeConn := func() error { | ||
closeOnce.Do(func() { | ||
closeErr = conn.Close() | ||
ifcloseErr != nil { | ||
logger.Error(ctx, "close agent connection", slog.Error(closeErr)) | ||
} | ||
}) | ||
return closeErr | ||
@@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
// Read until connection is closed. | ||
go func() { | ||
logger.Debug(ctx, "reading from agent") | ||
rch <- drain(conn) | ||
logger.Debug(ctx, "done reading from agent") | ||
@@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
// Write random data to the conn every tick. | ||
go func() { | ||
logger.Debug(ctx, "writing to agent") | ||
wch <- writeRandomData(conn, bytesPerTick, tick.C) | ||
logger.Debug(ctx, "done writing to agent") | ||
@@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
var waitCloseTimeoutCh <-chan struct{} | ||
deadlineCtxCh := deadlineCtx.Done() | ||
wchRef, rchRef := wch, rch | ||
mafredri marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
for { | ||
ifwchRef == nil &&rchRef == nil { | ||
return nil | ||
} | ||
select { | ||
case <-waitCloseTimeoutCh: | ||
logger.Warn(ctx, "timed out waiting for read/write to complete", | ||
slog.F("write_done",wchRef == nil), | ||
slog.F("read_done",rchRef == nil), | ||
) | ||
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err()) | ||
case <-deadlineCtxCh: | ||
@@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error) | ||
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout) | ||
defer cancel() //nolint:revive // Only called once. | ||
waitCloseTimeoutCh = waitCtx.Done() | ||
case err = <-wchRef: | ||
if err != nil { | ||
return xerrors.Errorf("write to agent: %w", err) | ||
} | ||
wchRef = nil | ||
case err = <-rchRef: | ||
if err != nil { | ||
return xerrors.Errorf("read from agent: %w", err) | ||
} | ||
rchRef = nil | ||
} | ||
} | ||
} | ||
Uh oh!
There was an error while loading.Please reload this page.