Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitede6f2f

Browse files
committed
feat: ensure that log batches don't exceed 1MiB in logSender
1 parentc8e7282 commitede6f2f

File tree

2 files changed

+121
-6
lines changed

2 files changed

+121
-6
lines changed

‎agent/logs.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@ import (
66
"time"
77

88
"github.com/google/uuid"
9-
"golang.org/x/exp/slices"
109
"golang.org/x/xerrors"
1110

1211
"cdr.dev/slog"
1312
"github.com/coder/coder/v2/agent/proto"
1413
"github.com/coder/coder/v2/codersdk/agentsdk"
1514
)
1615

17-
constflushInterval=time.Second
16+
const (
17+
flushInterval=time.Second
18+
logOutputMaxBytes=1<<20// 1MiB
19+
overheadPerLog=21// found by testing
20+
)
1821

1922
typelogQueuestruct {
2023
logs []*proto.Log
@@ -132,15 +135,30 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
132135
returnnil
133136
}
134137
src,q:=l.getPendingWorkLocked()
138+
logger:=l.logger.With(slog.F("log_source_id",src))
135139
q.flushRequested=false// clear flag since we're now flushing
136140
req:=&proto.BatchCreateLogsRequest{
137141
LogSourceId:src[:],
138-
// when we release the lock, we don't want modifications to the slice to affect us
139-
Logs:slices.Clone(q.logs),
142+
}
143+
o:=0
144+
n:=0
145+
forn<len(q.logs) {
146+
log:=q.logs[n]
147+
iflen(log.Output)>logOutputMaxBytes {
148+
logger.Warn(ctx,"dropping log line that exceeds our limit")
149+
n++
150+
continue
151+
}
152+
o+=len(log.Output)+overheadPerLog
153+
ifo>logOutputMaxBytes {
154+
break
155+
}
156+
req.Logs=append(req.Logs,log)
157+
n++
140158
}
141159

142160
l.L.Unlock()
143-
l.logger.Debug(ctx,"sending logs to agent API",slog.F("log_source_id",src),slog.F("num_logs",len(req.Logs)))
161+
logger.Debug(ctx,"sending logs to agent API",slog.F("num_logs",len(req.Logs)))
144162
resp,err:=dest.BatchCreateLogs(ctx,req)
145163
l.L.Lock()
146164
iferr!=nil {
@@ -158,7 +176,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
158176

159177
// since elsewhere we only append to the logs, here we can remove them
160178
// since we successfully sent them
161-
q.logs=q.logs[len(req.Logs):]
179+
q.logs=q.logs[n:]
162180
iflen(q.logs)==0 {
163181
// no empty queues
164182
delete(l.queues,src)

‎agent/logs_internal_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/google/uuid"
99
"github.com/stretchr/testify/require"
10+
protobuf"google.golang.org/protobuf/proto"
1011

1112
"cdr.dev/slog"
1213
"cdr.dev/slog/sloggers/slogtest"
@@ -163,6 +164,102 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
163164
require.Len(t,uut.queues,0)
164165
}
165166

167+
funcTestLogSender_SkipHugeLog(t*testing.T) {
168+
t.Parallel()
169+
testCtx:=testutil.Context(t,testutil.WaitShort)
170+
ctx,cancel:=context.WithCancel(testCtx)
171+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
172+
fDest:=newFakeLogDest()
173+
uut:=newLogSender(logger)
174+
175+
t0:=dbtime.Now()
176+
ls1:= uuid.UUID{0x11}
177+
hugeLog:=make([]byte,logOutputMaxBytes+1)
178+
fori:=rangehugeLog {
179+
hugeLog[i]='q'
180+
}
181+
err:=uut.enqueue(ls1,
182+
agentsdk.Log{
183+
CreatedAt:t0,
184+
Output:string(hugeLog),
185+
Level:codersdk.LogLevelInfo,
186+
},
187+
agentsdk.Log{
188+
CreatedAt:t0,
189+
Output:"test log 1, src 1",
190+
Level:codersdk.LogLevelInfo,
191+
})
192+
require.NoError(t,err)
193+
194+
loopErr:=make(chanerror,1)
195+
gofunc() {
196+
err:=uut.sendLoop(ctx,fDest)
197+
loopErr<-err
198+
}()
199+
200+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
201+
require.NotNil(t,req)
202+
require.Len(t,req.Logs,1,"it should skip the huge log")
203+
require.Equal(t,"test log 1, src 1",req.Logs[0].GetOutput())
204+
require.Equal(t,proto.Log_INFO,req.Logs[0].GetLevel())
205+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
206+
207+
cancel()
208+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
209+
require.NoError(t,err)
210+
}
211+
212+
funcTestLogSender_Batch(t*testing.T) {
213+
t.Parallel()
214+
testCtx:=testutil.Context(t,testutil.WaitShort)
215+
ctx,cancel:=context.WithCancel(testCtx)
216+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
217+
fDest:=newFakeLogDest()
218+
uut:=newLogSender(logger)
219+
220+
t0:=dbtime.Now()
221+
ls1:= uuid.UUID{0x11}
222+
varlogs []agentsdk.Log
223+
fori:=0;i<60000;i++ {
224+
logs=append(logs, agentsdk.Log{
225+
CreatedAt:t0,
226+
Output:"r",
227+
Level:codersdk.LogLevelInfo,
228+
})
229+
}
230+
err:=uut.enqueue(ls1,logs...)
231+
require.NoError(t,err)
232+
233+
loopErr:=make(chanerror,1)
234+
gofunc() {
235+
err:=uut.sendLoop(ctx,fDest)
236+
loopErr<-err
237+
}()
238+
239+
// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
240+
// is about 21 bytes.
241+
gotLogs:=0
242+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
243+
require.NotNil(t,req)
244+
gotLogs+=len(req.Logs)
245+
wire,err:=protobuf.Marshal(req)
246+
require.NoError(t,err)
247+
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
248+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
249+
req=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
250+
require.NotNil(t,req)
251+
gotLogs+=len(req.Logs)
252+
wire,err=protobuf.Marshal(req)
253+
require.NoError(t,err)
254+
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
255+
require.Equal(t,60000,gotLogs)
256+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
257+
258+
cancel()
259+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
260+
require.NoError(t,err)
261+
}
262+
166263
typefakeLogDeststruct {
167264
reqschan*proto.BatchCreateLogsRequest
168265
respschan*proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp