Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd71e92e

Browse files
committed
feat: add logSender for sending logs on agent v2 API
1 parentb850ab4 commitd71e92e

File tree

3 files changed

+321
-0
lines changed

3 files changed

+321
-0
lines changed

‎agent/logs.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"golang.org/x/exp/slices"
10+
"golang.org/x/xerrors"
11+
12+
"cdr.dev/slog"
13+
"github.com/coder/coder/v2/agent/proto"
14+
"github.com/coder/coder/v2/codersdk/agentsdk"
15+
)
16+
17+
constflushInterval=time.Second
18+
19+
typelogQueuestruct {
20+
logs []*proto.Log
21+
flushRequestedbool
22+
lastFlush time.Time
23+
}
24+
25+
// logSender is a subcomponent of agent that handles enqueuing logs and then sending them over the
26+
// agent API. Things that need to log call enqueue and flush. When the agent API becomes available,
27+
// the agent calls sendLoop to send pending logs.
28+
typelogSenderstruct {
29+
*sync.Cond
30+
queuesmap[uuid.UUID]*logQueue
31+
logger slog.Logger
32+
}
33+
34+
typelogDestinterface {
35+
BatchCreateLogs(ctx context.Context,request*proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse,error)
36+
}
37+
38+
funcnewLogSender(logger slog.Logger)*logSender {
39+
return&logSender{
40+
Cond:sync.NewCond(&sync.Mutex{}),
41+
logger:logger,
42+
queues:make(map[uuid.UUID]*logQueue),
43+
}
44+
}
45+
46+
func (l*logSender)enqueue(src uuid.UUID,logs...agentsdk.Log)error {
47+
logger:=l.logger.With(slog.F("log_source_id",src))
48+
iflen(logs)==0 {
49+
logger.Debug(context.Background(),"enqueue called with no logs")
50+
returnnil
51+
}
52+
l.L.Lock()
53+
deferl.L.Unlock()
54+
deferl.Broadcast()
55+
q,ok:=l.queues[src]
56+
if!ok {
57+
q=&logQueue{}
58+
l.queues[src]=q
59+
}
60+
for_,log:=rangelogs {
61+
pl,err:=agentsdk.ProtoFromLog(log)
62+
iferr!=nil {
63+
returnxerrors.Errorf("failed to convert log: %w",err)
64+
}
65+
q.logs=append(q.logs,pl)
66+
}
67+
logger.Debug(context.Background(),"enqueued agent logs",slog.F("new_logs",len(logs)),slog.F("queued_logs",len(q.logs)))
68+
returnnil
69+
}
70+
71+
func (l*logSender)flush(src uuid.UUID)error {
72+
l.L.Lock()
73+
deferl.L.Unlock()
74+
deferl.Broadcast()
75+
q,ok:=l.queues[src]
76+
ifok {
77+
q.flushRequested=true
78+
}
79+
// queue might not exist because it's already been flushed and removed from
80+
// the map.
81+
returnnil
82+
}
83+
84+
// sendLoop sends any pending logs until it hits an error or the context is canceled. It does not
85+
// retry as it is expected that a higher layer retries establishing connection to the agent API and
86+
// calls sendLoop again.
87+
func (l*logSender)sendLoop(ctx context.Context,destlogDest)error {
88+
ctxDone:=false
89+
deferl.logger.Debug(ctx,"sendLoop exiting")
90+
91+
// wake 4 times per flush interval to check if anything needs to be flushed
92+
gofunc() {
93+
tkr:=time.NewTicker(flushInterval/4)
94+
defertkr.Stop()
95+
for {
96+
select {
97+
// also monitor the context here, so we notice immediately, rather
98+
// than waiting for the next tick or logs
99+
case<-ctx.Done():
100+
l.L.Lock()
101+
ctxDone=true
102+
l.L.Unlock()
103+
l.Broadcast()
104+
return
105+
case<-tkr.C:
106+
l.Broadcast()
107+
}
108+
}
109+
}()
110+
111+
l.L.Lock()
112+
deferl.L.Unlock()
113+
for {
114+
for!ctxDone&&!l.hasPendingWorkLocked() {
115+
l.Wait()
116+
}
117+
ifctxDone {
118+
returnnil
119+
}
120+
src,q:=l.getPendingWorkLocked()
121+
q.flushRequested=false// clear flag since we're now flushing
122+
req:=&proto.BatchCreateLogsRequest{
123+
LogSourceId:src[:],
124+
// when we release the lock, we don't want modifications to the slice to affect us
125+
Logs:slices.Clone(q.logs),
126+
}
127+
128+
l.L.Unlock()
129+
l.logger.Debug(ctx,"sending logs to agent API",slog.F("log_source_id",src),slog.F("num_logs",len(req.Logs)))
130+
_,err:=dest.BatchCreateLogs(ctx,req)
131+
l.L.Lock()
132+
iferr!=nil {
133+
returnxerrors.Errorf("failed to upload logs: %w",err)
134+
}
135+
136+
// since elsewhere we only append to the logs, here we can remove them
137+
// since we successfully sent them
138+
q.logs=q.logs[len(req.Logs):]
139+
iflen(q.logs)==0 {
140+
// no empty queues
141+
delete(l.queues,src)
142+
continue
143+
}
144+
q.lastFlush=time.Now()
145+
}
146+
}
147+
148+
func (l*logSender)hasPendingWorkLocked()bool {
149+
for_,q:=rangel.queues {
150+
ifq.flushRequested {
151+
returntrue
152+
}
153+
iftime.Since(q.lastFlush)>flushInterval {
154+
returntrue
155+
}
156+
}
157+
returnfalse
158+
}
159+
160+
func (l*logSender)getPendingWorkLocked() (src uuid.UUID,q*logQueue) {
161+
// take the one it's been the longest since we've flushed, so that we have some sense of
162+
// fairness across sources
163+
varearliestFlush time.Time
164+
foris,iq:=rangel.queues {
165+
ifq==nil||iq.lastFlush.Before(earliestFlush) {
166+
src=is
167+
q=iq
168+
earliestFlush=iq.lastFlush
169+
}
170+
}
171+
returnsrc,q
172+
}

