Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitab4cb66

Browse files
authored
feat: add WaitUntilEmpty to LogSender (#12159)
We'll need this to be able to tell when all outstanding logs have been sent, as part of graceful shutdown.
1 parent081e37d commitab4cb66

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

‎codersdk/agentsdk/logs.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
437437
l.exceededLogLimit=true
438438
// no point in keeping anything we have queued around, server will not accept them
439439
l.queues=make(map[uuid.UUID]*logQueue)
440+
l.Broadcast()// might unblock WaitUntilEmpty
440441
returnLogLimitExceededError
441442
}
442443

@@ -451,6 +452,7 @@ func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
451452
iflen(q.logs)==0 {
452453
// no empty queues
453454
delete(l.queues,src)
455+
l.Broadcast()// might unblock WaitUntilEmpty
454456
continue
455457
}
456458
q.lastFlush=time.Now()
@@ -487,6 +489,34 @@ func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
487489
returnScriptLogger{srcID:logSourceID,sender:l}
488490
}
489491

492+
// WaitUntilEmpty waits until the LogSender's queues are empty or the given context expires.
493+
func (l*LogSender)WaitUntilEmpty(ctx context.Context)error {
494+
ctxDone:=false
495+
nevermind:=make(chanstruct{})
496+
deferclose(nevermind)
497+
gofunc() {
498+
select {
499+
case<-ctx.Done():
500+
l.L.Lock()
501+
deferl.L.Unlock()
502+
ctxDone=true
503+
l.Broadcast()
504+
return
505+
case<-nevermind:
506+
return
507+
}
508+
}()
509+
l.L.Lock()
510+
deferl.L.Unlock()
511+
forlen(l.queues)!=0&&!ctxDone {
512+
l.Wait()
513+
}
514+
iflen(l.queues)==0 {
515+
returnnil
516+
}
517+
returnctx.Err()
518+
}
519+
490520
typeScriptLoggerstruct {
491521
sender*LogSender
492522
srcID uuid.UUID

‎codersdk/agentsdk/logs_internal_test.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func TestLogSender_Mainline(t *testing.T) {
5656
loopErr<-err
5757
}()
5858

59+
empty:=make(chanerror,1)
60+
gofunc() {
61+
err:=uut.WaitUntilEmpty(ctx)
62+
empty<-err
63+
}()
64+
5965
// since neither source has even been flushed, it should immediately Flush
6066
// both, although the order is not controlled
6167
varlogReqs []*proto.BatchCreateLogsRequest
@@ -104,8 +110,11 @@ func TestLogSender_Mainline(t *testing.T) {
104110
require.Equal(t,proto.Log_DEBUG,req.Logs[0].GetLevel())
105111
require.Equal(t,t1,req.Logs[0].GetCreatedAt().AsTime())
106112

113+
err:=testutil.RequireRecvCtx(ctx,t,empty)
114+
require.NoError(t,err)
115+
107116
cancel()
108-
err:=testutil.RequireRecvCtx(testCtx,t,loopErr)
117+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
109118
require.ErrorIs(t,err,context.Canceled)
110119

111120
// we can still enqueue more logs after SendLoop returns
@@ -132,6 +141,12 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
132141
Level:codersdk.LogLevelInfo,
133142
})
134143

144+
empty:=make(chanerror,1)
145+
gofunc() {
146+
err:=uut.WaitUntilEmpty(ctx)
147+
empty<-err
148+
}()
149+
135150
loopErr:=make(chanerror,1)
136151
gofunc() {
137152
err:=uut.SendLoop(ctx,fDest)
@@ -146,6 +161,10 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
146161
err:=testutil.RequireRecvCtx(ctx,t,loopErr)
147162
require.ErrorIs(t,err,LogLimitExceededError)
148163

164+
// Should also unblock WaitUntilEmpty
165+
err=testutil.RequireRecvCtx(ctx,t,empty)
166+
require.NoError(t,err)
167+
149168
// we can still enqueue more logs after SendLoop returns, but they don't
150169
// actually get enqueued
151170
uut.Enqueue(ls1,Log{
@@ -363,6 +382,33 @@ func TestLogSender_SendError(t *testing.T) {
363382
uut.L.Unlock()
364383
}
365384

385+
funcTestLogSender_WaitUntilEmpty_ContextExpired(t*testing.T) {
386+
t.Parallel()
387+
testCtx:=testutil.Context(t,testutil.WaitShort)
388+
ctx,cancel:=context.WithCancel(testCtx)
389+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
390+
uut:=NewLogSender(logger)
391+
392+
t0:=dbtime.Now()
393+
394+
ls1:= uuid.UUID{0x11}
395+
uut.Enqueue(ls1,Log{
396+
CreatedAt:t0,
397+
Output:"test log 0, src 1",
398+
Level:codersdk.LogLevelInfo,
399+
})
400+
401+
empty:=make(chanerror,1)
402+
gofunc() {
403+
err:=uut.WaitUntilEmpty(ctx)
404+
empty<-err
405+
}()
406+
407+
cancel()
408+
err:=testutil.RequireRecvCtx(testCtx,t,empty)
409+
require.ErrorIs(t,err,context.Canceled)
410+
}
411+
366412
typefakeLogDeststruct {
367413
reqschan*proto.BatchCreateLogsRequest
368414
respschan*proto.BatchCreateLogsResponse

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp