feat: implement retry mechanism for log processing #136


Merged

kacpersaw merged 6 commits into main from kacpersaw/feat-log-queue on Sep 30, 2025
logger.go: 268 changes (199 additions & 69 deletions)
@@ -34,6 +34,8 @@ type podEventLoggerOptions struct {

logger slog.Logger
logDebounce time.Duration
// maxRetries is the maximum number of retries for a log send failure.
maxRetries int

// The following fields are optional!
namespaces []string
@@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
opts.clock = quartz.NewReal()
}

if opts.maxRetries == 0 {
opts.maxRetries = 10
}

logCh := make(chan agentLog, 512)
ctx, cancelFunc := context.WithCancel(ctx)
reporter := &podEventLogger{
@@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
logCache: logCache{
logs: map[string][]agentsdk.Log{},
},
maxRetries: opts.maxRetries,
},
}

@@ -407,6 +414,11 @@ type logQueuer struct {
loggerTTL time.Duration
loggers map[string]agentLoggerLifecycle
logCache logCache

// retries maps agent tokens to their retry state for exponential backoff
retries map[string]*retryState
// maxRetries is the maximum number of retries for a log send failure.
maxRetries int
}

func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) {
}
}

func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
client := agentsdk.New(l.coderURL)
client.SetSessionToken(log.agentToken)
logger := l.logger.With(slog.F("resource_name", log.resourceName))
client.SDK.SetLogger(logger)

_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
ID: sourceUUID,
Icon: "/icon/k8s.png",
DisplayName: "Kubernetes",
})
if err != nil {
// Posting the log source failed, which affects how logs appear.
// We'll retry to ensure the log source is properly registered.
logger.Error(ctx, "post log source", slog.Error(err))
return agentLoggerLifecycle{}, err
}

ls := agentsdk.NewLogSender(logger)
sl := ls.GetScriptLogger(sourceUUID)

gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

// connect to Agent v2.0 API, since we don't need features added later.
// This maximizes compatibility.
arpc, err := client.ConnectRPC20(gracefulCtx)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
go func() {
err := ls.SendLoop(gracefulCtx, arpc)
// if the send loop exits on its own without the context
// canceling, timeout the logger and force it to recreate.
if err != nil && ctx.Err() == nil {
l.loggerTimeout(log.agentToken)
}
}()

closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
logger.Info(ctx, "logger timeout firing")
l.loggerTimeout(log.agentToken)
})
lifecycle := agentLoggerLifecycle{
scriptLogger: sl,
close: func() {
defer arpc.DRPCConn().Close()
defer client.SDK.HTTPClient.CloseIdleConnections()
// We could be stopping for reasons other than the timeout. If
// so, stop the timer.
closeTimer.Stop()
defer gracefulCancel()
timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
defer timeout.Stop()
logger.Info(ctx, "logger closing")

if err := sl.Flush(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while flushing")
return
}

if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
}
},
}
lifecycle.closeTimer = closeTimer
return lifecycle, nil
}

func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
l.mu.Lock()
defer l.mu.Unlock()
queuedLogs := l.logCache.push(log)

queuedLogs := l.logCache.get(log.agentToken)
if isAgentLogEmpty(log) {
if queuedLogs == nil {
return
}
} else {
queuedLogs = l.logCache.push(log)
}

lgr, ok := l.loggers[log.agentToken]
if !ok {
client := agentsdk.New(l.coderURL)
client.SetSessionToken(log.agentToken)
logger := l.logger.With(slog.F("resource_name", log.resourceName))
client.SDK.SetLogger(logger)

_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
ID: sourceUUID,
Icon: "/icon/k8s.png",
DisplayName: "Kubernetes",
})
if err != nil {
// This shouldn't fail sending the log, as it only affects how they
// appear.
logger.Error(ctx, "post log source", slog.Error(err))
// skip if we're in a retry cooldown window
if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
return
}

ls := agentsdk.NewLogSender(logger)
sl := ls.GetScriptLogger(sourceUUID)

gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

// connect to Agent v2.0 API, since we don't need features added later.
// This maximizes compatibility.
arpc, err := client.ConnectRPC20(gracefulCtx)
var err error
lgr, err = l.newLogger(ctx, log)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
l.scheduleRetry(ctx, log.agentToken)
return
}
go func() {
err := ls.SendLoop(gracefulCtx, arpc)
// if the send loop exits on its own without the context
// canceling, timeout the logger and force it to recreate.
if err != nil && ctx.Err() == nil {
l.loggerTimeout(log.agentToken)
}
}()

closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
logger.Info(ctx, "logger timeout firing")
l.loggerTimeout(log.agentToken)
})
lifecycle := agentLoggerLifecycle{
scriptLogger: sl,
close: func() {
// We could be stopping for reasons other than the timeout. If
// so, stop the timer.
closeTimer.Stop()
defer gracefulCancel()
timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
defer timeout.Stop()
logger.Info(ctx, "logger closing")

if err := sl.Flush(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while flushing")
return
}

if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
}

_ = arpc.DRPCConn().Close()
client.SDK.HTTPClient.CloseIdleConnections()
},
}
lifecycle.closeTimer = closeTimer
l.loggers[log.agentToken] = lifecycle
lgr = lifecycle
l.loggers[log.agentToken] = lgr
}

lgr.resetCloseTimer(l.loggerTTL)
_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
if len(queuedLogs) == 0 {
return
}
if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
l.scheduleRetry(ctx, log.agentToken)
return
}
l.clearRetryLocked(log.agentToken)
l.logCache.delete(log.agentToken)
}

@@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) {
lgr, ok := l.loggers[log.agentToken]
if ok {
delete(l.loggers, log.agentToken)

}
l.clearRetryLocked(log.agentToken)
l.logCache.delete(log.agentToken)
l.mu.Unlock()

if ok {
@@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
}
}

// retryState tracks exponential backoff for an agent token.
type retryState struct {
delay time.Duration
timer *quartz.Timer
retryCount int
exhausted bool // prevent retry state recreation after max retries
}

func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
if l.retries == nil {
l.retries = make(map[string]*retryState)
}

rs := l.retries[token]

if rs != nil && rs.exhausted {
return
}

if rs == nil {
rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
l.retries[token] = rs
}

rs.retryCount++

// If we've reached the max retries, clear the retry state and delete the log cache.
if rs.retryCount >= l.maxRetries {
l.logger.Error(ctx, "max retries exceeded",
slog.F("retryCount", rs.retryCount),
slog.F("maxRetries", l.maxRetries))
rs.exhausted = true

Member (review comment): This will be kept in memory forever now right?

if rs.timer != nil {
rs.timer.Stop()
rs.timer = nil
}
l.logCache.delete(token)
return
}

if rs.timer != nil {
return
}

l.logger.Info(ctx, "scheduling retry",
slog.F("delay", rs.delay.String()),
slog.F("retryCount", rs.retryCount))

rs.timer = l.clock.AfterFunc(rs.delay, func() {
l.mu.Lock()
defer l.mu.Unlock()

if cur := l.retries[token]; cur != nil && !cur.exhausted {
cur.timer = nil
l.q <- agentLog{op: opLog, agentToken: token}
}
})

rs.delay *= 2
if rs.delay > 30*time.Second {
rs.delay = 30 * time.Second
}
}

// clearRetryLocked clears the retry state for the given token.
// The caller must hold the mutex lock.
func (l *logQueuer) clearRetryLocked(token string) {
if rs := l.retries[token]; rs != nil {
if rs.timer != nil {
rs.timer.Stop()
}
delete(l.retries, token)
}
}

func newColor(value ...color.Attribute) *color.Color {
c := color.New(value...)
c.EnableColor()
@@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
func (l *logCache) delete(token string) {
delete(l.logs, token)
}

func (l *logCache) get(token string) []agentsdk.Log {
logs, ok := l.logs[token]
if !ok {
return nil
}
return logs
}

func isAgentLogEmpty(log agentLog) bool {
return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
}
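
For reference, a minimal standalone sketch (not part of this PR) of the backoff schedule that scheduleRetry produces with the defaults above: an initial 1s delay, doubling after each scheduled retry, a 30s cap, and maxRetries defaulting to 10. The helper name backoffDelays is purely illustrative.

package main

import (
	"fmt"
	"time"
)

// backoffDelays mirrors the schedule used by scheduleRetry: start at 1s,
// double after each scheduled retry, and cap at 30s. With maxRetries = 10,
// nine retries are scheduled; the tenth attempt marks the state exhausted
// instead of scheduling another timer.
func backoffDelays(maxRetries int) []time.Duration {
	delays := make([]time.Duration, 0, maxRetries)
	delay := time.Second
	for attempt := 1; attempt < maxRetries; attempt++ {
		delays = append(delays, delay)
		delay *= 2
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
	return delays
}

func main() {
	// Prints: [1s 2s 4s 8s 16s 30s 30s 30s 30s]
	fmt.Println(backoffDelays(10))
}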