Commit 446cb12

feat: limit queued logs to database limit in agent
1 parent 743f5f3 commit 446cb12

File tree

2 files changed: +100 additions, -15 deletions

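The heart of the change is a running byte counter on the logSender: enqueue adds each log's output length to the counter, sendLoop subtracts the bytes of logs that were successfully flushed, and enqueue refuses new logs once the counter passes the database limit. Below is a minimal, self-contained sketch of that accounting pattern; the names (byteBudgetQueue, errQueueFull) are illustrative stand-ins, not the agent's actual API, and the real code works on protobuf logs rather than strings.

package main

import (
	"errors"
	"fmt"
)

// byteBudgetQueue is an illustrative stand-in for the accounting this commit
// adds to logSender: it tracks how many bytes are queued and rejects new
// entries once the database limit has been reached.
type byteBudgetQueue struct {
	limit     int      // e.g. 1048576, mirroring the database max_logs_length constraint
	outputLen int      // bytes currently queued
	logs      []string // queued log lines
}

var errQueueFull = errors.New("maximum queued logs exceeded")

// enqueue checks the running total *before* appending each line, so the queue
// ends up holding slightly more than the limit; that way the database layer
// still receives enough input to mark the logs as truncated.
func (q *byteBudgetQueue) enqueue(lines ...string) error {
	for _, line := range lines {
		if q.outputLen > q.limit {
			return errQueueFull
		}
		q.logs = append(q.logs, line)
		q.outputLen += len(line)
	}
	return nil
}

// flush simulates a successful send of the first n queued lines: they are
// removed from the queue and their bytes are credited back to the budget.
func (q *byteBudgetQueue) flush(n int) {
	for _, line := range q.logs[:n] {
		q.outputLen -= len(line)
	}
	q.logs = q.logs[n:]
}

func main() {
	q := &byteBudgetQueue{limit: 16}
	err := q.enqueue("aaaaaaaa", "bbbbbbbb", "cccccccc", "dddddddd")
	fmt.Println(err, len(q.logs), q.outputLen) // maximum queued logs exceeded 3 24
	q.flush(2)
	fmt.Println(len(q.logs), q.outputLen) // 1 8
}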

‎agent/logs.go

Lines changed: 35 additions & 12 deletions
@@ -14,9 +14,14 @@ import (
 )
 
 const (
-	flushInterval     = time.Second
-	logOutputMaxBytes = 1 << 20 // 1MiB
-	overheadPerLog    = 21      // found by testing
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
 )
 
 type logQueue struct {
@@ -33,6 +38,7 @@ type logSender struct {
 	queues           map[uuid.UUID]*logQueue
 	logger           slog.Logger
 	exceededLogLimit bool
+	outputLen        int
 }
 
 type logDest interface {
@@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender {
 	}
 }
 
+var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
+
 func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	logger := l.logger.With(slog.F("log_source_id", src))
 	if len(logs) == 0 {
@@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 		q = &logQueue{}
 		l.queues[src] = q
 	}
-	for _, log := range logs {
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return MaxQueueExceededError
+		}
 		pl, err := agentsdk.ProtoFromLog(log)
 		if err != nil {
 			return xerrors.Errorf("failed to convert log: %w", err)
 		}
+		if len(pl.Output) > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit")
+			continue
+		}
 		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
 	}
 	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
 	return nil
@@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		req := &proto.BatchCreateLogsRequest{
 			LogSourceId: src[:],
 		}
-		o := 0
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success. They are different because outputToSend also counts protocol message overheads.
+		outputToSend := 0
+		outputToRemove := 0
 		n := 0
 		for n < len(q.logs) {
 			log := q.logs[n]
-			if len(log.Output) > logOutputMaxBytes {
-				logger.Warn(ctx, "dropping log line that exceeds our limit")
-				n++
-				continue
-			}
-			o += len(log.Output) + overheadPerLog
-			if o > logOutputMaxBytes {
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
 				break
 			}
 			req.Logs = append(req.Logs, log)
 			n++
+			outputToRemove += len(log.Output)
 		}
 
 		l.L.Unlock()
@@ -181,6 +203,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 			q.logs[i] = nil
 		}
 		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
 		if len(q.logs) == 0 {
 			// no empty queues
 			delete(l.queues, src)
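Callers of enqueue now have to handle MaxQueueExceededError. How producers inside the agent react is outside this diff, so the snippet below is only a hypothetical sketch (assuming it lives in the same agent package and reuses its imports, plus the standard errors package): once the queue reports it is full, the producer stops forwarding lines rather than converting logs that will never be stored.

// forwardLogs is a hypothetical producer, not part of this commit: it pumps
// log lines into the sender and stops as soon as the queue reports it is full.
func forwardLogs(ctx context.Context, sender *logSender, src uuid.UUID, lines <-chan agentsdk.Log) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case log, ok := <-lines:
			if !ok {
				return nil
			}
			if err := sender.enqueue(src, log); err != nil {
				if errors.Is(err, MaxQueueExceededError) {
					// The queue already holds more than the database will accept;
					// anything further would be dropped anyway, so stop here.
					return nil
				}
				return err
			}
		}
	}
}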

‎agent/logs_internal_test.go

Lines changed: 65 additions & 3 deletions
@@ -176,7 +176,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 
 	t0 := dbtime.Now()
 	ls1 := uuid.UUID{0x11}
-	hugeLog := make([]byte, logOutputMaxBytes+1)
+	hugeLog := make([]byte, maxBytesPerBatch+1)
 	for i := range hugeLog {
 		hugeLog[i] = 'q'
 	}
@@ -246,14 +246,14 @@ func TestLogSender_Batch(t *testing.T) {
 	gotLogs += len(req.Logs)
 	wire, err := protobuf.Marshal(req)
 	require.NoError(t, err)
-	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
 	require.NotNil(t, req)
 	gotLogs += len(req.Logs)
 	wire, err = protobuf.Marshal(req)
 	require.NoError(t, err)
-	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
 	require.Equal(t, 60000, gotLogs)
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})

@@ -262,6 +262,68 @@
 	require.NoError(t, err)
 }
 
+func TestLogSender_MaxQueuedLogs(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	n := 4
+	hugeLog := make([]byte, maxBytesQueued/n)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	var logs []agentsdk.Log
+	for i := 0; i < n; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	err := uut.enqueue(ls1, logs...)
+	require.NoError(t, err)
+
+	// we're now right at the limit of output
+	require.Equal(t, maxBytesQueued, uut.outputLen)
+
+	// adding more logs should error...
+	ls2 := uuid.UUID{0x22}
+	err = uut.enqueue(ls2, logs...)
+	require.ErrorIs(t, err, MaxQueueExceededError)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// ...but, it should still queue up one log from source #2, so that we would exceed the database
+	// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
+	// #1 come in 2 updates, plus 1 update for source #2.
+	logsBySource := make(map[uuid.UUID]int)
+	for i := 0; i < 3; i++ {
+		req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+		require.NotNil(t, req)
+		srcID, err := uuid.FromBytes(req.LogSourceId)
+		require.NoError(t, err)
+		logsBySource[srcID] += len(req.Logs)
+		testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	}
+	require.Equal(t, map[uuid.UUID]int{
+		ls1: n,
+		ls2: 1,
+	}, logsBySource)
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
 type fakeLogDest struct {
 	reqs  chan *proto.BatchCreateLogsRequest
 	resps chan *proto.BatchCreateLogsResponse
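The expected request counts in this test follow directly from the constants in agent/logs.go: each of the n = 4 logs from source #1 is maxBytesQueued/4 = 262,144 bytes, and with the 21-byte per-log overhead all four together exceed the 1 MiB batch cap, so they split into a batch of three logs and a batch of one; source #2 contributes the single log that was accepted before the queue check tripped, giving three requests in total. A quick sanity check of that arithmetic (constants copied from the diff above):

package main

import "fmt"

func main() {
	// Constants copied from agent/logs.go in this commit.
	const (
		maxBytesPerBatch = 1 << 20 // 1MiB batch cap
		overheadPerLog   = 21      // per-log protocol overhead
		maxBytesQueued   = 1048576 // database limit on total log length
	)
	n := 4
	logLen := maxBytesQueued / n // 262144 bytes per test log

	// All four source #1 logs plus overhead exceed the batch cap...
	fmt.Println(n*(logLen+overheadPerLog) > maxBytesPerBatch) // true (1048660 > 1048576)
	// ...but three of them fit, so the first request carries 3 logs and the
	// fourth goes in a second request; source #2's single log is a third request.
	fmt.Println(3*(logLen+overheadPerLog) > maxBytesPerBatch) // false (786495)
}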
