
Commit 14d0583

feat: limit queued logs to database limit in agent

1 parent e941fbc · commit 14d0583

File tree: agent/logs.go, agent/logs_internal_test.go

2 files changed: +100 −15 lines changed
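The core of the change is a byte budget on the agent's in-memory log queue: each enqueued log adds its output length to a running counter, each successfully sent batch subtracts it, and enqueue refuses further logs once the counter exceeds the database's maximum log length. Below is a minimal, self-contained sketch of that pattern; the type, names, and tiny 10-byte budget are placeholders for illustration, not the agent's actual API.

package main

import (
	"errors"
	"fmt"
)

// errBudgetExceeded plays the role of MaxQueueExceededError in the commit.
var errBudgetExceeded = errors.New("maximum queued bytes exceeded")

// byteBudgetQueue is a toy stand-in for the agent's queue bookkeeping: it
// tracks total queued bytes and rejects enqueues once the budget is spent.
type byteBudgetQueue struct {
	maxBytes int
	queued   []string
	used     int
}

func (q *byteBudgetQueue) enqueue(lines ...string) error {
	for _, line := range lines {
		// Check before appending, so the queue can grow slightly past the limit
		// and the receiving side still observes that truncation happened.
		if q.used > q.maxBytes {
			return errBudgetExceeded
		}
		q.queued = append(q.queued, line)
		q.used += len(line)
	}
	return nil
}

// dequeue drops the first n (successfully sent) lines and returns their bytes
// to the budget, mirroring "outputLen -= outputToRemove" in the commit.
func (q *byteBudgetQueue) dequeue(n int) {
	for _, line := range q.queued[:n] {
		q.used -= len(line)
	}
	q.queued = q.queued[n:]
}

func main() {
	q := &byteBudgetQueue{maxBytes: 10}
	fmt.Println(q.enqueue("hello", "world")) // <nil>; used is now exactly 10
	fmt.Println(q.enqueue("more", "logs"))   // queues "more", then rejects "logs"
	q.dequeue(2)                             // pretend the first two lines were sent
	fmt.Println(q.used)                      // 4 ("more" is still queued)
}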

‎agent/logs.go

Lines changed: 35 additions & 12 deletions

@@ -14,9 +14,14 @@ import (
 )
 
 const (
-	flushInterval     = time.Second
-	logOutputMaxBytes = 1 << 20 // 1MiB
-	overheadPerLog    = 21      // found by testing
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
 )
 
 type logQueue struct {
@@ -33,6 +38,7 @@ type logSender struct {
 	queues           map[uuid.UUID]*logQueue
 	logger           slog.Logger
 	exceededLogLimit bool
+	outputLen        int
 }
 
 type logDest interface {
@@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender {
 	}
 }
 
+var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
+
 func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	logger := l.logger.With(slog.F("log_source_id", src))
 	if len(logs) == 0 {
@@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 		q = &logQueue{}
 		l.queues[src] = q
 	}
-	for _, log := range logs {
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return MaxQueueExceededError
+		}
 		pl, err := agentsdk.ProtoFromLog(log)
 		if err != nil {
 			return xerrors.Errorf("failed to convert log: %w", err)
 		}
+		if len(pl.Output) > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit")
+			continue
+		}
 		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
 	}
 	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
 	return nil
@@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		req := &proto.BatchCreateLogsRequest{
 			LogSourceId: src[:],
 		}
-		o := 0
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success. They are different because outputToSend also counts protocol message overheads.
+		outputToSend := 0
+		outputToRemove := 0
 		n := 0
 		for n < len(q.logs) {
 			log := q.logs[n]
-			if len(log.Output) > logOutputMaxBytes {
-				logger.Warn(ctx, "dropping log line that exceeds our limit")
-				n++
-				continue
-			}
-			o += len(log.Output) + overheadPerLog
-			if o > logOutputMaxBytes {
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
				break
 			}
 			req.Logs = append(req.Logs, log)
 			n++
+			outputToRemove += len(log.Output)
 		}
 
 		l.L.Unlock()
@@ -177,6 +199,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		// since elsewhere we only append to the logs, here we can remove them
 		// since we successfully sent them
 		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
 		if len(q.logs) == 0 {
 			// no empty queues
 			delete(l.queues, src)
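One design note worth calling out: enqueue now reports backpressure with a sentinel error rather than dropping logs silently, so a producer can tell "queue full, further output would be truncated anyway" apart from a real failure. The snippet below is a hypothetical consumer of that sentinel, not the agent's actual call site; only the errors.Is-style check on a "queue exceeded" error reflects the commit itself.

package main

import (
	"errors"
	"fmt"
)

// errQueueExceeded stands in for the commit's MaxQueueExceededError sentinel.
var errQueueExceeded = errors.New("maximum queued logs exceeded")

// forward is a hypothetical producer step: a "queue full" result means any
// further output would be truncated by the database anyway, so the caller can
// stop forwarding rather than treat it as a fatal error.
func forward(enqueue func(lines ...string) error, lines []string) (stop bool, err error) {
	if err := enqueue(lines...); err != nil {
		if errors.Is(err, errQueueExceeded) {
			return true, nil
		}
		return false, err
	}
	return false, nil
}

func main() {
	full := func(...string) error { return errQueueExceeded }
	stop, err := forward(full, []string{"one more line"})
	fmt.Println(stop, err) // true <nil>
}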

‎agent/logs_internal_test.go

Lines changed: 65 additions & 3 deletions

@@ -174,7 +174,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 
 	t0 := dbtime.Now()
 	ls1 := uuid.UUID{0x11}
-	hugeLog := make([]byte, logOutputMaxBytes+1)
+	hugeLog := make([]byte, maxBytesPerBatch+1)
 	for i := range hugeLog {
 		hugeLog[i] = 'q'
 	}
@@ -244,14 +244,14 @@ func TestLogSender_Batch(t *testing.T) {
 	gotLogs += len(req.Logs)
 	wire, err := protobuf.Marshal(req)
 	require.NoError(t, err)
-	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
 	require.NotNil(t, req)
 	gotLogs += len(req.Logs)
 	wire, err = protobuf.Marshal(req)
 	require.NoError(t, err)
-	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
 	require.Equal(t, 60000, gotLogs)
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 
@@ -260,6 +260,68 @@ func TestLogSender_Batch(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestLogSender_MaxQueuedLogs(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	n := 4
+	hugeLog := make([]byte, maxBytesQueued/n)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	var logs []agentsdk.Log
+	for i := 0; i < n; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	err := uut.enqueue(ls1, logs...)
+	require.NoError(t, err)
+
+	// we're now right at the limit of output
+	require.Equal(t, maxBytesQueued, uut.outputLen)
+
+	// adding more logs should error...
+	ls2 := uuid.UUID{0x22}
+	err = uut.enqueue(ls2, logs...)
+	require.ErrorIs(t, err, MaxQueueExceededError)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// ...but, it should still queue up one log from source #2, so that we would exceed the database
+	// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
+	// #1 come in 2 updates, plus 1 update for source #2.
+	logsBySource := make(map[uuid.UUID]int)
+	for i := 0; i < 3; i++ {
+		req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+		require.NotNil(t, req)
+		srcID, err := uuid.FromBytes(req.LogSourceId)
+		require.NoError(t, err)
+		logsBySource[srcID] += len(req.Logs)
+		testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	}
+	require.Equal(t, map[uuid.UUID]int{
+		ls1: n,
+		ls2: 1,
+	}, logsBySource)
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
 type fakeLogDest struct {
 	reqs  chan *proto.BatchCreateLogsRequest
 	resps chan *proto.BatchCreateLogsResponse
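The expected counts in TestLogSender_MaxQueuedLogs follow directly from the constants above: four logs of maxBytesQueued/4 = 262,144 bytes each sum to exactly the queue budget, so source #1 is accepted in full and outputLen lands exactly on maxBytesQueued; the strictly greater-than check then lets source #2 queue one more log before enqueue errors. On the send side, the 21-byte per-log overhead pushes the fourth log over the 1 MiB batch limit, so source #1 needs two batches and source #2 one, i.e. three updates. A small sketch of that arithmetic (illustrative only, not part of the agent):

package main

import "fmt"

func main() {
	const (
		maxBytesPerBatch = 1 << 20 // 1MiB
		overheadPerLog   = 21      // found by testing
		maxBytesQueued   = 1048576
	)
	const n = 4
	logSize := maxBytesQueued / n // 262,144 bytes per log in the test

	batches, outputToSend := 1, 0
	for i := 0; i < n; i++ {
		if outputToSend+logSize+overheadPerLog > maxBytesPerBatch {
			// 3*(262,144+21) = 786,495 fits; adding the 4th would reach
			// 1,048,660 > 1,048,576, so it spills into a second batch.
			batches++
			outputToSend = 0
		}
		outputToSend += logSize + overheadPerLog
	}
	fmt.Println("batches for source #1:", batches) // 2 (+1 update for source #2 = 3 total)
}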

0 commit comments
