
Commit f1e6de3

Apply review suggestions and fix maxRetries

1 parent c99d83c, commit f1e6de3

3 files changed: +55 −32 lines

‎helm/templates/service.yaml‎

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ spec:
           {{- end }}
           {{- if .Values.maxRetries }}
           - name: CODER_MAX_RETRIES
-            value: {{ .Values.maxRetries }}
+            value: "{{ .Values.maxRetries }}"
           {{- end }}
           {{- with .Values.securityContext }}
       securityContext:
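Why the quotes matter: a Kubernetes env var value must be a string, so an unquoted {{ .Values.maxRetries }} renders as a YAML integer and the manifest is rejected at install time; quoting keeps the rendered value a string. The agent side then parses the string back into an int. A minimal sketch of that parsing with the standard library (illustrative only; the helper name and default value are assumptions, and the actual parsing code is not part of this diff):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// maxRetriesFromEnv reads CODER_MAX_RETRIES and falls back to a default
// when the variable is unset or not a valid non-negative integer.
func maxRetriesFromEnv() int {
	const defaultMaxRetries = 10 // illustrative default, not from the repo

	raw := os.Getenv("CODER_MAX_RETRIES")
	if raw == "" {
		return defaultMaxRetries
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 0 {
		return defaultMaxRetries
	}
	return n
}

func main() {
	fmt.Println("maxRetries:", maxRetriesFromEnv())
}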

‎logger.go‎

Lines changed: 32 additions & 27 deletions
@@ -34,7 +34,8 @@ type podEventLoggerOptions struct {

 	logger      slog.Logger
 	logDebounce time.Duration
-	maxRetries  int
+	// maxRetries is the maximum number of retries for a log send failure.
+	maxRetries int

 	// The following fields are optional!
 	namespaces []string
@@ -414,7 +415,9 @@ type logQueuer struct {
 	loggers  map[string]agentLoggerLifecycle
 	logCache logCache

-	retries    map[string]*retryState
+	// retries maps agent tokens to their retry state for exponential backoff
+	retries map[string]*retryState
+	// maxRetries is the maximum number of retries for a log send failure.
 	maxRetries int
 }

@@ -436,7 +439,7 @@ func (l *logQueuer) work(ctx context.Context) {
 	}
 }

-func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
+func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
 	client := agentsdk.New(l.coderURL)
 	client.SetSessionToken(log.agentToken)
 	logger := l.logger.With(slog.F("resource_name", log.resourceName))
@@ -448,10 +451,9 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
 		DisplayName: "Kubernetes",
 	})
 	if err != nil {
-		// This shouldn't fail sending the log, as it only affects how they
-		// appear.
+		// Posting the log source failed, which affects how logs appear.
+		// We'll retry to ensure the log source is properly registered.
 		logger.Error(ctx, "post log source", slog.Error(err))
-		l.scheduleRetry(ctx, log.agentToken)
 		return agentLoggerLifecycle{}, err
 	}

@@ -466,7 +468,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
 	if err != nil {
 		logger.Error(ctx, "drpc connect", slog.Error(err))
 		gracefulCancel()
-		l.scheduleRetry(ctx, log.agentToken)
 		return agentLoggerLifecycle{}, err
 	}
 	go func() {
@@ -485,6 +486,8 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
 	lifecycle := agentLoggerLifecycle{
 		scriptLogger: sl,
 		close: func() {
+			defer arpc.DRPCConn().Close()
+			defer client.SDK.HTTPClient.CloseIdleConnections()
 			// We could be stopping for reasons other than the timeout. If
 			// so, stop the timer.
 			closeTimer.Stop()
@@ -503,9 +506,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
 				// ctx err
 				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
 			}
-
-			_ = arpc.DRPCConn().Close()
-			client.SDK.HTTPClient.CloseIdleConnections()
 		},
 	}
 	lifecycle.closeTimer = closeTimer
@@ -533,7 +533,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
 	}

 	var err error
-	lgr, err = l.newLogger(ctx, log, queuedLogs)
+	lgr, err = l.newLogger(ctx, log)
 	if err != nil {
 		l.scheduleRetry(ctx, log.agentToken)
 		return
@@ -549,7 +549,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
 		l.scheduleRetry(ctx, log.agentToken)
 		return
 	}
-	l.clearRetry(log.agentToken)
+	l.clearRetryLocked(log.agentToken)
 	l.logCache.delete(log.agentToken)
 }

@@ -558,9 +558,8 @@ func (l *logQueuer) processDelete(log agentLog) {
 	lgr, ok := l.loggers[log.agentToken]
 	if ok {
 		delete(l.loggers, log.agentToken)
-
 	}
-	l.clearRetry(log.agentToken)
+	l.clearRetryLocked(log.agentToken)
 	l.logCache.delete(log.agentToken)
 	l.mu.Unlock()

@@ -598,6 +597,7 @@ type retryState struct {
 	delay      time.Duration
 	timer      *quartz.Timer
 	retryCount int
+	exhausted  bool // prevent retry state recreation after max retries
 }

 func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
@@ -606,8 +606,13 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
 	}

 	rs := l.retries[token]
+
+	if rs != nil && rs.exhausted {
+		return
+	}
+
 	if rs == nil {
-		rs = &retryState{delay: time.Second}
+		rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
 		l.retries[token] = rs
 	}

@@ -618,7 +623,11 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
 		l.logger.Error(ctx, "max retries exceeded",
 			slog.F("retryCount", rs.retryCount),
 			slog.F("maxRetries", l.maxRetries))
-		l.clearRetry(token)
+		rs.exhausted = true
+		if rs.timer != nil {
+			rs.timer.Stop()
+			rs.timer = nil
+		}
 		l.logCache.delete(token)
 		return
 	}
@@ -627,24 +636,18 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
 		return
 	}

-	if rs.delay < time.Second {
-		rs.delay = time.Second
-	} else if rs.delay > 30*time.Second {
-		rs.delay = 30 * time.Second
-	}
-
 	l.logger.Info(ctx, "scheduling retry",
 		slog.F("delay", rs.delay.String()),
 		slog.F("retryCount", rs.retryCount))

 	rs.timer = l.clock.AfterFunc(rs.delay, func() {
 		l.mu.Lock()
-		if cur := l.retries[token]; cur != nil {
+		defer l.mu.Unlock()
+
+		if cur := l.retries[token]; cur != nil && !cur.exhausted {
 			cur.timer = nil
+			l.q <- agentLog{op: opLog, agentToken: token}
 		}
-		l.mu.Unlock()
-
-		l.q <- agentLog{op: opLog, agentToken: token}
 	})

 	rs.delay *= 2
@@ -653,7 +656,9 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
 	}
 }

-func (l *logQueuer) clearRetry(token string) {
+// clearRetryLocked clears the retry state for the given token.
+// The caller must hold the mutex lock.
+func (l *logQueuer) clearRetryLocked(token string) {
 	if rs := l.retries[token]; rs != nil {
 		if rs.timer != nil {
 			rs.timer.Stop()
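Taken together, the scheduleRetry changes implement per-token exponential backoff with an explicit exhausted flag: once a token has hit maxRetries its state is never recreated, and a stale timer can no longer re-enqueue work. A self-contained sketch of that pattern (a hypothetical retrier type using the standard library's time.AfterFunc in place of the quartz clock and a callback in place of the log queue channel; not the repository's actual API):

package main

import (
	"fmt"
	"sync"
	"time"
)

// retryState mirrors the shape used above: a growing delay, a pending timer,
// a counter, and an exhaustion flag that makes further scheduling a no-op.
type retryState struct {
	delay      time.Duration
	timer      *time.Timer
	retryCount int
	exhausted  bool
}

type retrier struct {
	mu         sync.Mutex
	retries    map[string]*retryState
	maxRetries int
	retry      func(key string) // fired when a backoff timer expires
}

func (r *retrier) scheduleRetry(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	rs := r.retries[key]
	if rs != nil && rs.exhausted {
		return // never recreate state once max retries were exceeded
	}
	if rs == nil {
		rs = &retryState{delay: time.Second}
		r.retries[key] = rs
	}

	rs.retryCount++
	if rs.retryCount > r.maxRetries {
		// Mark exhausted instead of deleting the state so later failures
		// cannot restart the backoff from scratch.
		rs.exhausted = true
		if rs.timer != nil {
			rs.timer.Stop()
			rs.timer = nil
		}
		return
	}
	if rs.timer != nil {
		return // a retry is already pending
	}

	rs.timer = time.AfterFunc(rs.delay, func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		if cur := r.retries[key]; cur != nil && !cur.exhausted {
			cur.timer = nil
			// The real code re-enqueues onto a work channel here; the
			// callback must not call scheduleRetry while the lock is held.
			r.retry(key)
		}
	})
	rs.delay *= 2 // 1s, 2s, 4s, ...
}

func main() {
	r := &retrier{
		retries:    map[string]*retryState{},
		maxRetries: 3,
		retry:      func(key string) { fmt.Println("retrying", key) },
	}
	r.scheduleRetry("agent-token")
	time.Sleep(1500 * time.Millisecond) // let the first backoff timer fire
}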

‎logger_test.go‎

Lines changed: 22 additions & 4 deletions
@@ -511,6 +511,7 @@ func Test_logQueuer(t *testing.T) {
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			maxRetries: 10,
 		}

 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -644,7 +645,7 @@ func Test_logQueuer(t *testing.T) {
 		require.NotNil(t, lq.retries[token])

 		// Clear the retry
-		lq.clearRetry(token)
+		lq.clearRetryLocked(token)
 		require.Nil(t, lq.retries[token])
 	})

@@ -672,6 +673,7 @@ func Test_logQueuer(t *testing.T) {
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			retries:    make(map[string]*retryState),
 			maxRetries: 2,
 		}

@@ -691,15 +693,31 @@ func Test_logQueuer(t *testing.T) {
 			},
 		}

-		// Wait for retry state to be cleared after exceeding maxRetries
 		require.Eventually(t, func() bool {
 			lq.mu.Lock()
 			defer lq.mu.Unlock()
 			rs := lq.retries[token]
-			return rs == nil
+			return rs != nil && rs.retryCount == 1
 		}, testutil.WaitShort, testutil.IntervalFast)
+
+		clock.Advance(time.Second)
+
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs != nil && rs.retryCount == 2
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		clock.Advance(2 * time.Second)
+
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs == nil || rs.exhausted
+		}, testutil.WaitShort, testutil.IntervalFast)

-		// Verify cache is also cleared
 		lq.mu.Lock()
 		cachedLogs := lq.logCache.get(token)
 		lq.mu.Unlock()
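The rewritten test drives the backoff one step at a time: it waits until the first retry is recorded, advances the mock clock by the current delay, and repeats until the state is exhausted, always reading lq.retries under the mutex. A tiny stand-alone illustration of that require.Eventually polling style (using a goroutine and real time purely for demonstration; the actual test advances a quartz mock clock instead of sleeping):

package example

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestEventuallyPolling polls shared state under a mutex until the expected
// retry count is observed, the same assertion shape as the test above.
func TestEventuallyPolling(t *testing.T) {
	var (
		mu         sync.Mutex
		retryCount int
	)

	// Stand-in for a backoff timer firing once.
	go func() {
		time.Sleep(10 * time.Millisecond)
		mu.Lock()
		retryCount++
		mu.Unlock()
	}()

	require.Eventually(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return retryCount == 1
	}, time.Second, 5*time.Millisecond)
}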
