Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0aee68c

Browse files
committed
feat: ensure that log batches don't exceed 1MiB in logSender
1 parent92abdf6 commit0aee68c

File tree

2 files changed

+122
-5
lines changed

2 files changed

+122
-5
lines changed

‎agent/logs.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ import (
1313
"github.com/coder/coder/v2/codersdk/agentsdk"
1414
)
1515

16-
constflushInterval=time.Second
16+
const (
17+
flushInterval=time.Second
18+
logOutputMaxBytes=1<<20// 1MiB
19+
overheadPerLog=21// found by testing
20+
)
1721

1822
typelogQueuestruct {
1923
logs []*proto.Log
@@ -131,14 +135,30 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
131135
returnnil
132136
}
133137
src,q:=l.getPendingWorkLocked()
138+
logger:=l.logger.With(slog.F("log_source_id",src))
134139
q.flushRequested=false// clear flag since we're now flushing
135140
req:=&proto.BatchCreateLogsRequest{
136141
LogSourceId:src[:],
137-
Logs:q.logs[:],
142+
}
143+
o:=0
144+
n:=0
145+
forn<len(q.logs) {
146+
log:=q.logs[n]
147+
iflen(log.Output)>logOutputMaxBytes {
148+
logger.Warn(ctx,"dropping log line that exceeds our limit")
149+
n++
150+
continue
151+
}
152+
o+=len(log.Output)+overheadPerLog
153+
ifo>logOutputMaxBytes {
154+
break
155+
}
156+
req.Logs=append(req.Logs,log)
157+
n++
138158
}
139159

140160
l.L.Unlock()
141-
l.logger.Debug(ctx,"sending logs to agent API",slog.F("log_source_id",src),slog.F("num_logs",len(req.Logs)))
161+
logger.Debug(ctx,"sending logs to agent API",slog.F("num_logs",len(req.Logs)))
142162
resp,err:=dest.BatchCreateLogs(ctx,req)
143163
l.L.Lock()
144164
iferr!=nil {
@@ -157,10 +177,10 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
157177
// Since elsewhere we only append to the logs, here we can remove them
158178
// since we successfully sent them. First we nil the pointers though,
159179
// so that they can be gc'd.
160-
fori:=0;i<len(req.Logs);i++ {
180+
fori:=0;i<n;i++ {
161181
q.logs[i]=nil
162182
}
163-
q.logs=q.logs[len(req.Logs):]
183+
q.logs=q.logs[n:]
164184
iflen(q.logs)==0 {
165185
// no empty queues
166186
delete(l.queues,src)

‎agent/logs_internal_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/google/uuid"
1111
"github.com/stretchr/testify/require"
12+
protobuf"google.golang.org/protobuf/proto"
1213

1314
"cdr.dev/slog"
1415
"cdr.dev/slog/sloggers/slogtest"
@@ -165,6 +166,102 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
165166
require.Len(t,uut.queues,0)
166167
}
167168

169+
funcTestLogSender_SkipHugeLog(t*testing.T) {
170+
t.Parallel()
171+
testCtx:=testutil.Context(t,testutil.WaitShort)
172+
ctx,cancel:=context.WithCancel(testCtx)
173+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
174+
fDest:=newFakeLogDest()
175+
uut:=newLogSender(logger)
176+
177+
t0:=dbtime.Now()
178+
ls1:= uuid.UUID{0x11}
179+
hugeLog:=make([]byte,logOutputMaxBytes+1)
180+
fori:=rangehugeLog {
181+
hugeLog[i]='q'
182+
}
183+
err:=uut.enqueue(ls1,
184+
agentsdk.Log{
185+
CreatedAt:t0,
186+
Output:string(hugeLog),
187+
Level:codersdk.LogLevelInfo,
188+
},
189+
agentsdk.Log{
190+
CreatedAt:t0,
191+
Output:"test log 1, src 1",
192+
Level:codersdk.LogLevelInfo,
193+
})
194+
require.NoError(t,err)
195+
196+
loopErr:=make(chanerror,1)
197+
gofunc() {
198+
err:=uut.sendLoop(ctx,fDest)
199+
loopErr<-err
200+
}()
201+
202+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
203+
require.NotNil(t,req)
204+
require.Len(t,req.Logs,1,"it should skip the huge log")
205+
require.Equal(t,"test log 1, src 1",req.Logs[0].GetOutput())
206+
require.Equal(t,proto.Log_INFO,req.Logs[0].GetLevel())
207+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
208+
209+
cancel()
210+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
211+
require.NoError(t,err)
212+
}
213+
214+
funcTestLogSender_Batch(t*testing.T) {
215+
t.Parallel()
216+
testCtx:=testutil.Context(t,testutil.WaitShort)
217+
ctx,cancel:=context.WithCancel(testCtx)
218+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
219+
fDest:=newFakeLogDest()
220+
uut:=newLogSender(logger)
221+
222+
t0:=dbtime.Now()
223+
ls1:= uuid.UUID{0x11}
224+
varlogs []agentsdk.Log
225+
fori:=0;i<60000;i++ {
226+
logs=append(logs, agentsdk.Log{
227+
CreatedAt:t0,
228+
Output:"r",
229+
Level:codersdk.LogLevelInfo,
230+
})
231+
}
232+
err:=uut.enqueue(ls1,logs...)
233+
require.NoError(t,err)
234+
235+
loopErr:=make(chanerror,1)
236+
gofunc() {
237+
err:=uut.sendLoop(ctx,fDest)
238+
loopErr<-err
239+
}()
240+
241+
// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
242+
// is about 21 bytes.
243+
gotLogs:=0
244+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
245+
require.NotNil(t,req)
246+
gotLogs+=len(req.Logs)
247+
wire,err:=protobuf.Marshal(req)
248+
require.NoError(t,err)
249+
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
250+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
251+
req=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
252+
require.NotNil(t,req)
253+
gotLogs+=len(req.Logs)
254+
wire,err=protobuf.Marshal(req)
255+
require.NoError(t,err)
256+
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
257+
require.Equal(t,60000,gotLogs)
258+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
259+
260+
cancel()
261+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
262+
require.NoError(t,err)
263+
}
264+
168265
typefakeLogDeststruct {
169266
reqschan*proto.BatchCreateLogsRequest
170267
respschan*proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp