package agent

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

const (
	flushInterval    = time.Second
	maxBytesPerBatch = 1 << 20 // 1MiB
	overheadPerLog   = 21      // found by testing

	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
	// accept in the database.
	maxBytesQueued = 1048576
)

type logQueue struct {
	logs           []*proto.Log
	flushRequested bool
	lastFlush      time.Time
}

// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
// the agent calls sendLoop to send pending logs.
type logSender struct {
	*sync.Cond
	queues           map[uuid.UUID]*logQueue
	logger           slog.Logger
	exceededLogLimit bool
	outputLen        int
}

type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

func newLogSender(logger slog.Logger) *logSender {
	return &logSender{
		Cond:   sync.NewCond(&sync.Mutex{}),
		logger: logger,
		queues: make(map[uuid.UUID]*logQueue),
	}
}

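// enqueue adds logs to the in-memory queue for the given log source and wakes the send loop. Logs
// are dropped (with a warning) if the server log limit has already been exceeded or the queue is
// full, so callers never need to handle an error.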
func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return
	}
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
		// don't error, as we also write to file and don't want the overall write to fail
		return
	}
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	for k, log := range logs {
		// Here we check the queue size before adding a log because we want to queue up slightly
		// more logs than the database would store to ensure we trigger "logs truncated" at the
		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
		// examined the Coder agent logs.
		if l.outputLen > maxBytesQueued {
			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
			return
		}
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
			return
		}
		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
			continue
		}
		q.logs = append(q.logs, pl)
		l.outputLen += len(pl.Output)
	}
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
}

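// flush requests that logs for the given source be sent as soon as possible, rather than waiting
// for the next flush interval.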
func (l *logSender) flush(src uuid.UUID) {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if ok {
		q.flushRequested = true
	}
	// queue might not exist because it's already been flushed and removed from
	// the map.
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry; a higher layer is expected to re-establish the connection to the agent API and call
// sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
		// There is no point in keeping this loop going if the log limit is exceeded, but don't
		// return an error because we've already handled it.
		return nil
	}

	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	for {
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}

		src, q := l.getPendingWorkLocked()
		logger := l.logger.With(slog.F("log_source_id", src))
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
		}

		// outputToSend keeps track of the size of the protobuf message we send, while
		// outputToRemove keeps track of the size of the output we'll remove from the queues on
		// success. They are different because outputToSend also counts protocol message overheads.
		outputToSend := 0
		outputToRemove := 0
		n := 0
		for n < len(q.logs) {
			log := q.logs[n]
			outputToSend += len(log.Output) + overheadPerLog
			if outputToSend > maxBytesPerBatch {
				break
			}
			req.Logs = append(req.Logs, log)
			n++
			outputToRemove += len(log.Output)
		}

		l.L.Unlock()
		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
		resp, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}
		if resp.LogLimitExceeded {
			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
			l.exceededLogLimit = true
			// no point in keeping anything we have queued around; the server will not accept it
			l.queues = make(map[uuid.UUID]*logQueue)
			// We've handled the error as best as we can. We don't want the server limit to grind
			// other things to a halt, so this is all we can do.
			return nil
		}

		// Elsewhere we only append to q.logs, so here we can safely remove the logs we just
		// sent. Nil out the pointers first so the backing array doesn't keep them from being
		// garbage collected.
		for i := 0; i < n; i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[n:]
		l.outputLen -= outputToRemove
		if len(q.logs) == 0 {
			// no empty queues
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}

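// hasPendingWorkLocked returns true if any log queue has a flush requested or has gone longer than
// flushInterval since its last flush. It must be called with the lock (l.L) held.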
func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if time.Since(q.lastFlush) > flushInterval {
			return true
		}
		if q.flushRequested {
			return true
		}
	}
	return false
}

// getPendingWorkLocked returns the queue that has gone the longest without being flushed, so that
// we have some sense of fairness across sources. It must be called with the lock (l.L) held.
func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	var earliestFlush time.Time
	for is, iq := range l.queues {
		if q == nil || iq.lastFlush.Before(earliestFlush) {
			src = is
			q = iq
			earliestFlush = iq.lastFlush
		}
	}
	return src, q
}
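
// Example usage (an illustrative sketch, not part of this change): a hypothetical runExample
// function shows the intended call pattern. Callers enqueue logs as they are produced, request a
// flush when a burst of output is complete, and run sendLoop whenever a connection to the agent
// API is available. runExample, client, and the log contents below are assumptions made for
// illustration; only newLogSender, enqueue, flush, and sendLoop come from this file.
//
//	func runExample(ctx context.Context, logger slog.Logger, client logDest) error {
//		sender := newLogSender(logger)
//		sourceID := uuid.New()
//
//		// Queue a log line and ask for it to be sent promptly rather than waiting for
//		// the next flush interval.
//		sender.enqueue(sourceID, agentsdk.Log{
//			CreatedAt: time.Now(),
//			Level:     codersdk.LogLevelInfo,
//			Output:    "script started",
//		})
//		sender.flush(sourceID)
//
//		// sendLoop blocks until ctx is canceled, the agent API returns an error, or the
//		// server reports that its log limit has been exceeded.
//		return sender.sendLoop(ctx, client)
//	}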