Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a5bd376

Browse files
authored
feat: implement retry mechanism for log processing (#136)
* feat: implement retry mechanism for log processing
* add tests for retry mechanism and logCache functionality
* simplify retry state initialization in logQueuer
* implement maxRetries for log processing
* Apply review suggestions and fix maxRetries
* Remove maxRetries configuration from CLI and Helm values, setting a default of 15 retries for log send failures.
1 parent 13d885f · commit a5bd376

File tree

3 files changed

+680
-69
lines changed

3 files changed

+680
-69
lines changed

‎logger.go‎

Lines changed: 199 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type podEventLoggerOptions struct {
3434

3535
logger slog.Logger
3636
logDebounce time.Duration
37+
// maxRetries is the maximum number of retries for a log send failure.
38+
maxRetriesint
3739

3840
// The following fields are optional!
3941
namespaces []string
@@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
5254
opts.clock=quartz.NewReal()
5355
}
5456

57+
ifopts.maxRetries==0 {
58+
opts.maxRetries=10
59+
}
60+
5561
logCh:=make(chanagentLog,512)
5662
ctx,cancelFunc:=context.WithCancel(ctx)
5763
reporter:=&podEventLogger{
@@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
7581
logCache:logCache{
7682
logs:map[string][]agentsdk.Log{},
7783
},
84+
maxRetries:opts.maxRetries,
7885
},
7986
}
8087

@@ -407,6 +414,11 @@ type logQueuer struct {
407414
loggerTTL time.Duration
408415
loggersmap[string]agentLoggerLifecycle
409416
logCachelogCache
417+
418+
// retries maps agent tokens to their retry state for exponential backoff
419+
retriesmap[string]*retryState
420+
// maxRetries is the maximum number of retries for a log send failure.
421+
maxRetriesint
410422
}
411423

412424
func (l*logQueuer)work(ctx context.Context) {
@@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) {
427439
}
428440
}
429441

442+
func (l*logQueuer)newLogger(ctx context.Context,logagentLog) (agentLoggerLifecycle,error) {
443+
client:=agentsdk.New(l.coderURL)
444+
client.SetSessionToken(log.agentToken)
445+
logger:=l.logger.With(slog.F("resource_name",log.resourceName))
446+
client.SDK.SetLogger(logger)
447+
448+
_,err:=client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
449+
ID:sourceUUID,
450+
Icon:"/icon/k8s.png",
451+
DisplayName:"Kubernetes",
452+
})
453+
iferr!=nil {
454+
// Posting the log source failed, which affects how logs appear.
455+
// We'll retry to ensure the log source is properly registered.
456+
logger.Error(ctx,"post log source",slog.Error(err))
457+
returnagentLoggerLifecycle{},err
458+
}
459+
460+
ls:=agentsdk.NewLogSender(logger)
461+
sl:=ls.GetScriptLogger(sourceUUID)
462+
463+
gracefulCtx,gracefulCancel:=context.WithCancel(context.Background())
464+
465+
// connect to Agent v2.0 API, since we don't need features added later.
466+
// This maximizes compatibility.
467+
arpc,err:=client.ConnectRPC20(gracefulCtx)
468+
iferr!=nil {
469+
logger.Error(ctx,"drpc connect",slog.Error(err))
470+
gracefulCancel()
471+
returnagentLoggerLifecycle{},err
472+
}
473+
gofunc() {
474+
err:=ls.SendLoop(gracefulCtx,arpc)
475+
// if the send loop exits on its own without the context
476+
// canceling, timeout the logger and force it to recreate.
477+
iferr!=nil&&ctx.Err()==nil {
478+
l.loggerTimeout(log.agentToken)
479+
}
480+
}()
481+
482+
closeTimer:=l.clock.AfterFunc(l.loggerTTL,func() {
483+
logger.Info(ctx,"logger timeout firing")
484+
l.loggerTimeout(log.agentToken)
485+
})
486+
lifecycle:=agentLoggerLifecycle{
487+
scriptLogger:sl,
488+
close:func() {
489+
deferarpc.DRPCConn().Close()
490+
deferclient.SDK.HTTPClient.CloseIdleConnections()
491+
// We could be stopping for reasons other than the timeout. If
492+
// so, stop the timer.
493+
closeTimer.Stop()
494+
defergracefulCancel()
495+
timeout:=l.clock.AfterFunc(5*time.Second,gracefulCancel)
496+
defertimeout.Stop()
497+
logger.Info(ctx,"logger closing")
498+
499+
iferr:=sl.Flush(gracefulCtx);err!=nil {
500+
// ctx err
501+
logger.Warn(gracefulCtx,"timeout reached while flushing")
502+
return
503+
}
504+
505+
iferr:=ls.WaitUntilEmpty(gracefulCtx);err!=nil {
506+
// ctx err
507+
logger.Warn(gracefulCtx,"timeout reached while waiting for log queue to empty")
508+
}
509+
},
510+
}
511+
lifecycle.closeTimer=closeTimer
512+
returnlifecycle,nil
513+
}
514+
430515
func (l*logQueuer)processLog(ctx context.Context,logagentLog) {
431516
l.mu.Lock()
432517
deferl.mu.Unlock()
433-
queuedLogs:=l.logCache.push(log)
518+
519+
queuedLogs:=l.logCache.get(log.agentToken)
520+
ifisAgentLogEmpty(log) {
521+
ifqueuedLogs==nil {
522+
return
523+
}
524+
}else {
525+
queuedLogs=l.logCache.push(log)
526+
}
527+
434528
lgr,ok:=l.loggers[log.agentToken]
435529
if!ok {
436-
client:=agentsdk.New(l.coderURL)
437-
client.SetSessionToken(log.agentToken)
438-
logger:=l.logger.With(slog.F("resource_name",log.resourceName))
439-
client.SDK.SetLogger(logger)
440-
441-
_,err:=client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
442-
ID:sourceUUID,
443-
Icon:"/icon/k8s.png",
444-
DisplayName:"Kubernetes",
445-
})
446-
iferr!=nil {
447-
// This shouldn't fail sending the log, as it only affects how they
448-
// appear.
449-
logger.Error(ctx,"post log source",slog.Error(err))
530+
// skip if we're in a retry cooldown window
531+
ifrs:=l.retries[log.agentToken];rs!=nil&&rs.timer!=nil {
532+
return
450533
}
451534

452-
ls:=agentsdk.NewLogSender(logger)
453-
sl:=ls.GetScriptLogger(sourceUUID)
454-
455-
gracefulCtx,gracefulCancel:=context.WithCancel(context.Background())
456-
457-
// connect to Agent v2.0 API, since we don't need features added later.
458-
// This maximizes compatibility.
459-
arpc,err:=client.ConnectRPC20(gracefulCtx)
535+
varerrerror
536+
lgr,err=l.newLogger(ctx,log)
460537
iferr!=nil {
461-
logger.Error(ctx,"drpc connect",slog.Error(err))
462-
gracefulCancel()
538+
l.scheduleRetry(ctx,log.agentToken)
463539
return
464540
}
465-
gofunc() {
466-
err:=ls.SendLoop(gracefulCtx,arpc)
467-
// if the send loop exits on its own without the context
468-
// canceling, timeout the logger and force it to recreate.
469-
iferr!=nil&&ctx.Err()==nil {
470-
l.loggerTimeout(log.agentToken)
471-
}
472-
}()
473-
474-
closeTimer:=l.clock.AfterFunc(l.loggerTTL,func() {
475-
logger.Info(ctx,"logger timeout firing")
476-
l.loggerTimeout(log.agentToken)
477-
})
478-
lifecycle:=agentLoggerLifecycle{
479-
scriptLogger:sl,
480-
close:func() {
481-
// We could be stopping for reasons other than the timeout. If
482-
// so, stop the timer.
483-
closeTimer.Stop()
484-
defergracefulCancel()
485-
timeout:=l.clock.AfterFunc(5*time.Second,gracefulCancel)
486-
defertimeout.Stop()
487-
logger.Info(ctx,"logger closing")
488-
489-
iferr:=sl.Flush(gracefulCtx);err!=nil {
490-
// ctx err
491-
logger.Warn(gracefulCtx,"timeout reached while flushing")
492-
return
493-
}
494-
495-
iferr:=ls.WaitUntilEmpty(gracefulCtx);err!=nil {
496-
// ctx err
497-
logger.Warn(gracefulCtx,"timeout reached while waiting for log queue to empty")
498-
}
499-
500-
_=arpc.DRPCConn().Close()
501-
client.SDK.HTTPClient.CloseIdleConnections()
502-
},
503-
}
504-
lifecycle.closeTimer=closeTimer
505-
l.loggers[log.agentToken]=lifecycle
506-
lgr=lifecycle
541+
l.loggers[log.agentToken]=lgr
507542
}
508543