‎agent/logs_internal_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/stretchr/testify/require"
10+
11+
"cdr.dev/slog"
12+
"cdr.dev/slog/sloggers/slogtest"
13+
"github.com/coder/coder/v2/agent/proto"
14+
"github.com/coder/coder/v2/coderd/database/dbtime"
15+
"github.com/coder/coder/v2/codersdk"
16+
"github.com/coder/coder/v2/codersdk/agentsdk"
17+
"github.com/coder/coder/v2/testutil"
18+
)
19+
20+
funcTestLogSender(t*testing.T) {
21+
t.Parallel()
22+
testCtx:=testutil.Context(t,testutil.WaitShort)
23+
ctx,cancel:=context.WithCancel(testCtx)
24+
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
25+
fDest:=newFakeLogDest()
26+
uut:=newLogSender(logger)
27+
28+
t0:=dbtime.Now()
29+
30+
ls1:= uuid.UUID{0x11}
31+
err:=uut.enqueue(ls1, agentsdk.Log{
32+
CreatedAt:t0,
33+
Output:"test log 0, src 1",
34+
Level:codersdk.LogLevelInfo,
35+
})
36+
require.NoError(t,err)
37+
38+
ls2:= uuid.UUID{0x22}
39+
err=uut.enqueue(ls2,
40+
agentsdk.Log{
41+
CreatedAt:t0,
42+
Output:"test log 0, src 2",
43+
Level:codersdk.LogLevelError,
44+
},
45+
agentsdk.Log{
46+
CreatedAt:t0,
47+
Output:"test log 1, src 2",
48+
Level:codersdk.LogLevelWarn,
49+
},
50+
)
51+
require.NoError(t,err)
52+
53+
loopErr:=make(chanerror,1)
54+
gofunc() {
55+
err:=uut.sendLoop(ctx,fDest)
56+
loopErr<-err
57+
}()
58+
59+
// since neither source has even been flushed, it should immediately flush
60+
// both, although the order is not controlled
61+
varlogReqs []*proto.BatchCreateLogsRequest
62+
logReqs=append(logReqs,testutil.RequireRecvCtx(ctx,t,fDest.reqs))
63+
logReqs=append(logReqs,testutil.RequireRecvCtx(ctx,t,fDest.reqs))
64+
for_,req:=rangelogReqs {
65+
require.NotNil(t,req)
66+
srcID,err:=uuid.FromBytes(req.LogSourceId)
67+
require.NoError(t,err)
68+
switchsrcID {
69+
casels1:
70+
require.Len(t,req.Logs,1)
71+
require.Equal(t,"test log 0, src 1",req.Logs[0].GetOutput())
72+
require.Equal(t,proto.Log_INFO,req.Logs[0].GetLevel())
73+
require.Equal(t,t0,req.Logs[0].GetCreatedAt().AsTime())
74+
casels2:
75+
require.Len(t,req.Logs,2)
76+
require.Equal(t,"test log 0, src 2",req.Logs[0].GetOutput())
77+
require.Equal(t,proto.Log_ERROR,req.Logs[0].GetLevel())
78+
require.Equal(t,t0,req.Logs[0].GetCreatedAt().AsTime())
79+
require.Equal(t,"test log 1, src 2",req.Logs[1].GetOutput())
80+
require.Equal(t,proto.Log_WARN,req.Logs[1].GetLevel())
81+
require.Equal(t,t0,req.Logs[1].GetCreatedAt().AsTime())
82+
default:
83+
t.Fatal("unknown log source")
84+
}
85+
}
86+
87+
t1:=dbtime.Now()
88+
err=uut.enqueue(ls1, agentsdk.Log{
89+
CreatedAt:t1,
90+
Output:"test log 1, src 1",
91+
Level:codersdk.LogLevelDebug,
92+
})
93+
require.NoError(t,err)
94+
err=uut.flush(ls1)
95+
require.NoError(t,err)
96+
97+
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
98+
// give ourselves a 25% buffer if we're right on the cusp of a tick
99+
require.LessOrEqual(t,time.Since(t1),flushInterval*5/4)
100+
require.NotNil(t,req)
101+
require.Len(t,req.Logs,1)
102+
require.Equal(t,"test log 1, src 1",req.Logs[0].GetOutput())
103+
require.Equal(t,proto.Log_DEBUG,req.Logs[0].GetLevel())
104+
require.Equal(t,t1,req.Logs[0].GetCreatedAt().AsTime())
105+
106+
cancel()
107+
err=testutil.RequireRecvCtx(testCtx,t,loopErr)
108+
require.NoError(t,err)
109+
110+
// we can still enqueue more logs after sendLoop returns
111+
err=uut.enqueue(ls1, agentsdk.Log{
112+
CreatedAt:t1,
113+
Output:"test log 2, src 1",
114+
Level:codersdk.LogLevelTrace,
115+
})
116+
require.NoError(t,err)
117+
}
118+
119+
typefakeLogDeststruct {
120+
reqschan*proto.BatchCreateLogsRequest
121+
}
122+
123+
func (ffakeLogDest)BatchCreateLogs(ctx context.Context,req*proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse,error) {
124+
select {
125+
case<-ctx.Done():
126+
returnnil,ctx.Err()
127+
casef.reqs<-req:
128+
return&proto.BatchCreateLogsResponse{},nil
129+
}
130+
}
131+
132+
funcnewFakeLogDest()*fakeLogDest {
133+
return&fakeLogDest{
134+
reqs:make(chan*proto.BatchCreateLogsRequest),
135+
}
136+
}

‎codersdk/agentsdk/convert.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/google/uuid"
88
"golang.org/x/xerrors"
99
"google.golang.org/protobuf/types/known/durationpb"
10+
"google.golang.org/protobuf/types/known/timestamppb"
1011

1112
"github.com/coder/coder/v2/agent/proto"
1213
"github.com/coder/coder/v2/codersdk"
@@ -293,3 +294,15 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
293294
}
294295
returnpReq,nil
295296
}
297+
298+
funcProtoFromLog(logLog) (*proto.Log,error) {
299+
lvl,ok:=proto.Log_Level_value[strings.ToUpper(string(log.Level))]
300+
if!ok {
301+
returnnil,xerrors.Errorf("unknown log level: %s",log.Level)
302+
}
303+
return&proto.Log{
304+
CreatedAt:timestamppb.New(log.CreatedAt),
305+
Output:log.Output,
306+
Level:proto.Log_Level(lvl),
307+
},nil
308+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp