Commit 834bc2b

feat: implement retry mechanism for log processing

1 parent 13d885f · commit 834bc2b

File tree: 1 file changed, +175 -68 lines changed

logger.go
Lines changed: 175 additions & 68 deletions

@@ -407,6 +407,8 @@ type logQueuer struct {
 	loggerTTL time.Duration
 	loggers   map[string]agentLoggerLifecycle
 	logCache  logCache
+
+	retries map[string]*retryState
 }
 
 func (l *logQueuer) work(ctx context.Context) {

@@ -427,87 +429,120 @@ func (l *logQueuer) work(ctx context.Context) {
 	}
 }
 
+func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
+	client := agentsdk.New(l.coderURL)
+	client.SetSessionToken(log.agentToken)
+	logger := l.logger.With(slog.F("resource_name", log.resourceName))
+	client.SDK.SetLogger(logger)
+
+	_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
+		ID:          sourceUUID,
+		Icon:        "/icon/k8s.png",
+		DisplayName: "Kubernetes",
+	})
+	if err != nil {
+		// This shouldn't fail sending the log, as it only affects how they
+		// appear.
+		logger.Error(ctx, "post log source", slog.Error(err))
+		l.scheduleRetry(ctx, log.agentToken)
+		return agentLoggerLifecycle{}, err
+	}
+
+	ls := agentsdk.NewLogSender(logger)
+	sl := ls.GetScriptLogger(sourceUUID)
+
+	gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
+
+	// connect to Agent v2.0 API, since we don't need features added later.
+	// This maximizes compatibility.
+	arpc, err := client.ConnectRPC20(gracefulCtx)
+	if err != nil {
+		logger.Error(ctx, "drpc connect", slog.Error(err))
+		gracefulCancel()
+		l.scheduleRetry(ctx, log.agentToken)
+		return agentLoggerLifecycle{}, err
+	}
+	go func() {
+		err := ls.SendLoop(gracefulCtx, arpc)
+		// if the send loop exits on its own without the context
+		// canceling, timeout the logger and force it to recreate.
+		if err != nil && ctx.Err() == nil {
+			l.loggerTimeout(log.agentToken)
+		}
+	}()
+
+	closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
+		logger.Info(ctx, "logger timeout firing")
+		l.loggerTimeout(log.agentToken)
+	})
+	lifecycle := agentLoggerLifecycle{
+		scriptLogger: sl,
+		close: func() {
+			// We could be stopping for reasons other than the timeout. If
+			// so, stop the timer.
+			closeTimer.Stop()
+			defer gracefulCancel()
+			timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
+			defer timeout.Stop()
+			logger.Info(ctx, "logger closing")
+
+			if err := sl.Flush(gracefulCtx); err != nil {
+				// ctx err
+				logger.Warn(gracefulCtx, "timeout reached while flushing")
+				return
+			}
+
+			if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
+				// ctx err
+				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
+			}
+
+			_ = arpc.DRPCConn().Close()
+			client.SDK.HTTPClient.CloseIdleConnections()
+		},
+	}
+	lifecycle.closeTimer = closeTimer
+	return lifecycle, nil
+}
+
 func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
-	queuedLogs := l.logCache.push(log)
+
+	queuedLogs := l.logCache.get(log.agentToken)
+	if isAgentLogEmpty(log) {
+		if queuedLogs == nil {
+			return
+		}
+	} else {
+		queuedLogs = l.logCache.push(log)
+	}
+
 	lgr, ok := l.loggers[log.agentToken]
 	if !ok {
-		client := agentsdk.New(l.coderURL)
-		client.SetSessionToken(log.agentToken)
-		logger := l.logger.With(slog.F("resource_name", log.resourceName))
-		client.SDK.SetLogger(logger)
-
-		_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
-			ID:          sourceUUID,
-			Icon:        "/icon/k8s.png",
-			DisplayName: "Kubernetes",
-		})
-		if err != nil {
-			// This shouldn't fail sending the log, as it only affects how they
-			// appear.
-			logger.Error(ctx, "post log source", slog.Error(err))
+		// skip if we're in a retry cooldown window
+		if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
+			return
 		}
 
-		ls := agentsdk.NewLogSender(logger)
-		sl := ls.GetScriptLogger(sourceUUID)
-
-		gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
-
-		// connect to Agent v2.0 API, since we don't need features added later.
-		// This maximizes compatibility.
-		arpc, err := client.ConnectRPC20(gracefulCtx)
+		var err error
+		lgr, err = l.newLogger(ctx, log, queuedLogs)
 		if err != nil {
-			logger.Error(ctx, "drpc connect", slog.Error(err))
-			gracefulCancel()
+			l.scheduleRetry(ctx, log.agentToken)
 			return
 		}
-		go func() {
-			err := ls.SendLoop(gracefulCtx, arpc)
-			// if the send loop exits on its own without the context
-			// canceling, timeout the logger and force it to recreate.
-			if err != nil && ctx.Err() == nil {
-				l.loggerTimeout(log.agentToken)
-			}
-		}()
-
-		closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
-			logger.Info(ctx, "logger timeout firing")
-			l.loggerTimeout(log.agentToken)
-		})
-		lifecycle := agentLoggerLifecycle{
-			scriptLogger: sl,
-			close: func() {
-				// We could be stopping for reasons other than the timeout. If
-				// so, stop the timer.
-				closeTimer.Stop()
-				defer gracefulCancel()
-				timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
-				defer timeout.Stop()
-				logger.Info(ctx, "logger closing")
-
-				if err := sl.Flush(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while flushing")
-					return
-				}
-
-				if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
-				}
-
-				_ = arpc.DRPCConn().Close()
-				client.SDK.HTTPClient.CloseIdleConnections()
-			},
-		}
-		lifecycle.closeTimer = closeTimer
-		l.loggers[log.agentToken] = lifecycle
-		lgr = lifecycle
+		l.loggers[log.agentToken] = lgr
 	}
 
 	lgr.resetCloseTimer(l.loggerTTL)
-	_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
+	if len(queuedLogs) == 0 {
+		return
+	}
+	if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
+		l.scheduleRetry(ctx, log.agentToken)
+		return
+	}
+	l.clearRetry(log.agentToken)
 	l.logCache.delete(log.agentToken)
 }

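Taken together, the processLog changes above turn a failed send into a cached batch plus a scheduled wake-up: logs stay in the logCache, and the retry timer later re-enqueues an empty agentLog for the token, which isAgentLogEmpty (added at the bottom of this diff) lets processLog treat as "flush whatever is cached" rather than as a new log line. The following standalone sketch, using simplified stand-in types rather than the repository's, illustrates that cycle:

package main

import (
	"fmt"
	"time"
)

// entry is a simplified stand-in for the repository's agentLog type.
type entry struct {
	token  string
	output string // empty output marks a retry wake-up (cf. isAgentLogEmpty)
}

func main() {
	q := make(chan entry, 8)       // stand-in for the logQueuer's queue channel
	cache := map[string][]string{} // stand-in for logCache
	failures := 2                  // pretend the first two sends fail

	q <- entry{token: "agent-1", output: "hello"}
	for i := 0; i < 4; i++ {
		select {
		case e := <-q:
			if e.output != "" {
				// a real log line: push it into the cache
				cache[e.token] = append(cache[e.token], e.output)
			} else if len(cache[e.token]) == 0 {
				continue // empty wake-up with nothing cached: drop it
			}
			if failures > 0 {
				// simulated send failure: keep the cache, schedule a retry
				failures--
				fmt.Println("send failed, scheduling retry")
				time.AfterFunc(10*time.Millisecond, func() {
					q <- entry{token: e.token} // empty entry = retry signal
				})
				continue
			}
			fmt.Println("sent:", cache[e.token]) // success: flush and clear
			delete(cache, e.token)
		case <-time.After(time.Second):
			return
		}
	}
}

The key property, mirrored from the diff, is that a retry never re-submits the log through push, so the cache holds each line exactly once until a send succeeds.
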
@@ -518,6 +553,8 @@ func (l *logQueuer) processDelete(log agentLog) {
 		delete(l.loggers, log.agentToken)
 
 	}
+	l.clearRetry(log.agentToken)
+	l.logCache.delete(log.agentToken)
 	l.mu.Unlock()
 
 	if ok {
@@ -549,6 +586,64 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
 	}
 }
 
+// retryState tracks exponential backoff for an agent token.
+type retryState struct {
+	delay time.Duration
+	timer *quartz.Timer
+}
+
+func (l *logQueuer) ensureRetryMap() {
+	if l.retries == nil {
+		l.retries = make(map[string]*retryState)
+	}
+}
+
+func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
+	l.ensureRetryMap()
+
+	rs := l.retries[token]
+	if rs == nil {
+		rs = &retryState{delay: time.Second}
+		l.retries[token] = rs
+	}
+
+	if rs.timer != nil {
+		return
+	}
+
+	if rs.delay < time.Second {
+		rs.delay = time.Second
+	} else if rs.delay > 30*time.Second {
+		rs.delay = 30 * time.Second
+	}
+
+	l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String()))
+
+	rs.timer = l.clock.AfterFunc(rs.delay, func() {
+		l.mu.Lock()
+		if cur := l.retries[token]; cur != nil {
+			cur.timer = nil
+		}
+		l.mu.Unlock()
+
+		l.q <- agentLog{op: opLog, agentToken: token}
+	})
+
+	rs.delay *= 2
+	if rs.delay > 30*time.Second {
+		rs.delay = 30 * time.Second
+	}
+}
+
+func (l *logQueuer) clearRetry(token string) {
+	if rs := l.retries[token]; rs != nil {
+		if rs.timer != nil {
+			rs.timer.Stop()
+		}
+		delete(l.retries, token)
+	}
+}
+
 func newColor(value ...color.Attribute) *color.Color {
 	c := color.New(value...)
 	c.EnableColor()
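
scheduleRetry above arms at most one timer per token (it returns early if rs.timer is non-nil) and doubles the delay after each arming, clamped between 1s and 30s. A minimal standalone sketch, not code from this commit, of the schedule that logic produces:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxDelay = 30 * time.Second
	delay := time.Second // initial delay, as in retryState{delay: time.Second}
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d fires after %s\n", attempt, delay)
		delay *= 2 // doubled after arming the timer, like scheduleRetry
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	// prints: 1s, 2s, 4s, 8s, 16s, 30s, 30s, 30s
}

Clamping at 30s keeps a persistently unreachable agent from pushing the wait unbounded, while the 1s floor guards against a zero or negative delay if the state is ever reset.
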
@@ -572,3 +667,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
 func (l *logCache) delete(token string) {
 	delete(l.logs, token)
 }
+
+func (l *logCache) get(token string) []agentsdk.Log {
+	logs, ok := l.logs[token]
+	if !ok {
+		return nil
+	}
+	return logs
+}
+
+func isAgentLogEmpty(log agentLog) bool {
+	return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
+}
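
For reference, the wake-up that scheduleRetry enqueues, agentLog{op: opLog, agentToken: token}, carries no resource name, no output, and a zero timestamp, which is exactly the shape isAgentLogEmpty tests for. A simplified sketch with hypothetical stand-in types (the real agentLog nests an agentsdk.Log):

package main

import (
	"fmt"
	"time"
)

// agentLog here is a flattened, hypothetical stand-in for the real struct.
type agentLog struct {
	agentToken   string
	resourceName string
	output       string
	createdAt    time.Time
}

func isEmpty(l agentLog) bool {
	return l.resourceName == "" && l.output == "" && l.createdAt.IsZero()
}

func main() {
	retryWakeup := agentLog{agentToken: "tok"} // what the retry timer enqueues
	realLog := agentLog{
		agentToken:   "tok",
		resourceName: "pod/web-0",
		output:       "starting",
		createdAt:    time.Now(),
	}
	fmt.Println(isEmpty(retryWakeup), isEmpty(realLog)) // true false
}
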
