Commit 2aff014

feat: add logSender for sending logs on agent v2 API (#12046)
Adds a new subcomponent of the agent for queueing up logs until they can be sent over the Agent API. A subsequent PR will change the agent to use this instead of the HTTP API for posting logs. Relates to #10534
1 parent 627232e commit 2aff014

File tree

3 files changed: +652 -0 lines changed


agent/logs.go

Lines changed: 239 additions & 0 deletions
@@ -0,0 +1,239 @@
package agent

import (
	"context"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

const (
	flushInterval    = time.Second
	maxBytesPerBatch = 1 << 20 // 1MiB
	overheadPerLog   = 21      // found by testing

	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
	// accept in the database.
	maxBytesQueued = 1048576
)

type logQueue struct {
	logs           []*proto.Log
	flushRequested bool
	lastFlush      time.Time
}

// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
// the agent calls sendLoop to send pending logs.
type logSender struct {
	*sync.Cond
	queues           map[uuid.UUID]*logQueue
	logger           slog.Logger
	exceededLogLimit bool
	outputLen        int
}

type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}

func newLogSender(logger slog.Logger) *logSender {
	return &logSender{
		Cond:   sync.NewCond(&sync.Mutex{}),
		logger: logger,
		queues: make(map[uuid.UUID]*logQueue),
	}
}

func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) {
	logger := l.logger.With(slog.F("log_source_id", src))
	if len(logs) == 0 {
		logger.Debug(context.Background(), "enqueue called with no logs")
		return
	}
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
		// don't error, as we also write to file and don't want the overall write to fail
		return
	}
	defer l.Broadcast()
	q, ok := l.queues[src]
	if !ok {
		q = &logQueue{}
		l.queues[src] = q
	}
	for k, log := range logs {
		// Here we check the queue size before adding a log because we want to queue up slightly
		// more logs than the database would store to ensure we trigger "logs truncated" at the
		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
		// examined the Coder agent logs.
		if l.outputLen > maxBytesQueued {
			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
			return
		}
		pl, err := agentsdk.ProtoFromLog(log)
		if err != nil {
			logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
			return
		}
		if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
			logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
			continue
		}
		q.logs = append(q.logs, pl)
		l.outputLen += len(pl.Output)
	}
	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
}

func (l *logSender) flush(src uuid.UUID) {
	l.L.Lock()
	defer l.L.Unlock()
	defer l.Broadcast()
	q, ok := l.queues[src]
	if ok {
		q.flushRequested = true
	}
	// queue might not exist because it's already been flushed and removed from
	// the map.
}

// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry as it is expected that a higher layer retries establishing connection to the agent API and
// calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	l.L.Lock()
	defer l.L.Unlock()
	if l.exceededLogLimit {
		l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
		// no point in keeping this loop going, if log limit is exceeded, but don't return an
		// error because we've already handled it
		return nil
	}

	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	for {
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}

		src, q := l.getPendingWorkLocked()
		logger := l.logger.With(slog.F("log_source_id", src))
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
		}

		// outputToSend keeps track of the size of the protobuf message we send, while
		// outputToRemove keeps track of the size of the output we'll remove from the queues on
		// success. They are different because outputToSend also counts protocol message overheads.
		outputToSend := 0
		outputToRemove := 0
		n := 0
		for n < len(q.logs) {
			log := q.logs[n]
			outputToSend += len(log.Output) + overheadPerLog
			if outputToSend > maxBytesPerBatch {
				break
			}
			req.Logs = append(req.Logs, log)
			n++
			outputToRemove += len(log.Output)
		}

		l.L.Unlock()
		logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
		resp, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}
		if resp.LogLimitExceeded {
			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
			l.exceededLogLimit = true
			// no point in keeping anything we have queued around, server will not accept them
			l.queues = make(map[uuid.UUID]*logQueue)
			// We've handled the error as best as we can. We don't want the server limit to grind
			// other things to a halt, so this is all we can do.
			return nil
		}

		// Since elsewhere we only append to the logs, here we can remove them
		// since we successfully sent them. First we nil the pointers though,
		// so that they can be gc'd.
		for i := 0; i < n; i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[n:]
		l.outputLen -= outputToRemove
		if len(q.logs) == 0 {
			// no empty queues
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}

func (l *logSender) hasPendingWorkLocked() bool {
	for _, q := range l.queues {
		if time.Since(q.lastFlush) > flushInterval {
			return true
		}
		if q.flushRequested {
			return true
		}
	}
	return false
}

func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
	// take the one it's been the longest since we've flushed, so that we have some sense of
	// fairness across sources
	var earliestFlush time.Time
	for is, iq := range l.queues {
		if q == nil || iq.lastFlush.Before(earliestFlush) {
			src = is
			q = iq
			earliestFlush = iq.lastFlush
		}
	}
	return src, q
}
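
The doc comment on logSender describes the intended call pattern: producers enqueue logs per log source and request a flush, and the agent runs sendLoop against the agent v2 API once that connection is available (the actual wiring lands in a subsequent PR). The snippet below is only a minimal sketch of that pattern; fakeLogDest, exampleUsage, the illustrative source UUID, and the agentsdk.Log field values are assumptions for illustration, not part of this commit.

// Sketch only: drives logSender with a stand-in destination. It sits in
// package agent because logSender and its methods are unexported.
package agent

import (
	"context"
	"time"

	"github.com/google/uuid"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

// fakeLogDest is a hypothetical logDest: it accepts every batch and reports
// that the server-side log limit has not been exceeded. The real destination
// would be the agent v2 API client.
type fakeLogDest struct{}

func (fakeLogDest) BatchCreateLogs(_ context.Context, _ *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
	return &proto.BatchCreateLogsResponse{}, nil
}

// exampleUsage shows the expected flow: enqueue and flush are non-blocking
// calls made by log producers; sendLoop is run by the agent and returns when
// the context is canceled or the destination errors, at which point a higher
// layer would reconnect and call it again.
func exampleUsage(ctx context.Context, logger slog.Logger) error {
	ls := newLogSender(logger)
	src := uuid.New() // illustrative log source ID

	ls.enqueue(src, agentsdk.Log{
		CreatedAt: time.Now(),
		Output:    "hello from the agent",
		Level:     codersdk.LogLevelInfo, // field names assumed from agentsdk.Log
	})
	ls.flush(src)

	return ls.sendLoop(ctx, fakeLogDest{})
}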

0 commit comments
