Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 5f74643

Browse files
committed
feat: limit queued logs to database limit in agent
1 parent 5bee9ed · commit 5f74643

File tree

2 files changed

+183
-53
lines changed

2 files changed

+183
-53
lines changed

‎agent/logs.go

Lines changed: 50 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,14 @@ import (
1414
)
1515

1616
const (
17-
flushInterval=time.Second
18-
logOutputMaxBytes=1<<20// 1MiB
19-
overheadPerLog=21// found by testing
17+
flushInterval=time.Second
18+
maxBytesPerBatch=1<<20// 1MiB
19+
overheadPerLog=21// found by testing
20+
21+
// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
22+
// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
23+
// accept in the database.
24+
maxBytesQueued=1048576
2025
)
2126

2227
typelogQueuestruct {
@@ -33,6 +38,7 @@ type logSender struct {
3338
queuesmap[uuid.UUID]*logQueue
3439
logger slog.Logger
3540
exceededLogLimitbool
41+
outputLenint
3642
}
3743

3844
typelogDestinterface {
@@ -47,37 +53,50 @@ func newLogSender(logger slog.Logger) *logSender {
4753
}
4854
}
4955

50-
func (l*logSender)enqueue(src uuid.UUID,logs...agentsdk.Log)error{
56+
func (l*logSender)enqueue(src uuid.UUID,logs...agentsdk.Log) {
5157
logger:=l.logger.With(slog.F("log_source_id",src))
5258
iflen(logs)==0 {
5359
logger.Debug(context.Background(),"enqueue called with no logs")
54-
returnnil
60+
return
5561
}
5662
l.L.Lock()
5763
deferl.L.Unlock()
5864
ifl.exceededLogLimit {
5965
logger.Warn(context.Background(),"dropping enqueued logs because we have reached the server limit")
6066
// don't error, as we also write to file and don't want the overall write to fail
61-
returnnil
67+
return
6268
}
6369
deferl.Broadcast()
6470
q,ok:=l.queues[src]
6571
if!ok {
6672
q=&logQueue{}
6773
l.queues[src]=q
6874
}
69-
for_,log:=rangelogs {
75+
fork,log:=rangelogs {
76+
// Here we check the queue size before adding a log because we want to queue up slightly
77+
// more logs than the database would store to ensure we trigger "logs truncated" at the
78+
// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
79+
// examined the Coder agent logs.
80+
ifl.outputLen>maxBytesQueued {
81+
logger.Warn(context.Background(),"log queue full; truncating new logs",slog.F("new_logs",k),slog.F("queued_logs",len(q.logs)))
82+
return
83+
}
7084
pl,err:=agentsdk.ProtoFromLog(log)
7185
iferr!=nil {
72-
returnxerrors.Errorf("failed to convert log: %w",err)
86+
logger.Critical(context.Background(),"failed to convert log",slog.Error(err))
87+
return
88+
}
89+
iflen(pl.Output)+overheadPerLog>maxBytesPerBatch {
90+
logger.Warn(context.Background(),"dropping log line that exceeds our limit",slog.F("len",len(pl.Output)))
91+
continue
7392
}
7493
q.logs=append(q.logs,pl)
94+
l.outputLen+=len(pl.Output)
7595
}
7696
logger.Debug(context.Background(),"enqueued agent logs",slog.F("new_logs",len(logs)),slog.F("queued_logs",len(q.logs)))
77-
returnnil
7897
}
7998

80-
func (l*logSender)flush(src uuid.UUID)error{
99+
func (l*logSender)flush(src uuid.UUID) {
81100
l.L.Lock()
82101
deferl.L.Unlock()
83102
deferl.Broadcast()
@@ -87,13 +106,21 @@ func (l *logSender) flush(src uuid.UUID) error {
87106
}
88107
// queue might not exist because it's already been flushed and removed from
89108
// the map.
90-
returnnil
91109
}
92110

93111
// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
94112
// retry as it is expected that a higher layer retries establishing connection to the agent API and
95113
// calls sendLoop again.
96114
func (l*logSender)sendLoop(ctx context.Context,destlogDest)error {
115+
l.L.Lock()
116+
deferl.L.Unlock()
117+
ifl.exceededLogLimit {
118+
l.logger.Debug(ctx,"aborting sendLoop because log limit is already exceeded")
119+
// no point in keeping this loop going, if log limit is exceeded, but don't return an
120+
// error because we're already handled it
121+
returnnil
122+
}
123+
97124
ctxDone:=false
98125
deferl.logger.Debug(ctx,"sendLoop exiting")
99126

@@ -119,42 +146,36 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
119146
}
120147
}()
121148

122-
l.L.Lock()
123-
deferl.L.Unlock()
124149
for {
125-
for!ctxDone&&!l.exceededLogLimit&&!l.hasPendingWorkLocked() {
150+
for!ctxDone&&!l.hasPendingWorkLocked() {
126151
l.Wait()
127152
}
128153
ifctxDone {
129154
returnnil
130155
}
131-
ifl.exceededLogLimit {
132-
l.logger.Debug(ctx,"aborting sendLoop because log limit is already exceeded")
133-
// no point in keeping this loop going, if log limit is exceeded, but don't return an
134-
// error because we're already handled it
135-
returnnil
136-
}
156+
137157
src,q:=l.getPendingWorkLocked()
138158
logger:=l.logger.With(slog.F("log_source_id",src))
139159
q.flushRequested=false// clear flag since we're now flushing
140160
req:=&proto.BatchCreateLogsRequest{
141161
LogSourceId:src[:],
142162
}
143-
o:=0
163+
164+
// outputToSend keeps track of the size of the protobuf message we send, while
165+
// outputToRemove keeps track of the size of the output we'll remove from the queues on
166+
// success. They are different because outputToSend also counts protocol message overheads.
167+
outputToSend:=0
168+
outputToRemove:=0
144169
n:=0
145170
forn<len(q.logs) {
146171
log:=q.logs[n]
147-
iflen(log.Output)>logOutputMaxBytes {
148-
logger.Warn(ctx,"dropping log line that exceeds our limit")
149-
n++
150-
continue
151-
}
152-
o+=len(log.Output)+overheadPerLog
153-
ifo>logOutputMaxBytes {
172+
outputToSend+=len(log.Output)+overheadPerLog
173+
ifoutputToSend>maxBytesPerBatch {
154174
break
155175
}
156176
req.Logs=append(req.Logs,log)
157177
n++
178+
outputToRemove+=len(log.Output)
158179
}
159180

160181
l.L.Unlock()
@@ -181,6 +202,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
181202
q.logs[i]=nil
182203
}
183204
q.logs=q.logs[n:]
205+
l.outputLen-=outputToRemove
184206
iflen(q.logs)==0 {
185207
// no empty queues
186208
delete(l.queues,src)

0 commit comments

Comments (0)

[8]ページ先頭

©2009-2025 Movatter.jp