Commit c99d83c

implement maxRetries for log processing

1 parent dc8bde6

5 files changed: +108, -5 lines changed


‎helm/templates/service.yaml‎

Lines changed: 4 additions & 0 deletions

@@ -120,6 +120,10 @@ spec:
             - name: SSL_CERT_DIR
               value: {{ .Values.image.sslCertDir }}
             {{- end }}
+            {{- if .Values.maxRetries }}
+            - name: CODER_MAX_RETRIES
+              value: {{ .Values.maxRetries }}
+            {{- end }}
           {{- with .Values.securityContext }}
           securityContext:
             {{- toYaml . | nindent 12 }}

‎helm/values.yaml‎

Lines changed: 3 additions & 0 deletions

@@ -5,6 +5,9 @@ url: ""
 # If unspecified or empty it will watch all namespaces.
 namespaces: []

+# maxRetries -- Maximum retry attempts for failed log sends (logs are discarded after this limit)
+maxRetries: 10
+
 # volumes -- A list of extra volumes to add to the coder-logstream pod.
 volumes:
   # emptyDir: {}
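Together, the two Helm changes give the chart a maxRetries value (defaulting to 10) and inject it into the pod as the CODER_MAX_RETRIES environment variable, but only when the value is set. The snippet below is a small illustrative sketch of that conditional, using Go's text/template directly (Helm templates are Go templates; chart plumbing such as nindent and the real values file are omitted, and the struct and template names here are made up for the example):

package main

import (
	"os"
	"text/template"
)

// envBlock mirrors the guard added to helm/templates/service.yaml: the env var
// is rendered only when .Values.maxRetries is truthy.
const envBlock = `
{{- if .Values.maxRetries }}
- name: CODER_MAX_RETRIES
  value: {{ .Values.maxRetries }}
{{- end }}
`

// chart is a stand-in for Helm's rendering context; only .Values is modeled.
type chart struct {
	Values map[string]any
}

func main() {
	tmpl := template.Must(template.New("env").Parse(envBlock))

	// With the chart default (maxRetries: 10) the env entry is rendered.
	if err := tmpl.Execute(os.Stdout, chart{Values: map[string]any{"maxRetries": 10}}); err != nil {
		panic(err)
	}

	// With maxRetries unset, the if-guard suppresses the block entirely and the
	// binary falls back to its own default (see newPodEventLogger in logger.go).
	if err := tmpl.Execute(os.Stdout, chart{Values: map[string]any{}}); err != nil {
		panic(err)
	}
}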

‎logger.go‎

Lines changed: 26 additions & 4 deletions

@@ -34,6 +34,7 @@ type podEventLoggerOptions struct {

 	logger      slog.Logger
 	logDebounce time.Duration
+	maxRetries  int

 	// The following fields are optional!
 	namespaces []string
@@ -52,6 +53,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 		opts.clock = quartz.NewReal()
 	}

+	if opts.maxRetries == 0 {
+		opts.maxRetries = 10
+	}
+
 	logCh := make(chan agentLog, 512)
 	ctx, cancelFunc := context.WithCancel(ctx)
 	reporter := &podEventLogger{
@@ -75,6 +80,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			maxRetries: opts.maxRetries,
 		},
 	}

@@ -408,7 +414,8 @@ type logQueuer struct {
 	loggers  map[string]agentLoggerLifecycle
 	logCache logCache

-	retries map[string]*retryState
+	retries    map[string]*retryState
+	maxRetries int
 }

 func (l *logQueuer) work(ctx context.Context) {
@@ -588,8 +595,9 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {

 // retryState tracks exponential backoff for an agent token.
 type retryState struct {
-	delay time.Duration
-	timer *quartz.Timer
+	delay      time.Duration
+	timer      *quartz.Timer
+	retryCount int
 }

 func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
@@ -603,6 +611,18 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
 		l.retries[token] = rs
 	}

+	rs.retryCount++
+
+	// If we've reached the max retries, clear the retry state and delete the log cache.
+	if rs.retryCount >= l.maxRetries {
+		l.logger.Error(ctx, "max retries exceeded",
+			slog.F("retryCount", rs.retryCount),
+			slog.F("maxRetries", l.maxRetries))
+		l.clearRetry(token)
+		l.logCache.delete(token)
+		return
+	}
+
 	if rs.timer != nil {
 		return
 	}
@@ -613,7 +633,9 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
 		rs.delay = 30 * time.Second
 	}

-	l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String()))
+	l.logger.Info(ctx, "scheduling retry",
+		slog.F("delay", rs.delay.String()),
+		slog.F("retryCount", rs.retryCount))

 	rs.timer = l.clock.AfterFunc(rs.delay, func() {
 		l.mu.Lock()
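Read together, the logger.go changes attach a retry counter to each agent token's backoff state: every failed send bumps the counter, and once it reaches maxRetries the retry state is cleared and the token's cached logs are discarded instead of being rescheduled. The following standalone sketch models only that bookkeeping; it swaps the repository's quartz timers, slog logger, and logCache for plain maps and stdlib time, and the start-at-one-second doubling is an assumption (the diff shows only the 30-second cap and the exponential-backoff comment):

package main

import (
	"fmt"
	"time"
)

// retryState mirrors the fields added in the diff: a capped backoff delay plus
// a per-token retry counter.
type retryState struct {
	delay      time.Duration
	retryCount int
}

// queuer is a simplified stand-in for logQueuer.
type queuer struct {
	maxRetries int
	retries    map[string]*retryState
	cache      map[string][]string // token -> buffered log lines
}

// scheduleRetry follows the same shape as the diff: increment the counter,
// give up and drop the cached logs once maxRetries is reached, otherwise back
// off exponentially with the delay capped at 30 seconds.
func (q *queuer) scheduleRetry(token string) (gaveUp bool) {
	rs, ok := q.retries[token]
	if !ok {
		rs = &retryState{}
		q.retries[token] = rs
	}

	rs.retryCount++
	if rs.retryCount >= q.maxRetries {
		fmt.Printf("max retries exceeded (retryCount=%d, maxRetries=%d); dropping logs for %s\n",
			rs.retryCount, q.maxRetries, token)
		delete(q.retries, token)
		delete(q.cache, token)
		return true
	}

	// Assumed backoff schedule: 1s, 2s, 4s, ... capped at 30s.
	if rs.delay == 0 {
		rs.delay = time.Second
	} else {
		rs.delay *= 2
	}
	if rs.delay > 30*time.Second {
		rs.delay = 30 * time.Second
	}
	fmt.Printf("scheduling retry %d for %s in %s\n", rs.retryCount, token, rs.delay)
	return false
}

func main() {
	q := &queuer{
		maxRetries: 3, // the chart and newPodEventLogger both default to 10
		retries:    map[string]*retryState{},
		cache:      map[string][]string{"agent-token": {"This is a log."}},
	}
	for !q.scheduleRetry("agent-token") {
	}
	fmt.Println("tokens still cached:", len(q.cache)) // 0: logs were discarded
}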

‎logger_test.go‎

Lines changed: 66 additions & 1 deletion

@@ -594,24 +594,30 @@ func Test_logQueuer(t *testing.T) {
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			maxRetries: 10,
 		}

 		ctx := context.Background()
 		token := "test-token"

 		// Set up a retry state with a large delay
 		lq.retries = make(map[string]*retryState)
-		lq.retries[token] = &retryState{delay: 20 * time.Second}
+		lq.retries[token] = &retryState{
+			delay:      20 * time.Second,
+			retryCount: 0,
+		}

 		// Schedule a retry - should cap at 30 seconds
 		lq.scheduleRetry(ctx, token)

 		rs := lq.retries[token]
+		require.NotNil(t, rs)
 		require.Equal(t, 30*time.Second, rs.delay)

 		// Schedule another retry - should stay at 30 seconds
 		lq.scheduleRetry(ctx, token)
 		rs = lq.retries[token]
+		require.NotNil(t, rs)
 		require.Equal(t, 30*time.Second, rs.delay)
 	})

@@ -627,6 +633,7 @@ func Test_logQueuer(t *testing.T) {
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			maxRetries: 2,
 		}

 		ctx := context.Background()
@@ -640,6 +647,64 @@ func Test_logQueuer(t *testing.T) {
 		lq.clearRetry(token)
 		require.Nil(t, lq.retries[token])
 	})
+
+	t.Run("MaxRetries", func(t *testing.T) {
+		t.Parallel()
+
+		// Create a failing API that will reject connections
+		failingAPI := newFailingAgentAPI(t)
+		agentURL, err := url.Parse(failingAPI.server.URL)
+		require.NoError(t, err)
+		clock := quartz.NewMock(t)
+		ttl := time.Second
+
+		ch := make(chan agentLog, 10)
+		logger := slogtest.Make(t, &slogtest.Options{
+			IgnoreErrors: true,
+		})
+		lq := &logQueuer{
+			logger:    logger,
+			clock:     clock,
+			q:         ch,
+			coderURL:  agentURL,
+			loggerTTL: ttl,
+			loggers:   map[string]agentLoggerLifecycle{},
+			logCache: logCache{
+				logs: map[string][]agentsdk.Log{},
+			},
+			maxRetries: 2,
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		go lq.work(ctx)
+
+		token := "max-retry-token"
+		ch <- agentLog{
+			op:           opLog,
+			resourceName: "hello",
+			agentToken:   token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "This is a log.",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+
+		// Wait for retry state to be cleared after exceeding maxRetries
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs == nil
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		// Verify cache is also cleared
+		lq.mu.Lock()
+		cachedLogs := lq.logCache.get(token)
+		lq.mu.Unlock()
+		require.Nil(t, cachedLogs)
+	})
 }

 func Test_logCache(t *testing.T) {

‎main.go‎

Lines changed: 9 additions & 0 deletions

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/url"
 	"os"
+	"strconv"
 	"strings"

 	"cdr.dev/slog"
@@ -30,6 +31,7 @@ func root() *cobra.Command {
 		kubeConfig    string
 		namespacesStr string
 		labelSelector string
+		maxRetriesStr string
 	)
 	cmd := &cobra.Command{
 		Use: "coder-logstream-kube",
@@ -72,13 +74,19 @@ func root() *cobra.Command {
 				}
 			}

+			maxRetries, err := strconv.Atoi(maxRetriesStr)
+			if err != nil {
+				return fmt.Errorf("parse max retries: %w", err)
+			}
+
 			reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{
 				coderURL:      parsedURL,
 				client:        client,
 				namespaces:    namespaces,
 				fieldSelector: fieldSelector,
 				labelSelector: labelSelector,
 				logger:        slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug),
+				maxRetries:    maxRetries,
 			})
 			if err != nil {
 				return fmt.Errorf("create pod event reporter: %w", err)
@@ -97,6 +105,7 @@ func root() *cobra.Command {
 	cmd.Flags().StringVarP(&namespacesStr, "namespaces", "n", os.Getenv("CODER_NAMESPACES"), "List of namespaces to use when listing pods")
 	cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods")
 	cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods")
+	cmd.Flags().StringVarP(&maxRetriesStr, "max-retries", "m", os.Getenv("CODER_MAX_RETRIES"), "Maximum retry attempts for failed log sends (logs are discarded after this limit)")

 	return cmd
 }
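End to end, the limit flows from the chart value through the CODER_MAX_RETRIES environment variable (which also serves as the default for the new --max-retries/-m flag), is parsed with strconv.Atoi, and is handed to newPodEventLogger, which treats zero as unset and falls back to 10. Below is a compressed standalone sketch of that path; the empty-string guard is this example's own simplification so it runs with the variable unset, whereas the real command parses the flag value unconditionally:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// options stands in for podEventLoggerOptions; only maxRetries is modeled.
type options struct {
	maxRetries int
}

func main() {
	// The Helm template injects CODER_MAX_RETRIES when .Values.maxRetries is
	// set (the chart default is 10); the cobra flag uses the same variable as
	// its default value.
	raw := os.Getenv("CODER_MAX_RETRIES")

	opts := options{}
	if raw != "" { // sketch-only guard; see note above
		n, err := strconv.Atoi(raw)
		if err != nil {
			fmt.Fprintf(os.Stderr, "parse max retries: %v\n", err)
			os.Exit(1)
		}
		opts.maxRetries = n
	}

	// newPodEventLogger treats 0 as unset and falls back to 10.
	if opts.maxRetries == 0 {
		opts.maxRetries = 10
	}

	fmt.Println("effective max retries:", opts.maxRetries)
}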

0 commit comments
