Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat(cli/exp): add app testing to scaletest workspace-traffic#11633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
mafredri merged 6 commits intomainfrommafredri/scaletest-workspace-app-traffic
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletionscli/exp_scaletest.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"

"cdr.dev/slog"
Expand DownExpand Up@@ -859,6 +860,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
tickInterval time.Duration
bytesPerTick int64
ssh bool
app string
template string

client = &codersdk.Client{}
Expand DownExpand Up@@ -911,6 +913,11 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
}
}

appHost, err := client.AppHost(ctx)
if err != nil {
return xerrors.Errorf("get app host: %w", err)
}

workspaces, err := getScaletestWorkspaces(inv.Context(), client, template)
if err != nil {
return err
Expand DownExpand Up@@ -945,35 +952,39 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
for idx, ws := range workspaces {
var (
agentID uuid.UUID
agentName string
name = "workspace-traffic"
id = strconv.Itoa(idx)
agent codersdk.WorkspaceAgent
name = "workspace-traffic"
id = strconv.Itoa(idx)
)

for _, res := range ws.LatestBuild.Resources {
if len(res.Agents) == 0 {
continue
}
agentID = res.Agents[0].ID
agentName = res.Agents[0].Name
agent = res.Agents[0]
}

ifagentID == uuid.Nil {
ifagent.ID == uuid.Nil {
_, _ = fmt.Fprintf(inv.Stderr, "WARN: skipping workspace %s: no agent\n", ws.Name)
continue
}

appConfig, err := createWorkspaceAppConfig(client, appHost.Host, app, ws, agent)
if err != nil {
return xerrors.Errorf("configure workspace app: %w", err)
}

// Setup our workspace agent connection.
config := workspacetraffic.Config{
AgentID:agentID,
AgentID:agent.ID,
BytesPerTick: bytesPerTick,
Duration: strategy.timeout,
TickInterval: tickInterval,
ReadMetrics: metrics.ReadMetrics(ws.OwnerName, ws.Name,agentName),
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name,agentName),
ReadMetrics: metrics.ReadMetrics(ws.OwnerName, ws.Name,agent.Name),
WriteMetrics: metrics.WriteMetrics(ws.OwnerName, ws.Name,agent.Name),
SSH: ssh,
Echo: ssh,
App: appConfig,
}

if err := config.Validate(); err != nil {
Expand DownExpand Up@@ -1046,9 +1057,16 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *clibase.Cmd {
Flag: "ssh",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_SSH",
Default: "",
Description: "Send traffic over SSH.",
Description: "Send traffic over SSH, cannot be used with --app.",
Value: clibase.BoolOf(&ssh),
},
{
Flag: "app",
Env: "CODER_SCALETEST_WORKSPACE_TRAFFIC_APP",
Default: "",
Description: "Send WebSocket traffic to a workspace app (proxied via coderd), cannot be used with --ssh.",
Value: clibase.StringOf(&app),
},
}

tracingFlags.attach(&cmd.Options)
Expand DownExpand Up@@ -1411,3 +1429,29 @@ func parseTemplate(ctx context.Context, client *codersdk.Client, organizationIDs

return tpl, nil
}

func createWorkspaceAppConfig(client *codersdk.Client, appHost, app string, workspace codersdk.Workspace, agent codersdk.WorkspaceAgent) (workspacetraffic.AppConfig, error) {
if app == "" {
return workspacetraffic.AppConfig{}, nil
}

i := slices.IndexFunc(agent.Apps, func(a codersdk.WorkspaceApp) bool { return a.Slug == app })
if i == -1 {
return workspacetraffic.AppConfig{}, xerrors.Errorf("app %q not found in workspace %q", app, workspace.Name)
}

c := workspacetraffic.AppConfig{
Name: agent.Apps[i].Slug,
}
if agent.Apps[i].Subdomain {
if appHost == "" {
return workspacetraffic.AppConfig{}, xerrors.Errorf("app %q is a subdomain app but no app host is configured", app)
}

c.URL = fmt.Sprintf("%s://%s", client.URL.Scheme, strings.Replace(appHost, "*", agent.Apps[i].SubdomainName, 1))
} else {
c.URL = fmt.Sprintf("%s/@%s/%s.%s/apps/%s", client.URL.String(), workspace.OwnerName, workspace.Name, agent.Name, agent.Apps[i].Slug)
}

return c, nil
}
11 changes: 11 additions & 0 deletionsscaletest/workspacetraffic/config.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -31,6 +31,8 @@ type Config struct {
// to true will double the amount of data read from the agent for
// PTYs (e.g. reconnecting pty or SSH connections that request PTY).
Echo bool `json:"echo"`

App AppConfig `json:"app"`
}

func (c Config) Validate() error {
Expand All@@ -50,5 +52,14 @@ func (c Config) Validate() error {
return xerrors.Errorf("validate tick_interval: must be greater than zero")
}

if c.SSH && c.App.Name != "" {
return xerrors.Errorf("validate ssh: must be false when app is used")
}

return nil
}

type AppConfig struct {
Name string `json:"name"`
URL string `json:"url"`
}
119 changes: 119 additions & 0 deletionsscaletest/workspacetraffic/conn.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -5,9 +5,13 @@ import (
"encoding/json"
"errors"
"io"
"net"
"net/http"
"sync"
"time"

"nhooyr.io/websocket"

"github.com/coder/coder/v2/codersdk"

"github.com/google/uuid"
Expand DownExpand Up@@ -260,3 +264,118 @@ func (w *wrappedSSHConn) Read(p []byte) (n int, err error) {
func (w *wrappedSSHConn) Write(p []byte) (n int, err error) {
return w.stdin.Write(p)
}

func appClientConn(ctx context.Context, client *codersdk.Client, url string) (*countReadWriteCloser, error) {
headers := http.Header{}
tokenHeader := codersdk.SessionTokenHeader
if client.SessionTokenHeader != "" {
tokenHeader = client.SessionTokenHeader
}
headers.Set(tokenHeader, client.SessionToken())

//nolint:bodyclose // The websocket conn manages the body.
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
HTTPClient: client.HTTPClient,
HTTPHeader: headers,
})
if err != nil {
return nil, xerrors.Errorf("websocket dial: %w", err)
}

netConn := websocketNetConn(conn, websocket.MessageBinary)

// Wrap the conn in a countReadWriteCloser so we can monitor bytes sent/rcvd.
crw := &countReadWriteCloser{rwc: netConn}
return crw, nil
}

// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
// is called if a read or write error is encountered.
type wsNetConn struct {
net.Conn

writeMu sync.Mutex
readMu sync.Mutex

cancel context.CancelFunc
closeMu sync.Mutex
closed bool
}

func (c *wsNetConn) Read(b []byte) (n int, err error) {
c.readMu.Lock()
defer c.readMu.Unlock()
if c.isClosed() {
return 0, io.EOF
}
n, err = c.Conn.Read(b)
if err != nil {
if c.isClosed() {
return n, io.EOF
}
return n, err
}
return n, nil
}

func (c *wsNetConn) Write(b []byte) (n int, err error) {
c.writeMu.Lock()
defer c.writeMu.Unlock()
if c.isClosed() {
return 0, io.EOF
}

for len(b) > 0 {
bb := b
if len(bb) > rptyJSONMaxDataSize {
bb = b[:rptyJSONMaxDataSize]
}
b = b[len(bb):]
nn, err := c.Conn.Write(bb)
n += nn
if err != nil {
if c.isClosed() {
return n, io.EOF
}
return n, err
}
}
return n, nil
}

func (c *wsNetConn) isClosed() bool {
c.closeMu.Lock()
defer c.closeMu.Unlock()
return c.closed
}

func (c *wsNetConn) Close() error {
c.closeMu.Lock()
closed := c.closed
c.closed = true
c.closeMu.Unlock()

if closed {
return nil
}

// Cancel before acquiring locks to speed up teardown.
c.cancel()

c.readMu.Lock()
defer c.readMu.Unlock()
c.writeMu.Lock()
defer c.writeMu.Unlock()

_ = c.Conn.Close()
return nil
}

func websocketNetConn(conn *websocket.Conn, msgType websocket.MessageType) net.Conn {
// Since `websocket.NetConn` binds to a context for the lifetime of the
// connection, we need to create a new context that can be canceled when
// the connection is closed.
ctx, cancel := context.WithCancel(context.Background())
nc := websocket.NetConn(ctx, conn, msgType)
return &wsNetConn{cancel: cancel, Conn: nc}
}
35 changes: 22 additions & 13 deletionsscaletest/workspacetraffic/run.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -91,7 +91,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
command := fmt.Sprintf("dd if=/dev/stdin of=%s bs=%d status=none", output, bytesPerTick)

var conn *countReadWriteCloser
if r.cfg.SSH {
switch {
case r.cfg.App.Name != "":
logger.Info(ctx, "sending traffic to workspace app", slog.F("app", r.cfg.App.Name))
conn, err = appClientConn(ctx, r.client, r.cfg.App.URL)
if err != nil {
logger.Error(ctx, "connect to workspace app", slog.Error(err))
return xerrors.Errorf("connect to workspace app: %w", err)
}

case r.cfg.SSH:
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "ssh"))
// If echo is enabled, disable PTY to avoid double echo and
// reduce CPU usage.
Expand All@@ -101,7 +110,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
logger.Error(ctx, "connect to workspace agent via ssh", slog.Error(err))
return xerrors.Errorf("connect to workspace via ssh: %w", err)
}
} else {

default:
logger.Info(ctx, "connecting to workspace agent", slog.F("method", "reconnectingpty"))
conn, err = connectRPTY(ctx, r.client, agentID, reconnect, command)
if err != nil {
Expand All@@ -114,8 +124,8 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
closeConn := func() error {
closeOnce.Do(func() {
closeErr = conn.Close()
iferr != nil {
logger.Error(ctx, "close agent connection", slog.Error(err))
ifcloseErr != nil {
logger.Error(ctx, "close agent connection", slog.Error(closeErr))
}
})
return closeErr
Expand All@@ -142,7 +152,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

// Read until connection is closed.
go func() {
rch := rch // Shadowed for reassignment.
logger.Debug(ctx, "reading from agent")
rch <- drain(conn)
logger.Debug(ctx, "done reading from agent")
Expand All@@ -151,7 +160,6 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

// Write random data to the conn every tick.
go func() {
wch := wch // Shadowed for reassignment.
logger.Debug(ctx, "writing to agent")
wch <- writeRandomData(conn, bytesPerTick, tick.C)
logger.Debug(ctx, "done writing to agent")
Expand All@@ -160,16 +168,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)

var waitCloseTimeoutCh <-chan struct{}
deadlineCtxCh := deadlineCtx.Done()
wchRef, rchRef := wch, rch
for {
ifwch == nil &&rch == nil {
ifwchRef == nil &&rchRef == nil {
return nil
}

select {
case <-waitCloseTimeoutCh:
logger.Warn(ctx, "timed out waiting for read/write to complete",
slog.F("write_done",wch == nil),
slog.F("read_done",rch == nil),
slog.F("write_done",wchRef == nil),
slog.F("read_done",rchRef == nil),
)
return xerrors.Errorf("timed out waiting for read/write to complete: %w", ctx.Err())
case <-deadlineCtxCh:
Expand All@@ -181,16 +190,16 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) (err error)
waitCtx, cancel := context.WithTimeout(context.Background(), waitCloseTimeout)
defer cancel() //nolint:revive // Only called once.
waitCloseTimeoutCh = waitCtx.Done()
case err = <-wch:
case err = <-wchRef:
if err != nil {
return xerrors.Errorf("write to agent: %w", err)
}
wch = nil
case err = <-rch:
wchRef = nil
case err = <-rchRef:
if err != nil {
return xerrors.Errorf("read from agent: %w", err)
}
rch = nil
rchRef = nil
}
}
}
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp