@@ -407,6 +407,8 @@ type logQueuer struct {
407407loggerTTL time.Duration
408408loggers map [string ]agentLoggerLifecycle
409409logCache logCache
410+
411+ retries map [string ]* retryState
410412}
411413
412414func (l * logQueuer )work (ctx context.Context ) {
@@ -427,87 +429,120 @@ func (l *logQueuer) work(ctx context.Context) {
427429}
428430}
429431
432+ func (l * logQueuer )newLogger (ctx context.Context ,log agentLog ,queuedLogs []agentsdk.Log ) (agentLoggerLifecycle ,error ) {
433+ client := agentsdk .New (l .coderURL )
434+ client .SetSessionToken (log .agentToken )
435+ logger := l .logger .With (slog .F ("resource_name" ,log .resourceName ))
436+ client .SDK .SetLogger (logger )
437+
438+ _ ,err := client .PostLogSource (ctx , agentsdk.PostLogSourceRequest {
439+ ID :sourceUUID ,
440+ Icon :"/icon/k8s.png" ,
441+ DisplayName :"Kubernetes" ,
442+ })
443+ if err != nil {
444+ // This shouldn't fail sending the log, as it only affects how they
445+ // appear.
446+ logger .Error (ctx ,"post log source" ,slog .Error (err ))
447+ l .scheduleRetry (ctx ,log .agentToken )
448+ return agentLoggerLifecycle {},err
449+ }
450+
451+ ls := agentsdk .NewLogSender (logger )
452+ sl := ls .GetScriptLogger (sourceUUID )
453+
454+ gracefulCtx ,gracefulCancel := context .WithCancel (context .Background ())
455+
456+ // connect to Agent v2.0 API, since we don't need features added later.
457+ // This maximizes compatibility.
458+ arpc ,err := client .ConnectRPC20 (gracefulCtx )
459+ if err != nil {
460+ logger .Error (ctx ,"drpc connect" ,slog .Error (err ))
461+ gracefulCancel ()
462+ l .scheduleRetry (ctx ,log .agentToken )
463+ return agentLoggerLifecycle {},err
464+ }
465+ go func () {
466+ err := ls .SendLoop (gracefulCtx ,arpc )
467+ // if the send loop exits on its own without the context
468+ // canceling, timeout the logger and force it to recreate.
469+ if err != nil && ctx .Err ()== nil {
470+ l .loggerTimeout (log .agentToken )
471+ }
472+ }()
473+
474+ closeTimer := l .clock .AfterFunc (l .loggerTTL ,func () {
475+ logger .Info (ctx ,"logger timeout firing" )
476+ l .loggerTimeout (log .agentToken )
477+ })
478+ lifecycle := agentLoggerLifecycle {
479+ scriptLogger :sl ,
480+ close :func () {
481+ // We could be stopping for reasons other than the timeout. If
482+ // so, stop the timer.
483+ closeTimer .Stop ()
484+ defer gracefulCancel ()
485+ timeout := l .clock .AfterFunc (5 * time .Second ,gracefulCancel )
486+ defer timeout .Stop ()
487+ logger .Info (ctx ,"logger closing" )
488+
489+ if err := sl .Flush (gracefulCtx );err != nil {
490+ // ctx err
491+ logger .Warn (gracefulCtx ,"timeout reached while flushing" )
492+ return
493+ }
494+
495+ if err := ls .WaitUntilEmpty (gracefulCtx );err != nil {
496+ // ctx err
497+ logger .Warn (gracefulCtx ,"timeout reached while waiting for log queue to empty" )
498+ }
499+
500+ _ = arpc .DRPCConn ().Close ()
501+ client .SDK .HTTPClient .CloseIdleConnections ()
502+ },
503+ }
504+ lifecycle .closeTimer = closeTimer
505+ return lifecycle ,nil
506+ }
507+
430508func (l * logQueuer )processLog (ctx context.Context ,log agentLog ) {
431509l .mu .Lock ()
432510defer l .mu .Unlock ()
433- queuedLogs := l .logCache .push (log )
511+
512+ queuedLogs := l .logCache .get (log .agentToken )
513+ if isAgentLogEmpty (log ) {
514+ if queuedLogs == nil {
515+ return
516+ }
517+ }else {
518+ queuedLogs = l .logCache .push (log )
519+ }
520+
434521lgr ,ok := l .loggers [log .agentToken ]
435522if ! ok {
436- client := agentsdk .New (l .coderURL )
437- client .SetSessionToken (log .agentToken )
438- logger := l .logger .With (slog .F ("resource_name" ,log .resourceName ))
439- client .SDK .SetLogger (logger )
440-
441- _ ,err := client .PostLogSource (ctx , agentsdk.PostLogSourceRequest {
442- ID :sourceUUID ,
443- Icon :"/icon/k8s.png" ,
444- DisplayName :"Kubernetes" ,
445- })
446- if err != nil {
447- // This shouldn't fail sending the log, as it only affects how they
448- // appear.
449- logger .Error (ctx ,"post log source" ,slog .Error (err ))
523+ // skip if we're in a retry cooldown window
524+ if rs := l .retries [log .agentToken ];rs != nil && rs .timer != nil {
525+ return
450526}
451527
452- ls := agentsdk .NewLogSender (logger )
453- sl := ls .GetScriptLogger (sourceUUID )
454-
455- gracefulCtx ,gracefulCancel := context .WithCancel (context .Background ())
456-
457- // connect to Agent v2.0 API, since we don't need features added later.
458- // This maximizes compatibility.
459- arpc ,err := client .ConnectRPC20 (gracefulCtx )
528+ var err error
529+ lgr ,err = l .newLogger (ctx ,log ,queuedLogs )
460530if err != nil {
461- logger .Error (ctx ,"drpc connect" ,slog .Error (err ))
462- gracefulCancel ()
531+ l .scheduleRetry (ctx ,log .agentToken )
463532return
464533}
465- go func () {
466- err := ls .SendLoop (gracefulCtx ,arpc )
467- // if the send loop exits on its own without the context
468- // canceling, timeout the logger and force it to recreate.
469- if err != nil && ctx .Err ()== nil {
470- l .loggerTimeout (log .agentToken )
471- }
472- }()
473-
474- closeTimer := l .clock .AfterFunc (l .loggerTTL ,func () {
475- logger .Info (ctx ,"logger timeout firing" )
476- l .loggerTimeout (log .agentToken )
477- })
478- lifecycle := agentLoggerLifecycle {
479- scriptLogger :sl ,
480- close :func () {
481- // We could be stopping for reasons other than the timeout. If
482- // so, stop the timer.
483- closeTimer .Stop ()
484- defer gracefulCancel ()
485- timeout := l .clock .AfterFunc (5 * time .Second ,gracefulCancel )
486- defer timeout .Stop ()
487- logger .Info (ctx ,"logger closing" )
488-
489- if err := sl .Flush (gracefulCtx );err != nil {
490- // ctx err
491- logger .Warn (gracefulCtx ,"timeout reached while flushing" )
492- return
493- }
494-
495- if err := ls .WaitUntilEmpty (gracefulCtx );err != nil {
496- // ctx err
497- logger .Warn (gracefulCtx ,"timeout reached while waiting for log queue to empty" )
498- }
499-
500- _ = arpc .DRPCConn ().Close ()
501- client .SDK .HTTPClient .CloseIdleConnections ()
502- },
503- }
504- lifecycle .closeTimer = closeTimer
505- l .loggers [log .agentToken ]= lifecycle
506- lgr = lifecycle
534+ l .loggers [log .agentToken ]= lgr
507535}
508536
509537lgr .resetCloseTimer (l .loggerTTL )
510- _ = lgr .scriptLogger .Send (ctx ,queuedLogs ... )
538+ if len (queuedLogs )== 0 {
539+ return
540+ }
541+ if err := lgr .scriptLogger .Send (ctx ,queuedLogs ... );err != nil {
542+ l .scheduleRetry (ctx ,log .agentToken )
543+ return
544+ }
545+ l .clearRetry (log .agentToken )
511546l .logCache .delete (log .agentToken )
512547}
513548
@@ -518,6 +553,8 @@ func (l *logQueuer) processDelete(log agentLog) {
518553delete (l .loggers ,log .agentToken )
519554
520555}
556+ l .clearRetry (log .agentToken )
557+ l .logCache .delete (log .agentToken )
521558l .mu .Unlock ()
522559
523560if ok {
@@ -549,6 +586,64 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
549586}
550587}
551588
589+ // retryState tracks exponential backoff for an agent token.
590+ type retryState struct {
591+ delay time.Duration
592+ timer * quartz.Timer
593+ }
594+
595+ func (l * logQueuer )ensureRetryMap () {
596+ if l .retries == nil {
597+ l .retries = make (map [string ]* retryState )
598+ }
599+ }
600+
601+ func (l * logQueuer )scheduleRetry (ctx context.Context ,token string ) {
602+ l .ensureRetryMap ()
603+
604+ rs := l .retries [token ]
605+ if rs == nil {
606+ rs = & retryState {delay :time .Second }
607+ l .retries [token ]= rs
608+ }
609+
610+ if rs .timer != nil {
611+ return
612+ }
613+
614+ if rs .delay < time .Second {
615+ rs .delay = time .Second
616+ }else if rs .delay > 30 * time .Second {
617+ rs .delay = 30 * time .Second
618+ }
619+
620+ l .logger .Info (ctx ,"scheduling retry" ,slog .F ("delay" ,rs .delay .String ()))
621+
622+ rs .timer = l .clock .AfterFunc (rs .delay ,func () {
623+ l .mu .Lock ()
624+ if cur := l .retries [token ];cur != nil {
625+ cur .timer = nil
626+ }
627+ l .mu .Unlock ()
628+
629+ l .q <- agentLog {op :opLog ,agentToken :token }
630+ })
631+
632+ rs .delay *= 2
633+ if rs .delay > 30 * time .Second {
634+ rs .delay = 30 * time .Second
635+ }
636+ }
637+
638+ func (l * logQueuer )clearRetry (token string ) {
639+ if rs := l .retries [token ];rs != nil {
640+ if rs .timer != nil {
641+ rs .timer .Stop ()
642+ }
643+ delete (l .retries ,token )
644+ }
645+ }
646+
552647func newColor (value ... color.Attribute )* color.Color {
553648c := color .New (value ... )
554649c .EnableColor ()
@@ -572,3 +667,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
572667func (l * logCache )delete (token string ) {
573668delete (l .logs ,token )
574669}
670+
671+ func (l * logCache )get (token string ) []agentsdk.Log {
672+ logs ,ok := l .logs [token ]
673+ if ! ok {
674+ return nil
675+ }
676+ return logs
677+ }
678+
679+ func isAgentLogEmpty (log agentLog )bool {
680+ return log .resourceName == "" && log .log .Output == "" && log .log .CreatedAt .IsZero ()
681+ }