Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4a32bba

Browse files
committed
feat: limit queued logs to database limit in agent
1 parent 27b55aa · commit 4a32bba

File tree

2 files changed

+101
-17
lines changed

2 files changed

+101
-17
lines changed

‎agent/logs.go

Lines changed: 37 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,14 @@ import (
1414
)
1515

1616
const (
17-
flushInterval=time.Second
18-
logOutputMaxBytes=1<<20// 1MiB
19-
overheadPerLog=21// found by testing
17+
flushInterval=time.Second
18+
maxBytesPerBatch=1<<20// 1MiB
19+
overheadPerLog=21// found by testing
20+
21+
// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
22+
// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
23+
// accept in the database.
24+
maxBytesQueued=1048576
2025
)
2126

2227
typelogQueuestruct {
@@ -30,8 +35,9 @@ type logQueue struct {
3035
// the agent calls sendLoop to send pending logs.
3136
typelogSenderstruct {
3237
*sync.Cond
33-
queuesmap[uuid.UUID]*logQueue
34-
logger slog.Logger
38+
queuesmap[uuid.UUID]*logQueue
39+
logger slog.Logger
40+
outputLenint
3541
}
3642

3743
typelogDestinterface {
@@ -46,6 +52,8 @@ func newLogSender(logger slog.Logger) *logSender {
4652
}
4753
}
4854

55+
varMaxQueueExceededError=xerrors.New("maximum queued logs exceeded")
56+
4957
func (l*logSender)enqueue(src uuid.UUID,logs...agentsdk.Log)error {
5058
logger:=l.logger.With(slog.F("log_source_id",src))
5159
iflen(logs)==0 {
@@ -60,12 +68,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
6068
q=&logQueue{}
6169
l.queues[src]=q
6270
}
63-
for_,log:=rangelogs {
71+
fork,log:=rangelogs {
72+
// Here we check the queue size before adding a log because we want to queue up slightly
73+
// more logs than the database would store to ensure we trigger "logs truncated" at the
74+
// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
75+
// examined the Coder agent logs.
76+
ifl.outputLen>maxBytesQueued {
77+
logger.Warn(context.Background(),"log queue full; truncating new logs",slog.F("new_logs",k),slog.F("queued_logs",len(q.logs)))
78+
returnMaxQueueExceededError
79+
}
6480
pl,err:=agentsdk.ProtoFromLog(log)
6581
iferr!=nil {
6682
returnxerrors.Errorf("failed to convert log: %w",err)
6783
}
84+
iflen(pl.Output)>maxBytesPerBatch {
85+
logger.Warn(context.Background(),"dropping log line that exceeds our limit")
86+
continue
87+
}
6888
q.logs=append(q.logs,pl)
89+
l.outputLen+=len(pl.Output)
6990
}
7091
logger.Debug(context.Background(),"enqueued agent logs",slog.F("new_logs",len(logs)),slog.F("queued_logs",len(q.logs)))
7192
returnnil
@@ -126,21 +147,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
126147
req:=&proto.BatchCreateLogsRequest{
127148
LogSourceId:src[:],
128149
}
129-
o:=0
150+
151+
// outputToSend keeps track of the size of the protobuf message we send, while
152+
// outputToRemove keeps track of the size of the output we'll remove from the queues on
153+
// success. They are different because outputToSend also counts protocol message overheads.
154+
outputToSend:=0
155+
outputToRemove:=0
130156
n:=0
131157
forn<len(q.logs) {
132158
log:=q.logs[n]
133-
iflen(log.Output)>logOutputMaxBytes {
134-
logger.Warn(ctx,"dropping log line that exceeds our limit")
135-
n++
136-
continue
137-
}
138-
o+=len(log.Output)+overheadPerLog
139-
ifo>logOutputMaxBytes {
159+
outputToSend+=len(log.Output)+overheadPerLog
160+
ifoutputToSend>maxBytesPerBatch {
140161
break
141162
}
142163
req.Logs=append(req.Logs,log)
143164
n++
165+
outputToRemove+=len(log.Output)
144166
}
145167

146168
l.L.Unlock()
@@ -154,6 +176,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
154176
// since elsewhere we only append to the logs, here we can remove them
155177
// since we successfully sent them
156178
q.logs=q.logs[n:]
179+
l.outputLen-=outputToRemove
157180
iflen(q.logs)==0 {
158181
// no empty queues
159182
delete(l.queues,src)

‎agent/logs_internal_test.go

Lines changed: 64 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
127127

128128
t0:=dbtime.Now()
129129
ls1:= uuid.UUID{0x11}
130-
hugeLog:=make([]byte,logOutputMaxBytes+1)
130+
hugeLog:=make([]byte,maxBytesPerBatch+1)
131131
fori:=rangehugeLog {
132132
hugeLog[i]='q'
133133
}
@@ -196,20 +196,81 @@ func TestLogSender_Batch(t *testing.T) {
196196
gotLogs+=len(req.Logs)
197197
wire,err:=protobuf.Marshal(req)
198198
require.NoError(t,err)
199-
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
199+
require.Less(t,len(wire),maxBytesPerBatch,"wire should not exceed 1MiB")
200200
req=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
201201
require.NotNil(t,req)
202202
gotLogs+=len(req.Logs)
203203
wire,err=protobuf.Marshal(req)
204204
require.NoError(t,err)
205-
require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB")
205+
require.Less(t,len(wire),maxBytesPerBatch,"wire should not exceed 1MiB")
206206
require.Equal(t,60000,gotLogs)
207207

208208
cancel()
209209
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
210210
require.NoError(t,err)
211211
}
212212

213+
funcTestLogSender_MaxQueuedLogs(t*testing.T) {
214+
t.Parallel()
215+
testCtx:=testutil.Context(t,testutil.WaitShort)
216+
ctx,cancel:=context.WithCancel(testCtx)
217+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
218+
fDest:=newFakeLogDest()
219+
uut:=newLogSender(logger)
220+
221+
t0:=dbtime.Now()
222+
ls1:= uuid.UUID{0x11}
223+
n:=4
224+
hugeLog:=make([]byte,maxBytesQueued/n)
225+
fori:=rangehugeLog {
226+
hugeLog[i]='q'
227+
}
228+
varlogs []agentsdk.Log
229+
fori:=0;i<n;i++ {
230+
logs=append(logs, agentsdk.Log{
231+
CreatedAt:t0,
232+
Output:string(hugeLog),
233+
Level:codersdk.LogLevelInfo,
234+
})
235+
}
236+
err:=uut.enqueue(ls1,logs...)
237+
require.NoError(t,err)
238+
239+
// we're now right at the limit of output
240+
require.Equal(t,maxBytesQueued,uut.outputLen)
241+
242+
// adding more logs should error...
243+
ls2:= uuid.UUID{0x22}
244+
err=uut.enqueue(ls2,logs...)
245+
require.ErrorIs(t,err,MaxQueueExceededError)
246+
247+
loopErr:=make(chanerror,1)
248+
gofunc() {
249+
err:=uut.sendLoop(ctx,fDest)
250+
loopErr<-err
251+
}()
252+
253+
// ...but, it should still queue up one log from source #2, so that we would exceed the database
254+
// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
255+
// #1 come in 2 updates, plus 1 update for source #2.
256+
logsBySource:=make(map[uuid.UUID]int)
257+
fori:=0;i<3;i++ {
258+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
259+
require.NotNil(t,req)
260+
srcID,err:=uuid.FromBytes(req.LogSourceId)
261+
require.NoError(t,err)
262+
logsBySource[srcID]+=len(req.Logs)
263+
}
264+
require.Equal(t,map[uuid.UUID]int{
265+
ls1:n,
266+
ls2:1,
267+
},logsBySource)
268+
269+
cancel()
270+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
271+
require.NoError(t,err)
272+
}
273+
213274
typefakeLogDeststruct {
214275
reqschan*proto.BatchCreateLogsRequest
215276
}

0 commit comments

Comments (0)

[8]ページ先頭

©2009-2025 Movatter.jp