Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit181b699

Browse files
committed
feat: ensure that log batches don't exceed 1MiB in logSender
1 parent4bcbd7e commit181b699

File tree

2 files changed

+119
-7
lines changed

2 files changed

+119
-7
lines changed

‎agent/logs.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@ import (
66
"time"
77

88
"github.com/google/uuid"
9-
"golang.org/x/exp/slices"
109
"golang.org/x/xerrors"
1110

1211
"cdr.dev/slog"
1312
"github.com/coder/coder/v2/agent/proto"
1413
"github.com/coder/coder/v2/codersdk/agentsdk"
1514
)
1615

17-
constflushInterval=time.Second
16+
const (
17+
flushInterval=time.Second
18+
logOutputMaxBytes=1<<20// 1MiB
19+
overheadPerLog=21// found by testing
20+
)
1821

1922
typelogQueuestruct {
2023
logs []*proto.Log
@@ -118,15 +121,30 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
118121
returnnil
119122
}
120123
src,q:=l.getPendingWorkLocked()
124+
logger:=l.logger.With(slog.F("log_source_id",src))
121125
q.flushRequested=false// clear flag since we're now flushing
122126
req:=&proto.BatchCreateLogsRequest{
123127
LogSourceId:src[:],
124-
// when we release the lock, we don't want modifications to the slice to affect us
125-
Logs:slices.Clone(q.logs),
128+
}
129+
o:=0
130+
n:=0
131+
forn<len(q.logs) {
132+
log:=q.logs[n]
133+
iflen(log.Output)>logOutputMaxBytes {
134+
logger.Warn(ctx,"dropping log line that exceeds our limit")
135+
n++
136+
continue
137+
}
138+
o+=len(log.Output)+overheadPerLog
139+
ifo>logOutputMaxBytes {
140+
break
141+
}
142+
req.Logs=append(req.Logs,log)
143+
n++
126144
}
127145

128146
l.L.Unlock()
129-
l.logger.Debug(ctx,"sending logs to agent API",slog.F("log_source_id",src),slog.F("num_logs",len(req.Logs)))
147+
logger.Debug(ctx,"sending logs to agent API",slog.F("num_logs",len(req.Logs)))
130148
_,err:=dest.BatchCreateLogs(ctx,req)
131149
l.L.Lock()
132150
iferr!=nil {
@@ -135,7 +153,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
135153

136154
// since elsewhere we only append to the logs, here we can remove them
137155
// since we successfully sent them
138-
q.logs=q.logs[len(req.Logs):]
156+
q.logs=q.logs[n:]
139157
iflen(q.logs)==0 {
140158
// no empty queues
141159
delete(l.queues,src)

‎agent/logs_internal_test.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/google/uuid"
99
"github.com/stretchr/testify/require"
10+
protobuf"google.golang.org/protobuf/proto"
1011

1112
"cdr.dev/slog"
1213
"cdr.dev/slog/sloggers/slogtest"
@@ -17,7 +18,7 @@ import (
1718
"github.com/coder/coder/v2/testutil"
1819
)
1920

20-
funcTestLogSender(t*testing.T) {
21+
funcTestLogSender_Mainline(t*testing.T) {
2122
t.Parallel()
2223
testCtx:=testutil.Context(t,testutil.WaitShort)
2324
ctx,cancel:=context.WithCancel(testCtx)
@@ -116,6 +117,99 @@ func TestLogSender(t *testing.T) {
116117
require.NoError(t,err)
117118
}
118119

120+
funcTestLogSender_SkipHugeLog(t*testing.T) {
121+
t.Parallel()
122+
testCtx:=testutil.Context(t,testutil.WaitShort)
123+
ctx,cancel:=context.WithCancel(testCtx)
124+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
125+
fDest:=newFakeLogDest()
126+
uut:=newLogSender(logger)
127+
128+
t0:=dbtime.Now()
129+
ls1:= uuid.UUID{0x11}
130+
hugeLog:=make([]byte,logOutputMaxBytes+1)
131+
fori:=rangehugeLog {
132+
hugeLog[i]='q'
133+
}
134+
err:=uut.enqueue(ls1,
135+
agentsdk.Log{
136+
CreatedAt:t0,
137+
Output:string(hugeLog),
138+
Level:codersdk.LogLevelInfo,
139+
},
140+
agentsdk.Log{
141+
CreatedAt:t0,
142+
Output:"test log 1, src 1",
143+
Level:codersdk.LogLevelInfo,
144+
})
145+
require.NoError(t,err)
146+
147+
loopErr:=make(chanerror,1)
148+
gofunc() {
149+
err:=uut.sendLoop(ctx,fDest)
150+
loopErr<-err
151+
}()
152+
153+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
154+
require.NotNil(t,req)
155+
require.Len(t,req.Logs,1,"it should skip the huge log")
156+
require.Equal(t,"test log 1, src 1",req.Logs[0].GetOutput())
157+
require.Equal(t,proto.Log_INFO,req.Logs[0].GetLevel())
158+
159+
cancel()
160+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
161+
require.NoError(t,err)
162+
}
163+
164+
funcTestLogSender_Batch(t*testing.T) {
165+
t.Parallel()
166+
testCtx:=testutil.Context(t,testutil.WaitShort)
167+
ctx,cancel:=context.WithCancel(testCtx)
168+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
169+
fDest:=newFakeLogDest()
170+
uut:=newLogSender(logger)
171+
172+
t0:=dbtime.Now()
173+
ls1:= uuid.UUID{0x11}
174+
varlogs []agentsdk.Log
175+
fori:=0;i<60000;i++ {
176+
logs=append(logs, agentsdk.Log{
177+
CreatedAt:t0,
178+
Output:"r",
179+
Level:codersdk.LogLevelInfo,
180+
})
181+
}
182+
err:=uut.enqueue(ls1,logs...)
183+
require.NoError(t,err)
184+
185+
loopErr:=make(chanerror,1)
186+
gofunc() {
187+
err:=uut.sendLoop(ctx,fDest)
188+
loopErr<-err
189+
}()
190+
191+
// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log
192+
// is about 21 bytes.
193+
gotLogs:=0
194+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
195+
require.NotNil(t,req)
196+
gotLogs+=len(req.Logs)
197+
wire,err:=protobuf.Marshal(req)
198+
require.NoError(t,err)
199+
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
200+
req=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
201+
require.NotNil(t,req)
202+
gotLogs+=len(req.Logs)
203+
wire,err=protobuf.Marshal(req)
204+
require.NoError(t,err)
205+
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
206+
require.Equal(t,60000,gotLogs)
207+
208+
cancel()
209+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
210+
require.NoError(t,err)
211+
}
212+
119213
typefakeLogDeststruct {
120214
reqschan*proto.BatchCreateLogsRequest
121215
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp