Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c8e7282

Browse files
committed
feat: handle log limit exceeded in logSender
1 parent 3047485 commit c8e7282

File tree

2 files changed

+85
-8
lines changed

2 files changed

+85
-8
lines changed

‎agent/logs.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ type logQueue struct {
2727
// the agent calls sendLoop to send pending logs.
2828
typelogSenderstruct {
2929
*sync.Cond
30-
queuesmap[uuid.UUID]*logQueue
31-
logger slog.Logger
30+
queuesmap[uuid.UUID]*logQueue
31+
logger slog.Logger
32+
exceededLogLimitbool
3233
}
3334

3435
typelogDestinterface {
@@ -51,6 +52,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
5152
}
5253
l.L.Lock()
5354
deferl.L.Unlock()
55+
ifl.exceededLogLimit {
56+
logger.Warn(context.Background(),"dropping enqueued logs because we have reached the server limit")
57+
// don't error, as we also write to file and don't want the overall write to fail
58+
returnnil
59+
}
5460
deferl.Broadcast()
5561
q,ok:=l.queues[src]
5662
if!ok {
@@ -89,6 +95,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
8995
deferl.logger.Debug(ctx,"sendLoop exiting")
9096

9197
// wake 4 times per flush interval to check if anything needs to be flushed
98+
ctx,cancel:=context.WithCancel(ctx)
99+
defercancel()
92100
gofunc() {
93101
tkr:=time.NewTicker(flushInterval/4)
94102
defertkr.Stop()
@@ -111,12 +119,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
111119
l.L.Lock()
112120
deferl.L.Unlock()
113121
for {
114-
for!ctxDone&&!l.hasPendingWorkLocked() {
122+
for!ctxDone&&!l.exceededLogLimit&&!l.hasPendingWorkLocked() {
115123
l.Wait()
116124
}
117125
ifctxDone {
118126
returnnil
119127
}
128+
ifl.exceededLogLimit {
129+
l.logger.Debug(ctx,"aborting sendLoop because log limit is already exceeded")
130+
// no point in keeping this loop going, if log limit is exceeded, but don't return an
131+
// error because we've already handled it
132+
returnnil
133+
}
120134
src,q:=l.getPendingWorkLocked()
121135
q.flushRequested=false// clear flag since we're now flushing
122136
req:=&proto.BatchCreateLogsRequest{
@@ -127,11 +141,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
127141

128142
l.L.Unlock()
129143
l.logger.Debug(ctx,"sending logs to agent API",slog.F("log_source_id",src),slog.F("num_logs",len(req.Logs)))
130-
_,err:=dest.BatchCreateLogs(ctx,req)
144+
resp,err:=dest.BatchCreateLogs(ctx,req)
131145
l.L.Lock()
132146
iferr!=nil {
133147
returnxerrors.Errorf("failed to upload logs: %w",err)
134148
}
149+
ifresp.LogLimitExceeded {
150+
l.logger.Warn(ctx,"server log limit exceeded; logs truncated")
151+
l.exceededLogLimit=true
152+
// no point in keeping anything we have queued around, server will not accept them
153+
l.queues=make(map[uuid.UUID]*logQueue)
154+
// We've handled the error as best as we can. We don't want the server limit to grind
155+
// other things to a halt, so this is all we can do.
156+
returnnil
157+
}
135158

136159
// since elsewhere we only append to the logs, here we can remove them
137160
// since we successfully sent them

‎agent/logs_internal_test.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"github.com/coder/coder/v2/testutil"
1818
)
1919

20-
funcTestLogSender(t*testing.T) {
20+
funcTestLogSender_Mainline(t*testing.T) {
2121
t.Parallel()
2222
testCtx:=testutil.Context(t,testutil.WaitShort)
2323
ctx,cancel:=context.WithCancel(testCtx)
@@ -60,7 +60,9 @@ func TestLogSender(t *testing.T) {
6060
// both, although the order is not controlled
6161
varlogReqs []*proto.BatchCreateLogsRequest
6262
logReqs=append(logReqs,testutil.RequireRecvCtx(ctx,t,fDest.reqs))
63+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
6364
logReqs=append(logReqs,testutil.RequireRecvCtx(ctx,t,fDest.reqs))
65+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
6466
for_,req:=rangelogReqs {
6567
require.NotNil(t,req)
6668
srcID,err:=uuid.FromBytes(req.LogSourceId)
@@ -95,6 +97,7 @@ func TestLogSender(t *testing.T) {
9597
require.NoError(t,err)
9698

9799
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
100+
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{})
98101
// give ourselves a 25% buffer if we're right on the cusp of a tick
99102
require.LessOrEqual(t,time.Since(t1),flushInterval*5/4)
100103
require.NotNil(t,req)
@@ -116,21 +119,72 @@ func TestLogSender(t *testing.T) {
116119
require.NoError(t,err)
117120
}
118121

122+
funcTestLogSender_LogLimitExceeded(t*testing.T) {
123+
t.Parallel()
124+
ctx:=testutil.Context(t,testutil.WaitShort)
125+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
126+
fDest:=newFakeLogDest()
127+
uut:=newLogSender(logger)
128+
129+
t0:=dbtime.Now()
130+
131+
ls1:= uuid.UUID{0x11}
132+
err:=uut.enqueue(ls1, agentsdk.Log{
133+
CreatedAt:t0,
134+
Output:"test log 0, src 1",
135+
Level:codersdk.LogLevelInfo,
136+
})
137+
require.NoError(t,err)
138+
139+
loopErr:=make(chanerror,1)
140+
gofunc() {
141+
err:=uut.sendLoop(ctx,fDest)
142+
loopErr<-err
143+
}()
144+
145+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
146+
require.NotNil(t,req)
147+
testutil.RequireSendCtx(ctx,t,fDest.resps,
148+
&proto.BatchCreateLogsResponse{LogLimitExceeded:true})
149+
150+
err=testutil.RequireRecvCtx(ctx,t,loopErr)
151+
require.NoError(t,err)
152+
153+
// we can still enqueue more logs after sendLoop returns, but they don't
154+
// actually get enqueued
155+
err=uut.enqueue(ls1, agentsdk.Log{
156+
CreatedAt:t0,
157+
Output:"test log 2, src 1",
158+
Level:codersdk.LogLevelTrace,
159+
})
160+
require.NoError(t,err)
161+
uut.L.Lock()
162+
deferuut.L.Unlock()
163+
require.Len(t,uut.queues,0)
164+
}
165+
119166
typefakeLogDeststruct {
120-
reqschan*proto.BatchCreateLogsRequest
167+
reqschan*proto.BatchCreateLogsRequest
168+
respschan*proto.BatchCreateLogsResponse
121169
}
122170

123171
func (ffakeLogDest)BatchCreateLogs(ctx context.Context,req*proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse,error) {
124172
select {
125173
case<-ctx.Done():
126174
returnnil,ctx.Err()
127175
casef.reqs<-req:
128-
return&proto.BatchCreateLogsResponse{},nil
176+
select {
177+
case<-ctx.Done():
178+
returnnil,ctx.Err()
179+
caseresp:=<-f.resps:
180+
returnresp,nil
181+
}
129182
}
130183
}
131184

132185
funcnewFakeLogDest()*fakeLogDest {
133186
return&fakeLogDest{
134-
reqs:make(chan*proto.BatchCreateLogsRequest),
187+
reqs:make(chan*proto.BatchCreateLogsRequest),
188+
resps:make(chan*proto.BatchCreateLogsResponse),
135189
}
136190
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp