@@ -34,6 +34,8 @@ type podEventLoggerOptions struct {
 
 	logger      slog.Logger
 	logDebounce time.Duration
+	// maxRetries is the maximum number of retries for a log send failure.
+	maxRetries int
 
 	// The following fields are optional!
 	namespaces []string
@@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 		opts.clock = quartz.NewReal()
 	}
 
+	if opts.maxRetries == 0 {
+		opts.maxRetries = 10
+	}
+
 	logCh := make(chan agentLog, 512)
 	ctx, cancelFunc := context.WithCancel(ctx)
 	reporter := &podEventLogger{
@@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			maxRetries: opts.maxRetries,
 		},
 	}
 
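
Aside: the zero-value check above means callers can leave maxRetries unset and still get a sane budget. A minimal standalone sketch of the same defaulting pattern; the logDebounce default here is an assumption for illustration, not taken from this change:

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors the zero-value default pattern used above; the
// logDebounce default is hypothetical.
type options struct {
	maxRetries  int
	logDebounce time.Duration
}

func withDefaults(o options) options {
	if o.maxRetries == 0 {
		o.maxRetries = 10 // same default the change applies
	}
	if o.logDebounce == 0 {
		o.logDebounce = time.Second
	}
	return o
}

func main() {
	fmt.Println(withDefaults(options{}))              // {10 1s}
	fmt.Println(withDefaults(options{maxRetries: 3})) // {3 1s}
}
```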
@@ -407,6 +414,11 @@ type logQueuer struct {
 	loggerTTL time.Duration
 	loggers   map[string]agentLoggerLifecycle
 	logCache  logCache
+
+	// retries maps agent tokens to their retry state for exponential backoff.
+	retries map[string]*retryState
+	// maxRetries is the maximum number of retries for a log send failure.
+	maxRetries int
 }
 
 func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) {
 	}
 }
 
+func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
+	client := agentsdk.New(l.coderURL)
+	client.SetSessionToken(log.agentToken)
+	logger := l.logger.With(slog.F("resource_name", log.resourceName))
+	client.SDK.SetLogger(logger)
+
+	_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
+		ID:          sourceUUID,
+		Icon:        "/icon/k8s.png",
+		DisplayName: "Kubernetes",
+	})
+	if err != nil {
+		// Posting the log source failed, which affects how logs appear.
+		// Surface the error so the caller retries until the log source
+		// is properly registered.
+		logger.Error(ctx, "post log source", slog.Error(err))
+		return agentLoggerLifecycle{}, err
+	}
+
+	ls := agentsdk.NewLogSender(logger)
+	sl := ls.GetScriptLogger(sourceUUID)
+
+	gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
+
+	// Connect to the Agent v2.0 API, since we don't need features added later.
+	// This maximizes compatibility.
+	arpc, err := client.ConnectRPC20(gracefulCtx)
+	if err != nil {
+		logger.Error(ctx, "drpc connect", slog.Error(err))
+		gracefulCancel()
+		return agentLoggerLifecycle{}, err
+	}
+	go func() {
+		err := ls.SendLoop(gracefulCtx, arpc)
+		// If the send loop exits on its own without the context
+		// canceling, time out the logger and force it to recreate.
+		if err != nil && ctx.Err() == nil {
+			l.loggerTimeout(log.agentToken)
+		}
+	}()
+
+	closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
+		logger.Info(ctx, "logger timeout firing")
+		l.loggerTimeout(log.agentToken)
+	})
+	lifecycle := agentLoggerLifecycle{
+		scriptLogger: sl,
+		close: func() {
+			defer arpc.DRPCConn().Close()
+			defer client.SDK.HTTPClient.CloseIdleConnections()
+			// We could be stopping for reasons other than the timeout. If
+			// so, stop the timer.
+			closeTimer.Stop()
+			defer gracefulCancel()
+			timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
+			defer timeout.Stop()
+			logger.Info(ctx, "logger closing")
+
+			if err := sl.Flush(gracefulCtx); err != nil {
+				// The error is a context error from the graceful timeout.
+				logger.Warn(gracefulCtx, "timeout reached while flushing")
+				return
+			}
+
+			if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
+				// The error is a context error from the graceful timeout.
+				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
+			}
+		},
+	}
+	lifecycle.closeTimer = closeTimer
+	return lifecycle, nil
+}
+
 func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
-	queuedLogs := l.logCache.push(log)
+
+	queuedLogs := l.logCache.get(log.agentToken)
+	if isAgentLogEmpty(log) {
+		if queuedLogs == nil {
+			return
+		}
+	} else {
+		queuedLogs = l.logCache.push(log)
+	}
+
 	lgr, ok := l.loggers[log.agentToken]
 	if !ok {
-		client := agentsdk.New(l.coderURL)
-		client.SetSessionToken(log.agentToken)
-		logger := l.logger.With(slog.F("resource_name", log.resourceName))
-		client.SDK.SetLogger(logger)
-
-		_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
-			ID:          sourceUUID,
-			Icon:        "/icon/k8s.png",
-			DisplayName: "Kubernetes",
-		})
-		if err != nil {
-			// This shouldn't fail sending the log, as it only affects how they
-			// appear.
-			logger.Error(ctx, "post log source", slog.Error(err))
+		// Skip if we're in a retry cooldown window.
+		if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
+			return
 		}
 
-		ls := agentsdk.NewLogSender(logger)
-		sl := ls.GetScriptLogger(sourceUUID)
-
-		gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
-
-		// connect to Agent v2.0 API, since we don't need features added later.
-		// This maximizes compatibility.
-		arpc, err := client.ConnectRPC20(gracefulCtx)
+		var err error
+		lgr, err = l.newLogger(ctx, log)
 		if err != nil {
-			logger.Error(ctx, "drpc connect", slog.Error(err))
-			gracefulCancel()
+			l.scheduleRetry(ctx, log.agentToken)
 			return
 		}
-		go func() {
-			err := ls.SendLoop(gracefulCtx, arpc)
-			// if the send loop exits on its own without the context
-			// canceling, timeout the logger and force it to recreate.
-			if err != nil && ctx.Err() == nil {
-				l.loggerTimeout(log.agentToken)
-			}
-		}()
-
-		closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
-			logger.Info(ctx, "logger timeout firing")
-			l.loggerTimeout(log.agentToken)
-		})
-		lifecycle := agentLoggerLifecycle{
-			scriptLogger: sl,
-			close: func() {
-				// We could be stopping for reasons other than the timeout. If
-				// so, stop the timer.
-				closeTimer.Stop()
-				defer gracefulCancel()
-				timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
-				defer timeout.Stop()
-				logger.Info(ctx, "logger closing")
-
-				if err := sl.Flush(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while flushing")
-					return
-				}
-
-				if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
-				}
-
-				_ = arpc.DRPCConn().Close()
-				client.SDK.HTTPClient.CloseIdleConnections()
-			},
-		}
-		lifecycle.closeTimer = closeTimer
-		l.loggers[log.agentToken] = lifecycle
-		lgr = lifecycle
+		l.loggers[log.agentToken] = lgr
 	}
 
 	lgr.resetCloseTimer(l.loggerTTL)
-	_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
+	if len(queuedLogs) == 0 {
+		return
+	}
+	if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
+		l.scheduleRetry(ctx, log.agentToken)
+		return
+	}
+	l.clearRetryLocked(log.agentToken)
 	l.logCache.delete(log.agentToken)
 }
 
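
To make the retry cadence concrete: scheduleRetry (below) starts at one second, doubles the delay after arming each timer, and caps it at 30 seconds, and with the default maxRetries of 10 the tenth failure marks the state exhausted instead of arming another timer. A standalone sketch of that schedule, assuming each retry attempt fails once per cycle:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Delays double from 1s and cap at 30s. With maxRetries = 10, nine
	// timers fire before the tenth failure marks the state exhausted.
	delay, total := time.Second, time.Duration(0)
	for attempt := 1; attempt < 10; attempt++ {
		fmt.Printf("retry %d after %s\n", attempt, delay)
		total += delay
		delay *= 2
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
	fmt.Println("total wait:", total) // 1+2+4+8+16+30+30+30+30 = 2m31s
}
```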
@@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) {
 	lgr, ok := l.loggers[log.agentToken]
 	if ok {
 		delete(l.loggers, log.agentToken)
-
 	}
+	l.clearRetryLocked(log.agentToken)
+	l.logCache.delete(log.agentToken)
 	l.mu.Unlock()
 
 	if ok {
@@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
 	}
 }
 
+// retryState tracks exponential backoff for an agent token.
+type retryState struct {
+	delay      time.Duration
+	timer      *quartz.Timer
+	retryCount int
+	exhausted  bool // prevent retry state recreation after max retries
+}
+
+func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
+	if l.retries == nil {
+		l.retries = make(map[string]*retryState)
+	}
+
+	rs := l.retries[token]
+
+	if rs != nil && rs.exhausted {
+		return
+	}
+
+	if rs == nil {
+		rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
+		l.retries[token] = rs
+	}
+
+	rs.retryCount++
+
+	// Once we've hit the max retries, mark the state exhausted and drop the cached logs.
+	if rs.retryCount >= l.maxRetries {
+		l.logger.Error(ctx, "max retries exceeded",
+			slog.F("retryCount", rs.retryCount),
+			slog.F("maxRetries", l.maxRetries))
+		rs.exhausted = true
+		if rs.timer != nil {
+			rs.timer.Stop()
+			rs.timer = nil
+		}
+		l.logCache.delete(token)
+		return
+	}
+
+	if rs.timer != nil {
+		return
+	}
+
+	l.logger.Info(ctx, "scheduling retry",
+		slog.F("delay", rs.delay.String()),
+		slog.F("retryCount", rs.retryCount))
+
+	rs.timer = l.clock.AfterFunc(rs.delay, func() {
+		l.mu.Lock()
+		defer l.mu.Unlock()
+
+		if cur := l.retries[token]; cur != nil && !cur.exhausted {
+			cur.timer = nil
+			l.q <- agentLog{op: opLog, agentToken: token}
+		}
+	})
+
+	rs.delay *= 2
+	if rs.delay > 30*time.Second {
+		rs.delay = 30 * time.Second
+	}
+}
+
+// clearRetryLocked clears the retry state for the given token.
+// The caller must hold the mutex lock.
+func (l *logQueuer) clearRetryLocked(token string) {
+	if rs := l.retries[token]; rs != nil {
+		if rs.timer != nil {
+			rs.timer.Stop()
+		}
+		delete(l.retries, token)
+	}
+}
+
 func newColor(value ...color.Attribute) *color.Color {
 	c := color.New(value...)
 	c.EnableColor()
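
One detail worth noting in scheduleRetry: failures that arrive while a retry is already pending still count toward maxRetries, but the rs.timer != nil guard means only one timer is ever armed per token. A standalone sketch of that coalescing guard, using the standard library's time.AfterFunc in place of quartz:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// coalescingRetry mirrors the rs.timer != nil guard in scheduleRetry:
// while a retry is pending, new failures do not arm additional timers.
type coalescingRetry struct {
	mu    sync.Mutex
	timer *time.Timer
}

func (c *coalescingRetry) schedule(delay time.Duration, retry func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.timer != nil {
		return // a retry is already pending; coalesce
	}
	c.timer = time.AfterFunc(delay, func() {
		c.mu.Lock()
		c.timer = nil
		c.mu.Unlock()
		retry()
	})
}

func main() {
	var c coalescingRetry
	done := make(chan struct{})
	// Five rapid failures arm exactly one timer; close(done) would panic
	// if the retry callback ran more than once.
	for i := 0; i < 5; i++ {
		c.schedule(50*time.Millisecond, func() { close(done) })
	}
	<-done
	fmt.Println("retried once")
}
```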
@@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
 func (l *logCache) delete(token string) {
 	delete(l.logs, token)
 }
+
+func (l *logCache) get(token string) []agentsdk.Log {
+	logs, ok := l.logs[token]
+	if !ok {
+		return nil
+	}
+	return logs
+}
+
+func isAgentLogEmpty(log agentLog) bool {
+	return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
+}