feat: limit queued logs to database limit in agent #12067


Merged
47 changes: 35 additions & 12 deletions agent/logs.go
@@ -14,9 +14,14 @@ import (
 )
 
 const (
-	flushInterval     = time.Second
-	logOutputMaxBytes = 1 << 20 // 1MiB
-	overheadPerLog    = 21      // found by testing
+	flushInterval    = time.Second
+	maxBytesPerBatch = 1 << 20 // 1MiB
+	overheadPerLog   = 21      // found by testing
+
+	// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
+	// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
+	// accept in the database.
+	maxBytesQueued = 1048576
 )
 
 type logQueue struct {
@@ -33,6 +38,7 @@ type logSender struct {
 	queues           map[uuid.UUID]*logQueue
 	logger           slog.Logger
 	exceededLogLimit bool
+	outputLen        int
 }
 
 type logDest interface {
@@ -47,6 +53,8 @@ func newLogSender(logger slog.Logger) *logSender {
 	}
 }
 
+var MaxQueueExceededError = xerrors.New("maximum queued logs exceeded")
+
 func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 	logger := l.logger.With(slog.F("log_source_id", src))
 	if len(logs) == 0 {
@@ -66,12 +74,25 @@ func (l *logSender) enqueue(src uuid.UUID, logs ...agentsdk.Log) error {
 		q = &logQueue{}
 		l.queues[src] = q
 	}
-	for _, log := range logs {
+	for k, log := range logs {
+		// Here we check the queue size before adding a log because we want to queue up slightly
+		// more logs than the database would store to ensure we trigger "logs truncated" at the
+		// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
+		// examined the Coder agent logs.
+		if l.outputLen > maxBytesQueued {
+			logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
+			return MaxQueueExceededError
+		}
 		pl, err := agentsdk.ProtoFromLog(log)
 		if err != nil {
 			return xerrors.Errorf("failed to convert log: %w", err)
 		}
+		if len(pl.Output) > maxBytesPerBatch {
+			logger.Warn(context.Background(), "dropping log line that exceeds our limit")
+			continue
+		}
 		q.logs = append(q.logs, pl)
+		l.outputLen += len(pl.Output)
 	}
 	logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
 	return nil
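
For context, a minimal sketch (not part of this PR) of how a hypothetical in-package caller might treat the sentinel error as a soft failure; queueLogs is an illustrative name:

// queueLogs is hypothetical; it shows one way a caller could handle
// MaxQueueExceededError without treating it as fatal.
func queueLogs(sender *logSender, src uuid.UUID, logs []agentsdk.Log) {
	err := sender.enqueue(src, logs...)
	if xerrors.Is(err, MaxQueueExceededError) {
		// The queue already holds at least as much output as the database will
		// accept, and the server marks the output as truncated, so dropping the
		// remainder here loses nothing the end user could have seen.
		return
	}
	if err != nil {
		sender.logger.Error(context.Background(), "enqueue agent logs", slog.Error(err))
	}
}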
@@ -140,21 +161,22 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 			req := &proto.BatchCreateLogsRequest{
 				LogSourceId: src[:],
 			}
-			o := 0
+
+			// outputToSend keeps track of the size of the protobuf message we send, while
+			// outputToRemove keeps track of the size of the output we'll remove from the queues on
+			// success. They are different because outputToSend also counts protocol message overheads.
+			outputToSend := 0
+			outputToRemove := 0
 			n := 0
 			for n < len(q.logs) {
 				log := q.logs[n]
-				if len(log.Output) > logOutputMaxBytes {
-					logger.Warn(ctx, "dropping log line that exceeds our limit")
-					n++
-					continue
-				}
-				o += len(log.Output) + overheadPerLog
-				if o > logOutputMaxBytes {
+				outputToSend += len(log.Output) + overheadPerLog
+				if outputToSend > maxBytesPerBatch {
 					break
 				}
 				req.Logs = append(req.Logs, log)
 				n++
+				outputToRemove += len(log.Output)
 			}
 
 			l.L.Unlock()
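
As a worked example of this batch accounting (a sketch assuming the constants from agent/logs.go above are in scope; the 1000-byte line length is an arbitrary illustration):

// batchCapacity mirrors sendLoop's accounting to show how many fixed-size
// lines fit in one batch; it is a sketch, not code from this PR.
func batchCapacity(lineLen int) int {
	outputToSend, n := 0, 0
	for {
		outputToSend += lineLen + overheadPerLog
		if outputToSend > maxBytesPerBatch {
			return n // the line that would overflow stays queued for the next batch
		}
		n++
	}
}

For 1000-byte lines this yields 1027 logs per batch: 1027*(1000+21) = 1048567 bytes fits under the 1 MiB limit, while a 1028th line would push the total to 1049588.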
@@ -181,6 +203,7 @@ func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
 				q.logs[i] = nil
 			}
 			q.logs = q.logs[n:]
+			l.outputLen -= outputToRemove
 			if len(q.logs) == 0 {
 				// no empty queues
 				delete(l.queues, src)
68 changes: 65 additions & 3 deletions agent/logs_internal_test.go
@@ -176,7 +176,7 @@ func TestLogSender_SkipHugeLog(t *testing.T) {
 
 	t0 := dbtime.Now()
 	ls1 := uuid.UUID{0x11}
-	hugeLog := make([]byte, logOutputMaxBytes+1)
+	hugeLog := make([]byte, maxBytesPerBatch+1)
 	for i := range hugeLog {
 		hugeLog[i] = 'q'
 	}
@@ -246,14 +246,14 @@ func TestLogSender_Batch(t *testing.T) {
 	gotLogs += len(req.Logs)
 	wire, err := protobuf.Marshal(req)
 	require.NoError(t, err)
-	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 	req = testutil.RequireRecvCtx(ctx, t, fDest.reqs)
 	require.NotNil(t, req)
 	gotLogs += len(req.Logs)
 	wire, err = protobuf.Marshal(req)
 	require.NoError(t, err)
-	require.Less(t, len(wire), logOutputMaxBytes, "wire should not exceed 1MiB")
+	require.Less(t, len(wire), maxBytesPerBatch, "wire should not exceed 1MiB")
 	require.Equal(t, 60000, gotLogs)
 	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
 
@@ -262,6 +262,68 @@ func TestLogSender_Batch(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestLogSender_MaxQueuedLogs(t *testing.T) {
+	t.Parallel()
+	testCtx := testutil.Context(t, testutil.WaitShort)
+	ctx, cancel := context.WithCancel(testCtx)
+	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
+	fDest := newFakeLogDest()
+	uut := newLogSender(logger)
+
+	t0 := dbtime.Now()
+	ls1 := uuid.UUID{0x11}
+	n := 4
+	hugeLog := make([]byte, maxBytesQueued/n)
+	for i := range hugeLog {
+		hugeLog[i] = 'q'
+	}
+	var logs []agentsdk.Log
+	for i := 0; i < n; i++ {
+		logs = append(logs, agentsdk.Log{
+			CreatedAt: t0,
+			Output:    string(hugeLog),
+			Level:     codersdk.LogLevelInfo,
+		})
+	}
+	err := uut.enqueue(ls1, logs...)
+	require.NoError(t, err)
+
+	// we're now right at the limit of output
+	require.Equal(t, maxBytesQueued, uut.outputLen)
+
+	// adding more logs should error...
+	ls2 := uuid.UUID{0x22}
+	err = uut.enqueue(ls2, logs...)
+	require.ErrorIs(t, err, MaxQueueExceededError)
+
+	loopErr := make(chan error, 1)
+	go func() {
+		err := uut.sendLoop(ctx, fDest)
+		loopErr <- err
+	}()
+
+	// ...but, it should still queue up one log from source #2, so that we would exceed the database
+	// limit. These come over a total of 3 updates, because due to overhead, the n logs from source
+	// #1 come in 2 updates, plus 1 update for source #2.
+	logsBySource := make(map[uuid.UUID]int)
+	for i := 0; i < 3; i++ {
+		req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
+		require.NotNil(t, req)
+		srcID, err := uuid.FromBytes(req.LogSourceId)
+		require.NoError(t, err)
+		logsBySource[srcID] += len(req.Logs)
+		testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
+	}
+	require.Equal(t, map[uuid.UUID]int{
+		ls1: n,
+		ls2: 1,
+	}, logsBySource)
+
+	cancel()
+	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
+	require.NoError(t, err)
+}
+
 type fakeLogDest struct {
 	reqs  chan *proto.BatchCreateLogsRequest
 	resps chan *proto.BatchCreateLogsResponse
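
The test's expectation of exactly 3 updates follows from the constants; a standalone back-of-the-envelope check (constants copied from agent/logs.go for self-containedness):

package main

import "fmt"

const (
	maxBytesPerBatch = 1 << 20 // copied from agent/logs.go
	overheadPerLog   = 21
	maxBytesQueued   = 1048576
)

func main() {
	logLen := maxBytesQueued / 4 // each test log is 262144 bytes
	fmt.Println(3*(logLen+overheadPerLog) <= maxBytesPerBatch) // true: 786495 bytes fit
	fmt.Println(4*(logLen+overheadPerLog) <= maxBytesPerBatch) // false: 1048660 overflows
	// Source #1's four logs therefore need two batches (3 logs + 1 log), and the
	// single log admitted from source #2 needs a third: 3 updates in total.
}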
