@@ -14,9 +14,14 @@ import (
 )
 
 const (
-	flushInterval     = time.Second
-	logOutputMaxBytes = 1 << 20 // 1MiB
-	overheadPerLog    = 21      // found by testing
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
 )
 
 type logQueue struct {
@@ -30,8 +35,9 @@ type logQueue struct {
 // the agent calls sendLoop to send pending logs.
 type logSender struct {
 	*sync.Cond
-	queues map[uuid.UUID]*logQueue
-	logger slog.Logger
+	queues    map[uuid.UUID]*logQueue
+	logger    slog.Logger
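+	// outputLen tracks the total size, in bytes, of queued log output across all
+	// sources; enqueue increases it and sendLoop decreases it once logs are sent.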
+	outputLen int
 }
 
 type logDest interface {
@@ -46,6 +52,8 @@ func newLogSender(logger slog.Logger) *logSender {
 	}
 }
 
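+// MaxQueueExceededError is returned by enqueue when the total size of queued logs
+// exceeds maxBytesQueued; logs that do not fit are dropped rather than queued.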
+var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
+
 func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	logger := l.logger.With(slog.F("log_source_id", src))
 	if len(logs) == 0 {
@@ -60,12 +68,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 		q = &logQueue{}
 		l.queues[src] = q
 	}
-	for _, log := range logs {
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return MaxQueueExceededError
+		}
 		pl, err := agentsdk.ProtoFromLog(log)
 		if err != nil {
 			return xerrors.Errorf("failed to convert log: %w", err)
 		}
+		if len(pl.Output) > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit")
+			continue
+		}
 		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
 	}
 	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
 	return nil
@@ -126,21 +147,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		req := &proto.BatchCreateLogsRequest{
 			LogSourceId: src[:],
 		}
-		o := 0
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success. They are different because outputToSend also counts protocol message overheads.
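+		// For example, a 100-byte log line counts as 121 bytes (100 + overheadPerLog)
+		// towards outputToSend, but only 100 bytes towards outputToRemove.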
+		outputToSend := 0
+		outputToRemove := 0
 		n := 0
 		for n < len(q.logs) {
 			log := q.logs[n]
-			if len(log.Output) > logOutputMaxBytes {
-				logger.Warn(ctx, "dropping log line that exceeds our limit")
-				n++
-				continue
-			}
-			o += len(log.Output) + overheadPerLog
-			if o > logOutputMaxBytes {
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
 				break
 			}
 			req.Logs = append(req.Logs, log)
 			n++
+			outputToRemove += len(log.Output)
 		}
 
 		l.L.Unlock()
@@ -154,6 +176,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		// since elsewhere we only append to the logs, here we can remove them
 		// since we successfully sent them
 		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
 		if len(q.logs) == 0 {
 			// no empty queues
 			delete(l.queues, src)
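
Below is a minimal sketch of an in-package test for the new queue limit; it is not part of the diff above. It assumes the code lives in the same package as logSender, that agentsdk.Log exposes Output and Level fields, and the usual coder test helpers (slogtest, codersdk.LogLevelInfo); the import paths and the test name are assumptions based on the surrounding codebase.

// logs_queue_limit_sketch_test.go (hypothetical file name)
package agent

import (
	"strings"
	"testing"

	"cdr.dev/slog/sloggers/slogtest"
	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

func TestLogSender_QueueLimit_Sketch(t *testing.T) {
	t.Parallel()

	l := newLogSender(slogtest.Make(t, nil))
	src := uuid.New()

	// Each line carries 1 KiB of output, so a little over 1024 of them push
	// outputLen past maxBytesQueued (1 MiB) and enqueue starts rejecting logs.
	line := agentsdk.Log{
		Output: strings.Repeat("x", 1024),
		Level:  codersdk.LogLevelInfo,
	}

	var err error
	for i := 0; i < 2048; i++ {
		if err = l.enqueue(src, line); err != nil {
			break
		}
	}
	// Once the limit is hit, enqueue returns the sentinel error and drops the
	// remaining logs instead of queuing them.
	if !xerrors.Is(err, MaxQueueExceededError) {
		t.Fatalf("expected MaxQueueExceededError, got: %v", err)
	}
}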