509544
lgr.resetCloseTimer(l.loggerTTL)
510-
_=lgr.scriptLogger.Send(ctx,queuedLogs...)
545+
iflen(queuedLogs)==0 {
546+
return
547+
}
548+
iferr:=lgr.scriptLogger.Send(ctx,queuedLogs...);err!=nil {
549+
l.scheduleRetry(ctx,log.agentToken)
550+
return
551+
}
552+
l.clearRetryLocked(log.agentToken)
511553
l.logCache.delete(log.agentToken)
512554
}
513555

@@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) {
516558
lgr,ok:=l.loggers[log.agentToken]
517559
ifok {
518560
delete(l.loggers,log.agentToken)
519-
520561
}
562+
l.clearRetryLocked(log.agentToken)
563+
l.logCache.delete(log.agentToken)
521564
l.mu.Unlock()
522565

523566
ifok {
@@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
549592
}
550593
}
551594

595+
// retryState tracks exponential backoff for an agent token.
596+
typeretryStatestruct {
597+
delay time.Duration
598+
timer*quartz.Timer
599+
retryCountint
600+
exhaustedbool// prevent retry state recreation after max retries
601+
}
602+
603+
func (l*logQueuer)scheduleRetry(ctx context.Context,tokenstring) {
604+
ifl.retries==nil {
605+
l.retries=make(map[string]*retryState)
606+
}
607+
608+
rs:=l.retries[token]
609+
610+
ifrs!=nil&&rs.exhausted {
611+
return
612+
}
613+
614+
ifrs==nil {
615+
rs=&retryState{delay:time.Second,retryCount:0,exhausted:false}
616+
l.retries[token]=rs
617+
}
618+
619+
rs.retryCount++
620+
621+
// If we've reached the max retries, clear the retry state and delete the log cache.
622+
ifrs.retryCount>=l.maxRetries {
623+
l.logger.Error(ctx,"max retries exceeded",
624+
slog.F("retryCount",rs.retryCount),
625+
slog.F("maxRetries",l.maxRetries))
626+
rs.exhausted=true
627+
ifrs.timer!=nil {
628+
rs.timer.Stop()
629+
rs.timer=nil
630+
}
631+
l.logCache.delete(token)
632+
return
633+
}
634+
635+
ifrs.timer!=nil {
636+
return
637+
}
638+
639+
l.logger.Info(ctx,"scheduling retry",
640+
slog.F("delay",rs.delay.String()),
641+
slog.F("retryCount",rs.retryCount))
642+
643+
rs.timer=l.clock.AfterFunc(rs.delay,func() {
644+
l.mu.Lock()
645+
deferl.mu.Unlock()
646+
647+
ifcur:=l.retries[token];cur!=nil&&!cur.exhausted {
648+
cur.timer=nil
649+
l.q<-agentLog{op:opLog,agentToken:token}
650+
}
651+
})
652+
653+
rs.delay*=2
654+
ifrs.delay>30*time.Second {
655+
rs.delay=30*time.Second
656+
}
657+
}
658+
659+
// clearRetryLocked clears the retry state for the given token.
660+
// The caller must hold the mutex lock.
661+
func (l*logQueuer)clearRetryLocked(tokenstring) {
662+
ifrs:=l.retries[token];rs!=nil {
663+
ifrs.timer!=nil {
664+
rs.timer.Stop()
665+
}
666+
delete(l.retries,token)
667+
}
668+
}
669+
552670
funcnewColor(value...color.Attribute)*color.Color {
553671
c:=color.New(value...)
554672
c.EnableColor()
@@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
572690
func (l*logCache)delete(tokenstring) {
573691
delete(l.logs,token)
574692
}
693+
694+
func (l*logCache)get(tokenstring) []agentsdk.Log {
695+
logs,ok:=l.logs[token]
696+
if!ok {
697+
returnnil
698+
}
699+
returnlogs
700+
}
701+
702+
funcisAgentLogEmpty(logagentLog)bool {
703+
returnlog.resourceName==""&&log.log.Output==""&&log.log.CreatedAt.IsZero()
704+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp