Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: handle log limit exceeded in logSender#12079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletionsagent/logs.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -26,8 +26,9 @@ type logQueue struct {
// the agent calls sendLoop to send pending logs.
type logSender struct {
*sync.Cond
queues map[uuid.UUID]*logQueue
logger slog.Logger
queues map[uuid.UUID]*logQueue
logger slog.Logger
exceededLogLimit bool
}

type logDest interface {
Expand All@@ -50,6 +51,11 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
}
l.L.Lock()
defer l.L.Unlock()
if l.exceededLogLimit {
logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
// don't error, as we also write to file and don't want the overall write to fail
return nil
}
defer l.Broadcast()
q, ok := l.queues[src]
if !ok {
Expand DownExpand Up@@ -88,6 +94,8 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
defer l.logger.Debug(ctx, "sendLoop exiting")

// wake 4 times per flush interval to check if anything needs to be flushed
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
tkr := time.NewTicker(flushInterval / 4)
defer tkr.Stop()
Expand All@@ -110,12 +118,18 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
l.L.Lock()
defer l.L.Unlock()
for {
for !ctxDone && !l.hasPendingWorkLocked() {
for !ctxDone && !l.exceededLogLimit && !l.hasPendingWorkLocked() {
l.Wait()
}
if ctxDone {
return nil
}
if l.exceededLogLimit {
l.logger.Debug(ctx, "aborting sendLoop because log limit is already exceeded")
// no point in keeping this loop going, if log limit is exceeded, but don't return an
// error because we're already handled it
return nil
}
src, q := l.getPendingWorkLocked()
q.flushRequested = false // clear flag since we're now flushing
req := &proto.BatchCreateLogsRequest{
Expand All@@ -125,11 +139,20 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {

l.L.Unlock()
l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
_, err := dest.BatchCreateLogs(ctx, req)
resp, err := dest.BatchCreateLogs(ctx, req)
l.L.Lock()
if err != nil {
return xerrors.Errorf("failed to upload logs: %w", err)
}
if resp.LogLimitExceeded {
l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
l.exceededLogLimit = true
// no point in keeping anything we have queued around, server will not accept them
l.queues = make(map[uuid.UUID]*logQueue)
// We've handled the error as best as we can. We don't want the server limit to grind
// other things to a halt, so this is all we can do.
return nil
}

// Since elsewhere we only append to the logs, here we can remove them
// since we successfully sent them. First we nil the pointers though,
Expand Down
62 changes: 58 additions & 4 deletionsagent/logs_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,7 +19,7 @@ import (
"github.com/coder/coder/v2/testutil"
)

funcTestLogSender(t *testing.T) {
funcTestLogSender_Mainline(t *testing.T) {
t.Parallel()
testCtx := testutil.Context(t, testutil.WaitShort)
ctx, cancel := context.WithCancel(testCtx)
Expand DownExpand Up@@ -62,7 +62,9 @@ func TestLogSender(t *testing.T) {
// both, although the order is not controlled
var logReqs []*proto.BatchCreateLogsRequest
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
for _, req := range logReqs {
require.NotNil(t, req)
srcID, err := uuid.FromBytes(req.LogSourceId)
Expand DownExpand Up@@ -97,6 +99,7 @@ func TestLogSender(t *testing.T) {
require.NoError(t, err)

req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
// give ourselves a 25% buffer if we're right on the cusp of a tick
require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
require.NotNil(t, req)
Expand All@@ -118,8 +121,53 @@ func TestLogSender(t *testing.T) {
require.NoError(t, err)
}

func TestLogSender_LogLimitExceeded(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
fDest := newFakeLogDest()
uut := newLogSender(logger)

t0 := dbtime.Now()

ls1 := uuid.UUID{0x11}
err := uut.enqueue(ls1, agentsdk.Log{
CreatedAt: t0,
Output: "test log 0, src 1",
Level: codersdk.LogLevelInfo,
})
require.NoError(t, err)

loopErr := make(chan error, 1)
go func() {
err := uut.sendLoop(ctx, fDest)
loopErr <- err
}()

req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
require.NotNil(t, req)
testutil.RequireSendCtx(ctx, t, fDest.resps,
&proto.BatchCreateLogsResponse{LogLimitExceeded: true})

err = testutil.RequireRecvCtx(ctx, t, loopErr)
require.NoError(t, err)

// we can still enqueue more logs after sendLoop returns, but they don't
// actually get enqueued
err = uut.enqueue(ls1, agentsdk.Log{
CreatedAt: t0,
Output: "test log 2, src 1",
Level: codersdk.LogLevelTrace,
})
require.NoError(t, err)
uut.L.Lock()
defer uut.L.Unlock()
require.Len(t, uut.queues, 0)
}

type fakeLogDest struct {
reqs chan *proto.BatchCreateLogsRequest
reqs chan *proto.BatchCreateLogsRequest
resps chan *proto.BatchCreateLogsResponse
}

func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
Expand All@@ -130,12 +178,18 @@ func (f fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreate
case <-ctx.Done():
return nil, ctx.Err()
case f.reqs <- req:
return &proto.BatchCreateLogsResponse{}, nil
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-f.resps:
return resp, nil
}
}
}

func newFakeLogDest() *fakeLogDest {
return &fakeLogDest{
reqs: make(chan *proto.BatchCreateLogsRequest),
reqs: make(chan *proto.BatchCreateLogsRequest),
resps: make(chan *proto.BatchCreateLogsResponse),
}
}

[8]ページ先頭

©2009-2025 Movatter.jp