Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit149bfb6

Browse files
committed
drain connection async
1 parent7269e25 commit149bfb6

File tree

2 files changed

+62
-22
lines changed

2 files changed

+62
-22
lines changed

‎cli/scaletest.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,8 @@ func (r *RootCmd) scaletestTrafficGen() *clibase.Cmd {
975975
}
976976

977977
ifagentID==uuid.Nil {
978-
returnxerrors.Errorf("no agent found for workspace %s",ws.Name)
978+
_,_=fmt.Fprintf(inv.Stderr,"WARN: skipping workspace %s: no agent\n",ws.Name)
979+
continue
979980
}
980981

981982
// Setup our workspace agent connection.

‎scaletest/trafficgen/run.go

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package trafficgen
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/json"
67
"io"
@@ -12,6 +13,7 @@ import (
1213

1314
"cdr.dev/slog"
1415
"cdr.dev/slog/sloggers/sloghuman"
16+
1517
"github.com/coder/coder/coderd/tracing"
1618
"github.com/coder/coder/codersdk"
1719
"github.com/coder/coder/cryptorand"
@@ -72,14 +74,14 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
7274
_=conn.Close()
7375
}()
7476

75-
// Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd.
76-
crw:=countReadWriter{ReadWriter:conn}
77-
7877
// Set a deadline for stopping the text.
7978
start:=time.Now()
8079
deadlineCtx,cancel:=context.WithDeadline(ctx,start.Add(r.cfg.Duration))
8180
defercancel()
8281

82+
// Wrap the conn in a countReadWriter so we can monitor bytes sent/rcvd.
83+
crw:=countReadWriter{ReadWriter:conn,ctx:deadlineCtx}
84+
8385
// Create a ticker for sending data to the PTY.
8486
tick:=time.NewTicker(time.Duration(tickInterval))
8587
defertick.Stop()
@@ -88,10 +90,15 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
8890
rch:=make(chanerror)
8991
wch:=make(chanerror)
9092

93+
gofunc() {
94+
<-deadlineCtx.Done()
95+
logger.Debug(ctx,"context deadline reached",slog.F("duration",time.Since(start)))
96+
}()
97+
9198
// Read forever in the background.
9299
gofunc() {
93100
logger.Debug(ctx,"reading from agent",slog.F("agent_id",agentID))
94-
rch<-readContext(deadlineCtx,&crw,bytesPerTick*2)
101+
rch<-drainContext(deadlineCtx,&crw,bytesPerTick*2)
95102
logger.Debug(ctx,"done reading from agent",slog.F("agent_id",agentID))
96103
conn.Close()
97104
close(rch)
@@ -109,14 +116,17 @@ func (r *Runner) Run(ctx context.Context, _ string, logs io.Writer) error {
109116
ifwErr:=<-wch;wErr!=nil {
110117
returnxerrors.Errorf("write to pty: %w",wErr)
111118
}
119+
drainStart:=time.Now()
112120
ifrErr:=<-rch;rErr!=nil {
113121
returnxerrors.Errorf("read from pty: %w",rErr)
114122
}
115123

116124
duration:=time.Since(start)
125+
drainDuration:=time.Since(drainStart)
117126

118-
logger.Info(ctx,"trafficgen result",
127+
logger.Info(ctx,"results",
119128
slog.F("duration",duration),
129+
slog.F("drain",drainDuration),
120130
slog.F("sent",crw.BytesWritten()),
121131
slog.F("rcvd",crw.BytesRead()),
122132
)
@@ -129,14 +139,34 @@ func (*Runner) Cleanup(context.Context, string) error {
129139
returnnil
130140
}
131141

132-
funcreadContext(ctx context.Context,src io.Reader,bufSizeint64)error {
133-
buf:=make([]byte,bufSize)
142+
// drainContext drains from src until it returns io.EOF or ctx times out.
143+
funcdrainContext(ctx context.Context,src io.Reader,bufSizeint64)error {
144+
errCh:=make(chanerror)
145+
done:=make(chanstruct{})
146+
gofunc() {
147+
tmp:=make([]byte,bufSize)
148+
buf:=bytes.NewBuffer(tmp)
149+
for {
150+
select {
151+
case<-done:
152+
return
153+
default:
154+
_,err:=io.CopyN(buf,src,1)
155+
// _, err := src.Read(tmp)
156+
iferr!=nil {
157+
errCh<-err
158+
close(errCh)
159+
return
160+
}
161+
}
162+
}
163+
}()
134164
for {
135165
select {
136166
case<-ctx.Done():
167+
close(done)
137168
returnnil
138-
default:
139-
_,err:=src.Read(buf)
169+
caseerr:=<-errCh:
140170
iferr!=nil {
141171
ifxerrors.Is(err,io.EOF) {
142172
returnnil
@@ -175,31 +205,37 @@ func copyContext(ctx context.Context, dst io.Writer, src []byte) (int, error) {
175205
case<-ctx.Done():
176206
returncount,nil
177207
default:
178-
n,err:=dst.Write(src)
179-
iferr!=nil {
180-
ifxerrors.Is(err,io.EOF) {
181-
// On an EOF, assume that all of src was consumed.
182-
returnlen(src),nil
208+
foridx:=rangesrc {
209+
n,err:=dst.Write(src[idx :idx+1])
210+
iferr!=nil {
211+
ifxerrors.Is(err,io.EOF) {
212+
returncount,nil
213+
}
214+
ifxerrors.Is(err,context.DeadlineExceeded) {
215+
// It's OK if we reach the deadline before writing the full payload.
216+
returncount,nil
217+
}
218+
returncount,err
183219
}
184-
returncount,err
185-
}
186-
count+=n
187-
ifn==len(src) {
188-
returncount,nil
220+
count+=n
189221
}
190-
// Not all of src was consumed. Update src and retry.
191-
src=src[n:]
222+
returncount,nil
192223
}
193224
}
194225
}
195226

227+
// countReadWriter wraps an io.ReadWriter and counts the number of bytes read and written.
196228
typecountReadWriterstruct {
229+
ctx context.Context
197230
io.ReadWriter
198231
bytesRead atomic.Int64
199232
bytesWritten atomic.Int64
200233
}
201234

202235
func (w*countReadWriter)Read(p []byte) (int,error) {
236+
iferr:=w.ctx.Err();err!=nil {
237+
return0,err
238+
}
203239
n,err:=w.ReadWriter.Read(p)
204240
iferr==nil {
205241
w.bytesRead.Add(int64(n))
@@ -208,6 +244,9 @@ func (w *countReadWriter) Read(p []byte) (int, error) {
208244
}
209245

210246
func (w*countReadWriter)Write(p []byte) (int,error) {
247+
iferr:=w.ctx.Err();err!=nil {
248+
return0,err
249+
}
211250
n,err:=w.ReadWriter.Write(p)
212251
iferr==nil {
213252
w.bytesWritten.Add(int64(n))

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp