Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit15615d1

Browse files
committed
feat: add logSender for sending logs on agent v2 API
1 parent1bb4aec commit15615d1

File tree

3 files changed

+328
-0
lines changed

3 files changed

+328
-0
lines changed

‎agent/logs.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"golang.org/x/xerrors"
10+
11+
"cdr.dev/slog"
12+
"github.com/coder/coder/v2/agent/proto"
13+
"github.com/coder/coder/v2/codersdk/agentsdk"
14+
)
15+
16+
// flushInterval is how often sendLoop sends a source's queued logs even when
// no explicit flush has been requested.
const flushInterval = time.Second
17+
18+
// logQueue holds the pending logs for a single log source, plus the flush
// bookkeeping sendLoop uses to decide when the queue is due for sending.
type logQueue struct {
	// logs pending send. Outside of sendLoop this slice is only appended to;
	// sendLoop relies on that when it trims sent entries off the front.
	logs []*proto.Log
	// flushRequested is set by flush() to request an immediate send.
	flushRequested bool
	// lastFlush records when this queue was last successfully sent.
	lastFlush time.Time
}
23+
24+
// logSender is a subcomponent of agent that handles enqueuing logs and then
// sending them over the agent API. Things that need to log call enqueue and
// flush. When the agent API becomes available, the agent calls sendLoop to
// send pending logs.
type logSender struct {
	// Cond's lock (L) guards queues; Broadcast wakes sendLoop when new work
	// arrives or a flush is requested.
	*sync.Cond
	// queues maps a log source ID to its pending queue. Empty queues are
	// removed from the map by sendLoop after a successful send.
	queues map[uuid.UUID]*logQueue
	logger slog.Logger
}
32+
33+
// logDest is the subset of the agent API that sendLoop needs for uploading
// logs; tests substitute a fake implementation (see fakeLogDest).
type logDest interface {
	BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}
36+
37+
funcnewLogSender(logger slog.Logger)*logSender {
38+
return&logSender{
39+
Cond:sync.NewCond(&sync.Mutex{}),
40+
logger:logger,
41+
queues:make(map[uuid.UUID]*logQueue),
42+
}
43+
}
44+
45+
func (l*logSender)enqueue(src uuid.UUID,logs...agentsdk.Log)error {
46+
logger:=l.logger.With(slog.F("log_source_id",src))
47+
iflen(logs)==0 {
48+
logger.Debug(context.Background(),"enqueue called with no logs")
49+
returnnil
50+
}
51+
l.L.Lock()
52+
deferl.L.Unlock()
53+
deferl.Broadcast()
54+
q,ok:=l.queues[src]
55+
if!ok {
56+
q=&logQueue{}
57+
l.queues[src]=q
58+
}
59+
for_,log:=rangelogs {
60+
pl,err:=agentsdk.ProtoFromLog(log)
61+
iferr!=nil {
62+
returnxerrors.Errorf("failed to convert log: %w",err)
63+
}
64+
q.logs=append(q.logs,pl)
65+
}
66+
logger.Debug(context.Background(),"enqueued agent logs",slog.F("new_logs",len(logs)),slog.F("queued_logs",len(q.logs)))
67+
returnnil
68+
}
69+
70+
func (l*logSender)flush(src uuid.UUID)error {
71+
l.L.Lock()
72+
deferl.L.Unlock()
73+
deferl.Broadcast()
74+
q,ok:=l.queues[src]
75+
ifok {
76+
q.flushRequested=true
77+
}
78+
// queue might not exist because it's already been flushed and removed from
79+
// the map.
80+
returnnil
81+
}
82+
83+
// sendLoop sends any pending logs until it hits an error or the context is
// canceled. It does not retry as it is expected that a higher layer retries
// establishing connection to the agent API and calls sendLoop again.
func (l *logSender) sendLoop(ctx context.Context, dest logDest) error {
	// ctxDone is written by the ticker goroutine and read by the main loop;
	// both only touch it while holding l.L, so no atomics are needed.
	ctxDone := false
	defer l.logger.Debug(ctx, "sendLoop exiting")

	// wake 4 times per flush interval to check if anything needs to be flushed
	go func() {
		tkr := time.NewTicker(flushInterval / 4)
		defer tkr.Stop()
		for {
			select {
			// also monitor the context here, so we notice immediately, rather
			// than waiting for the next tick or logs
			case <-ctx.Done():
				l.L.Lock()
				ctxDone = true
				l.L.Unlock()
				l.Broadcast()
				return
			case <-tkr.C:
				l.Broadcast()
			}
		}
	}()

	l.L.Lock()
	defer l.L.Unlock()
	for {
		// sleep until there is work (or cancellation); Broadcasts come from
		// enqueue, flush, and the ticker goroutine above.
		for !ctxDone && !l.hasPendingWorkLocked() {
			l.Wait()
		}
		if ctxDone {
			return nil
		}
		src, q := l.getPendingWorkLocked()
		q.flushRequested = false // clear flag since we're now flushing
		req := &proto.BatchCreateLogsRequest{
			LogSourceId: src[:],
			// NOTE(review): this shares the queue's backing array with the
			// request while unlocked below; safe because enqueue only appends
			// (never mutates existing elements) — confirm if that invariant
			// ever changes.
			Logs: q.logs[:],
		}

		// Drop the lock for the RPC so enqueue/flush aren't blocked on the
		// network; q itself stays valid since only this loop deletes queues.
		l.L.Unlock()
		l.logger.Debug(ctx, "sending logs to agent API", slog.F("log_source_id", src), slog.F("num_logs", len(req.Logs)))
		_, err := dest.BatchCreateLogs(ctx, req)
		l.L.Lock()
		if err != nil {
			return xerrors.Errorf("failed to upload logs: %w", err)
		}

		// Since elsewhere we only append to the logs, here we can remove them
		// since we successfully sent them. First we nil the pointers though,
		// so that they can be gc'd.
		for i := 0; i < len(req.Logs); i++ {
			q.logs[i] = nil
		}
		q.logs = q.logs[len(req.Logs):]
		if len(q.logs) == 0 {
			// no empty queues
			delete(l.queues, src)
			continue
		}
		q.lastFlush = time.Now()
	}
}
149+
150+
func (l*logSender)hasPendingWorkLocked()bool {
151+
for_,q:=rangel.queues {
152+
iftime.Since(q.lastFlush)>flushInterval {
153+
returntrue
154+
}
155+
ifq.flushRequested {
156+
returntrue
157+
}
158+
}
159+
returnfalse
160+
}
161+
162+
func (l*logSender)getPendingWorkLocked() (src uuid.UUID,q*logQueue) {
163+
// take the one it's been the longest since we've flushed, so that we have some sense of
164+
// fairness across sources
165+
varearliestFlush time.Time
166+
foris,iq:=rangel.queues {
167+
ifq==nil||iq.lastFlush.Before(earliestFlush) {
168+
src=is
169+
q=iq
170+
earliestFlush=iq.lastFlush
171+
}
172+
}
173+
returnsrc,q
174+
}

