Commit 0515169

feat: handle log limit exceeded in logSender

1 parent ad9d94a, commit 0515169

File tree: agent/logs.go, agent/logs_internal_test.go

2 files changed: +85 −8 lines

agent/logs.go

Lines changed: 27 additions & 4 deletions
@@ -26,8 +26,9 @@ type logQueue struct {
 // the agent calls sendLoop to send pending logs.
 type logSender struct {
 	*sync.Cond
-	queues map[uuid.UUID]*logQueue
-	logger slog.Logger
+	queues           map[uuid.UUID]*logQueue
+	logger           slog.Logger
+	exceededLogLimit bool
 }

 type logDest interface {
@@ -50,6 +51,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	}
 	l.L.Lock()
 	defer l.L.Unlock()
+	if l.exceededLogLimit {
+		logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
+		// don't error, as we also write to file and don't want the overall write to fail
+		return nil
+	}
 	defer l.Broadcast()
 	q, ok := l.queues[src]
 	if !ok {
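Once the limit flag is set, enqueue becomes a deliberate no-op: it drops the logs but still returns nil, since callers also write logs to a local file and that write should not fail just because the server refused more logs. A minimal, self-contained sketch of this sticky-flag pattern (hypothetical names, not the coder/coder code):

```go
package main

import (
	"fmt"
	"sync"
)

// sender mirrors the shape of logSender above: a condition variable
// guarding a queue, plus a sticky "limit exceeded" flag.
type sender struct {
	*sync.Cond
	exceededLogLimit bool
	queue            []string
}

func newSender() *sender {
	return &sender{Cond: sync.NewCond(&sync.Mutex{})}
}

func (s *sender) enqueue(lines ...string) error {
	s.L.Lock()
	defer s.L.Unlock()
	if s.exceededLogLimit {
		// drop silently: callers also write to a local file, so the
		// overall write must not fail just because the server is full
		return nil
	}
	defer s.Broadcast()
	s.queue = append(s.queue, lines...)
	return nil
}

func main() {
	s := newSender()
	_ = s.enqueue("kept")
	s.L.Lock()
	s.exceededLogLimit = true // normally set by the send loop
	s.L.Unlock()
	_ = s.enqueue("dropped")  // returns nil, but nothing is queued
	fmt.Println(len(s.queue)) // prints 1
}
```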
@@ -88,6 +94,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	defer l.logger.Debug(ctx, "sendLoop exiting")

 	// wake 4 times per flush interval to check if anything needs to be flushed
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	go func() {
 		tkr := time.NewTicker(flushInterval / 4)
 		defer tkr.Stop()
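The new context.WithCancel pair is what keeps the ticker goroutine from leaking: sendLoop can now return early (when the limit is exceeded) while the parent context is still live, and the deferred cancel tears the ticker down on every return path. A generic, self-contained sketch of the pattern, with assumed names:

```go
package main

import (
	"context"
	"time"
)

func run(ctx context.Context) error {
	// derive a child context so the helper goroutine stops whenever run
	// returns, not only when the caller cancels
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() {
		tkr := time.NewTicker(250 * time.Millisecond)
		defer tkr.Stop()
		for {
			select {
			case <-ctx.Done():
				return // exits on caller cancel or on run returning
			case <-tkr.C:
				// periodic wake-up work goes here
			}
		}
	}()

	// ... main loop; any return path triggers the deferred cancel ...
	return nil
}

func main() {
	_ = run(context.Background())
}
```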
@@ -110,12 +118,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 	l.L.Lock()
 	defer l.L.Unlock()
 	for {
-		for !ctxDone && !l.hasPendingWorkLocked() {
+		for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
 			l.Wait()
 		}
 		if ctxDone {
 			return nil
 		}
+		if l.exceededLogLimit {
+			l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
+			// no point in keeping this loop going if the log limit is exceeded, but don't
+			// return an error because we've already handled it
+			return nil
+		}
 		src, q := l.getPendingWorkLocked()
 		q.flushRequested = false // clear flag since we're now flushing
 		req := &proto.BatchCreateLogsRequest{
@@ -125,11 +139,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {

 		l.L.Unlock()
 		l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
-		_, err := dest.BatchCreateLogs(ctx, req)
+		resp, err := dest.BatchCreateLogs(ctx, req)
 		l.L.Lock()
 		if err != nil {
 			return xerrors.Errorf("failed to upload logs: %w", err)
 		}
+		if resp.LogLimitExceeded {
+			l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
+			l.exceededLogLimit = true
+			// no point in keeping anything we have queued around; the server will not accept it
+			l.queues = make(map[uuid.UUID]*logQueue)
+			// We've handled the error as best we can. We don't want the server limit to grind
+			// other things to a halt, so this is all we can do.
+			return nil
+		}

 		// Since elsewhere we only append to the logs, here we can remove them
 		// since we successfully sent them. First we nil the pointers though,
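The nil return on LogLimitExceeded is the key design choice in this hunk: hitting the server's limit is treated as fully handled rather than as a failure, so it cannot cascade into retries or bring down other agent work. A hypothetical caller in the same package (names assumed, not from the commit) would see the contract like this:

```go
// sketch only: distinguishing the nil-error exits from a real upload failure
func runSendLoop(ctx context.Context, l *logSender, dest logDest) {
	if err := l.sendLoop(ctx, dest); err != nil {
		// transport/API failure: reconnecting and retrying is reasonable
		return
	}
	// nil error: either ctx was canceled, or the server's log limit was
	// exceeded, the queues were dropped, and further uploads are pointless
}
```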

agent/logs_internal_test.go

Lines changed: 58 additions & 4 deletions
@@ -19,7 +19,7 @@ import (
 	"github.com/coder/coder/v2/testutil"
 )

-func TestLogSender(t *testing.T) {
+func TestLogSender_Mainline(t *testing.T) {
 	t.Parallel()
 	testCtx := testutil.Context(t, testutil.WaitShort)
 	ctx, cancel := context.WithCancel(testCtx)
@@ -62,7 +62,9 @@ func TestLogSender(t *testing.T) {
 	// both, although the order is not controlled
 	var logReqs []*proto.BatchCreateLogsRequest
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	for _, req := range logReqs {
 		require.NotNil(t, req)
 		srcID, err := uuid.FromBytes(req.LogSourceId)
@@ -97,6 +99,7 @@ func TestLogSender(t *testing.T) {
 	require.NoError(t, err)

 	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	// give ourselves a 25% buffer if we're right on the cusp of a tick
 	require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
 	require.NotNil(t, req)
@@ -118,8 +121,53 @@ func TestLogSender(t *testing.T) {
 	require.NoError(t, err)
 }

+func TestLogSender_LogLimitExceeded(t *testing.T) {
+	t.Parallel()
+	ctx := testutil.Context(t, testutil.WaitShort)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+
+	ls1 := uuid.UUID{0x11}
+	err := uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 0, src 1",
+		Level:     codersdk.LogLevelInfo,
+	})
+	require.NoError(t, err)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+	require.NotNil(t, req)
+	testutil.RequireSendCtx(ctx, t, fDest.resps,
+		&proto.BatchCreateLogsResponse{LogLimitExceeded: true})
+
+	err = testutil.RequireRecvCtx(ctx, t, loopErr)
+	require.NoError(t, err)
+
+	// enqueue still succeeds after sendLoop returns, but the logs are
+	// silently dropped rather than queued
+	err = uut.enqueue(ls1, agentsdk.Log{
+		CreatedAt: t0,
+		Output:    "test log 2, src 1",
+		Level:     codersdk.LogLevelTrace,
+	})
+	require.NoError(t, err)
+	uut.L.Lock()
+	defer uut.L.Unlock()
+	require.Len(t, uut.queues, 0)
+}
+
 type fakeLogDest struct {
-	reqs chan *proto.BatchCreateLogsRequest
+	reqs  chan *proto.BatchCreateLogsRequest
+	resps chan *proto.BatchCreateLogsResponse
 }

 func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
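One synchronization detail in the new test is worth noting: sendLoop runs in a goroutine, so the test joins on the buffered loopErr channel before asserting that uut.queues is empty; without that join, the final require.Len would race with sendLoop's cleanup. A distilled, self-contained sketch of the join-before-assert pattern (hypothetical names):

```go
package main

import (
	"fmt"
	"sync"
)

type unit struct {
	mu     sync.Mutex
	queues map[string][]string
}

// loop stands in for sendLoop: it clears its queues as terminal cleanup.
func (u *unit) loop() error {
	u.mu.Lock()
	defer u.mu.Unlock()
	u.queues = make(map[string][]string)
	return nil
}

func main() {
	u := &unit{queues: map[string][]string{"src": {"log"}}}
	loopErr := make(chan error, 1) // buffered: the send never blocks
	go func() { loopErr <- u.loop() }()
	if err := <-loopErr; err != nil { // join before asserting
		panic(err)
	}
	u.mu.Lock()
	defer u.mu.Unlock()
	fmt.Println(len(u.queues)) // 0: state is stable after the join
}
```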
@@ -130,12 +178,18 @@ func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case f.reqs <- req:
-		return &proto.BatchCreateLogsResponse{}, nil
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case resp := <-f.resps:
+			return resp, nil
+		}
 	}
 }

 func newFakeLogDest() *fakeLogDest {
 	return &fakeLogDest{
-		reqs: make(chan *proto.BatchCreateLogsRequest),
+		reqs:  make(chan *proto.BatchCreateLogsRequest),
+		resps: make(chan *proto.BatchCreateLogsResponse),
 	}
 }
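The reworked fakeLogDest is a small request/response rendezvous: each BatchCreateLogs call hands its request to the test on reqs, then blocks until the test scripts a reply on resps. That is what lets TestLogSender_LogLimitExceeded inject LogLimitExceeded: true for exactly one call. A self-contained sketch of the same pattern (assumed names, not the coder test helpers):

```go
package main

import (
	"context"
	"fmt"
)

type fakeServer struct {
	reqs  chan string
	resps chan string
}

// Call blocks twice: once handing the request to the test, once waiting
// for the reply the test wants this particular call to return.
func (f *fakeServer) Call(ctx context.Context, req string) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case f.reqs <- req:
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case resp := <-f.resps:
			return resp, nil
		}
	}
}

func main() {
	ctx := context.Background()
	f := &fakeServer{reqs: make(chan string), resps: make(chan string)}
	go func() {
		req := <-f.reqs                   // the "test" sees the request...
		f.resps <- "limit hit for " + req // ...and scripts the response
	}()
	resp, err := f.Call(ctx, "batch-1")
	fmt.Println(resp, err)
}
```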
