@@ -14,9 +14,14 @@ import (
 )
 
 const (
-	flushInterval     = time.Second
-	logOutputMaxBytes = 1 << 20 // 1MiB
-	overheadPerLog    = 21      // found by testing
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
 )
 
 type logQueue struct {
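As a rough illustration of how these constants interact (a standalone sketch, not part of the change; the fitsInBatch helper and the 1KiB line size are assumptions made here for illustration): a batch admits another log only while the running total of output bytes plus the per-log overhead stays within maxBytesPerBatch, while maxBytesQueued caps the in-memory queue at roughly the database's max_logs_length.

```go
package main

import "fmt"

// Constants mirror the values defined in the diff above.
const (
	maxBytesPerBatch = 1 << 20 // 1MiB
	overheadPerLog   = 21      // per-log protobuf overhead, found by testing
	maxBytesQueued   = 1048576 // matches the database max_logs_length constraint
)

// fitsInBatch is a hypothetical helper: it reports whether another log line of
// outputLen bytes still fits in a batch that already holds batchLen bytes
// (overhead for earlier logs already counted in batchLen).
func fitsInBatch(batchLen, outputLen int) bool {
	return batchLen+outputLen+overheadPerLog <= maxBytesPerBatch
}

func main() {
	// With 1KiB log lines, roughly a thousand logs fit in a single 1MiB batch.
	lines, batchLen := 0, 0
	for fitsInBatch(batchLen, 1024) {
		batchLen += 1024 + overheadPerLog
		lines++
	}
	fmt.Println("1KiB lines per batch:", lines) // ~1003
}
```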
@@ -33,6 +38,7 @@ type logSender struct {
 	queues           map[uuid.UUID]*logQueue
 	logger           slog.Logger
 	exceededLogLimit bool
+	outputLen        int
 }
 
 type logDest interface {
@@ -47,37 +53,50 @@ func newLogSender(logger slog.Logger) *logSender {
 	}
 }
 
-func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
+func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
 	logger := l.logger.With(slog.F("log_source_id", src))
 	if len(logs) == 0 {
 		logger.Debug(context.Background(), "enqueue called with no logs")
-		return nil
+		return
 	}
 	l.L.Lock()
 	defer l.L.Unlock()
 	if l.exceededLogLimit {
 		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
 		// don't error, as we also write to file and don't want the overall write to fail
-		return nil
+		return
 	}
 	defer l.Broadcast()
 	q, ok := l.queues[src]
 	if !ok {
 		q = &logQueue{}
 		l.queues[src] = q
 	}
-	for _, log := range logs {
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return
+		}
 		pl, err := agentsdk.ProtoFromLog(log)
 		if err != nil {
-			return xerrors.Errorf("failed to convert log: %w", err)
+			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
+			return
+		}
+		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
+			continue
 		}
 		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
 	}
 	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
-	return nil
 }
 
-func (l *logSender) flush(src uuid.UUID) error {
+func (l *logSender) flush(src uuid.UUID) {
 	l.L.Lock()
 	defer l.L.Unlock()
 	defer l.Broadcast()
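One subtlety in the new enqueue path is that the outputLen check happens before the append, so the queue is allowed to run slightly past maxBytesQueued; that overshoot is what makes the database side record that logs were truncated, rather than the truncation being visible only in the Coder agent's own logs. A stripped-down sketch of that ordering (the lineQueue type and its enqueue method are illustrative stand-ins, not the PR's logQueue):

```go
package main

import "fmt"

const maxBytesQueued = 1048576

// lineQueue is a hypothetical, simplified stand-in that only tracks output
// bytes, to show the "check before append" ordering used by enqueue.
type lineQueue struct {
	outputLen int
	lines     []string
}

func (q *lineQueue) enqueue(lines ...string) {
	for i, line := range lines {
		// Check the running total *before* appending, so the queue may end up
		// slightly over maxBytesQueued and the database layer still sees the
		// overflow that triggers its "logs truncated" marker.
		if q.outputLen > maxBytesQueued {
			fmt.Printf("queue full; dropping %d remaining lines\n", len(lines)-i)
			return
		}
		q.lines = append(q.lines, line)
		q.outputLen += len(line)
	}
}

func main() {
	q := &lineQueue{}
	big := make([]byte, 64*1024) // 64KiB per line
	for i := 0; i < 20; i++ {
		q.enqueue(string(big))
	}
	fmt.Println("queued lines:", len(q.lines), "bytes:", q.outputLen)
}
```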
@@ -87,13 +106,21 @@ func (l *logSender) flush(src uuid.UUID) error {
 	}
 	// queue might not exist because it's already been flushed and removed from
 	// the map.
-	return nil
 }
 
 // sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
 // retry as it is expected that a higher layer retries establishing connection to the agent API and
 // calls sendLoop again.
 func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
+	l.L.Lock()
+	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
+		// no point in keeping this loop going if the log limit is exceeded, but don't return an
+		// error because we've already handled it
+		return nil
+	}
+
 	ctxDone := false
 	defer l.logger.Debug(ctx, "sendLoop exiting")
 
@@ -119,42 +146,36 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 		}
 	}()
 
-	l.L.Lock()
-	defer l.L.Unlock()
 	for {
-		for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
+		for !ctxDone && !l.hasPendingWorkLocked() {
 			l.Wait()
 		}
 		if ctxDone {
 			return nil
 		}
-		if l.exceededLogLimit {
-			l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
-			// no point in keeping this loop going, if log limit is exceeded, but don't return an
-			// error because we're already handled it
-			return nil
-		}
+
 		src, q := l.getPendingWorkLocked()
 		logger := l.logger.With(slog.F("log_source_id", src))
 		q.flushRequested = false // clear flag since we're now flushing
 		req := &proto.BatchCreateLogsRequest{
 			LogSourceId: src[:],
 		}
-		o := 0
+
+		// outputToSend keeps track of the size of the protobuf message we send, while
+		// outputToRemove keeps track of the size of the output we'll remove from the queues on
+		// success. They are different because outputToSend also counts protocol message overheads.
+		outputToSend := 0
+		outputToRemove := 0
 		n := 0
 		for n < len(q.logs) {
 			log := q.logs[n]
-			if len(log.Output) > logOutputMaxBytes {
-				logger.Warn(ctx, "dropping log line that exceeds our limit")
-				n++
-				continue
-			}
-			o += len(log.Output) + overheadPerLog
-			if o > logOutputMaxBytes {
+			outputToSend += len(log.Output) + overheadPerLog
+			if outputToSend > maxBytesPerBatch {
 				break
 			}
 			req.Logs = append(req.Logs, log)
 			n++
+			outputToRemove += len(log.Output)
 		}
 
 		l.L.Unlock()
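The batching rule that replaces the old o counter is easy to reason about in isolation: accumulate output bytes plus overheadPerLog until the 1MiB budget would be exceeded, and track the raw output bytes separately so outputLen can be decremented after a successful send. A minimal sketch of that rule (takeBatch is a hypothetical helper written for illustration; the real loop operates on the queued protobuf logs):

```go
package main

import "fmt"

const (
	maxBytesPerBatch = 1 << 20 // 1MiB
	overheadPerLog   = 21
)

// takeBatch mirrors the sendLoop batching rule: it returns how many of the
// pending outputs fit in one batch (n) and how many raw output bytes that
// represents (outputToRemove), which differs from the sent size because the
// latter also counts per-log protocol overhead.
func takeBatch(outputs [][]byte) (n, outputToRemove int) {
	outputToSend := 0
	for n < len(outputs) {
		outputToSend += len(outputs[n]) + overheadPerLog
		if outputToSend > maxBytesPerBatch {
			break
		}
		outputToRemove += len(outputs[n])
		n++
	}
	return n, outputToRemove
}

func main() {
	pending := make([][]byte, 2000)
	for i := range pending {
		pending[i] = make([]byte, 1024) // 1KiB per log line
	}
	n, bytes := takeBatch(pending)
	fmt.Println("logs in first batch:", n, "raw bytes to remove:", bytes)
}
```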
@@ -181,6 +202,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 			q.logs[i] = nil
 		}
 		q.logs = q.logs[n:]
+		l.outputLen -= outputToRemove
 		if len(q.logs) == 0 {
 			// no empty queues
 			delete(l.queues, src)