‎agent/logs_internal_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"golang.org/x/exp/slices"
9+
10+
"github.com/google/uuid"
11+
"github.com/stretchr/testify/require"
12+
13+
"cdr.dev/slog"
14+
"cdr.dev/slog/sloggers/slogtest"
15+
"github.com/coder/coder/v2/agent/proto"
16+
"github.com/coder/coder/v2/coderd/database/dbtime"
17+
"github.com/coder/coder/v2/codersdk"
18+
"github.com/coder/coder/v2/codersdk/agentsdk"
19+
"github.com/coder/coder/v2/testutil"
20+
)
21+
22+
// TestLogSender exercises the full enqueue -> sendLoop -> flush lifecycle of
// logSender against a fake log destination: initial send of two sources,
// an explicit flush of a later log, clean shutdown on context cancel, and
// enqueue remaining usable after sendLoop returns.
func TestLogSender(t *testing.T) {
	t.Parallel()
	testCtx := testutil.Context(t, testutil.WaitShort)
	ctx, cancel := context.WithCancel(testCtx)
	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
	fDest := newFakeLogDest()
	uut := newLogSender(logger)

	t0 := dbtime.Now()

	ls1 := uuid.UUID{0x11}
	err := uut.enqueue(ls1, agentsdk.Log{
		CreatedAt: t0,
		Output:    "test log 0, src 1",
		Level:     codersdk.LogLevelInfo,
	})
	require.NoError(t, err)

	ls2 := uuid.UUID{0x22}
	err = uut.enqueue(ls2,
		agentsdk.Log{
			CreatedAt: t0,
			Output:    "test log 0, src 2",
			Level:     codersdk.LogLevelError,
		},
		agentsdk.Log{
			CreatedAt: t0,
			Output:    "test log 1, src 2",
			Level:     codersdk.LogLevelWarn,
		},
	)
	require.NoError(t, err)

	loopErr := make(chan error, 1)
	go func() {
		err := uut.sendLoop(ctx, fDest)
		loopErr <- err
	}()

	// since neither source has ever been flushed, it should immediately flush
	// both, although the order is not controlled
	var logReqs []*proto.BatchCreateLogsRequest
	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
	logReqs = append(logReqs, testutil.RequireRecvCtx(ctx, t, fDest.reqs))
	for _, req := range logReqs {
		require.NotNil(t, req)
		srcID, err := uuid.FromBytes(req.LogSourceId)
		require.NoError(t, err)
		switch srcID {
		case ls1:
			require.Len(t, req.Logs, 1)
			require.Equal(t, "test log 0, src 1", req.Logs[0].GetOutput())
			require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
			require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
		case ls2:
			require.Len(t, req.Logs, 2)
			require.Equal(t, "test log 0, src 2", req.Logs[0].GetOutput())
			require.Equal(t, proto.Log_ERROR, req.Logs[0].GetLevel())
			require.Equal(t, t0, req.Logs[0].GetCreatedAt().AsTime())
			require.Equal(t, "test log 1, src 2", req.Logs[1].GetOutput())
			require.Equal(t, proto.Log_WARN, req.Logs[1].GetLevel())
			require.Equal(t, t0, req.Logs[1].GetCreatedAt().AsTime())
		default:
			t.Fatal("unknown log source")
		}
	}

	// an explicit flush should cause a send well before the flush interval
	t1 := dbtime.Now()
	err = uut.enqueue(ls1, agentsdk.Log{
		CreatedAt: t1,
		Output:    "test log 1, src 1",
		Level:     codersdk.LogLevelDebug,
	})
	require.NoError(t, err)
	err = uut.flush(ls1)
	require.NoError(t, err)

	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
	// give ourselves a 25% buffer if we're right on the cusp of a tick
	require.LessOrEqual(t, time.Since(t1), flushInterval*5/4)
	require.NotNil(t, req)
	require.Len(t, req.Logs, 1)
	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
	require.Equal(t, proto.Log_DEBUG, req.Logs[0].GetLevel())
	require.Equal(t, t1, req.Logs[0].GetCreatedAt().AsTime())

	// canceling the context should make sendLoop return cleanly (nil error)
	cancel()
	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
	require.NoError(t, err)

	// we can still enqueue more logs after sendLoop returns
	err = uut.enqueue(ls1, agentsdk.Log{
		CreatedAt: t1,
		Output:    "test log 2, src 1",
		Level:     codersdk.LogLevelTrace,
	})
	require.NoError(t, err)
}
120+
121+
// fakeLogDest is a test implementation of logDest that hands each request to
// the test over an unbuffered channel instead of calling the agent API.
type fakeLogDest struct {
	reqs chan *proto.BatchCreateLogsRequest
}
124+
125+
func (ffakeLogDest)BatchCreateLogs(ctx context.Context,req*proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse,error) {
126+
// clone the logs so that modifications the sender makes don't affect our tests. In production
127+
// these would be serialized/deserialized so we don't have to worry too much.
128+
req.Logs=slices.Clone(req.Logs)
129+
select {
130+
case<-ctx.Done():
131+
returnnil,ctx.Err()
132+
casef.reqs<-req:
133+
return&proto.BatchCreateLogsResponse{},nil
134+
}
135+
}
136+
137+
funcnewFakeLogDest()*fakeLogDest {
138+
return&fakeLogDest{
139+
reqs:make(chan*proto.BatchCreateLogsRequest),
140+
}
141+
}

‎codersdk/agentsdk/convert.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/google/uuid"
88
"golang.org/x/xerrors"
99
"google.golang.org/protobuf/types/known/durationpb"
10+
"google.golang.org/protobuf/types/known/timestamppb"
1011

1112
"github.com/coder/coder/v2/agent/proto"
1213
"github.com/coder/coder/v2/codersdk"
@@ -298,3 +299,15 @@ func ProtoFromAppHealthsRequest(req PostAppHealthsRequest) (*proto.BatchUpdateAp
298299
}
299300
returnpReq,nil
300301
}
302+
303+
funcProtoFromLog(logLog) (*proto.Log,error) {
304+
lvl,ok:=proto.Log_Level_value[strings.ToUpper(string(log.Level))]
305+
if!ok {
306+
returnnil,xerrors.Errorf("unknown log level: %s",log.Level)
307+
}
308+
return&proto.Log{
309+
CreatedAt:timestamppb.New(log.CreatedAt),
310+
Output:log.Output,
311+
Level:proto.Log_Level(lvl),
312+
},nil
313+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp