Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit43d87db

Browse files
committed
feat: add socket auditor for forwarding logs to coder agent
Add SocketAuditor that sends audit logs to the Coder workspace agentvia a Unix socket. This enables boundary audit events to be forwardedto coderd for centralized logging.Implementation notes:- Batching: 10 logs or 5 seconds, whichever comes first- Wire format: length-prefixed protobuf. proto imported from AgentAPI to simplify boundary -> agent -> coderd forwarding to start.
1 parent9c9c878 commit43d87db

File tree

6 files changed

+786
-87
lines changed

6 files changed

+786
-87
lines changed

‎audit/multi_auditor.go‎

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package audit
2+
3+
// MultiAuditor wraps multiple auditors and sends audit events to all of them.
4+
typeMultiAuditorstruct {
5+
auditors []Auditor
6+
}
7+
8+
// NewMultiAuditor creates a new MultiAuditor that sends to all provided auditors.
9+
funcNewMultiAuditor(auditors...Auditor)*MultiAuditor {
10+
return&MultiAuditor{auditors:auditors}
11+
}
12+
13+
// AuditRequest sends the request to all wrapped auditors.
14+
func (m*MultiAuditor)AuditRequest(reqRequest) {
15+
for_,a:=rangem.auditors {
16+
a.AuditRequest(req)
17+
}
18+
}

‎audit/socket_auditor.go‎

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package audit
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"log/slog"
7+
"net"
8+
"time"
9+
10+
"google.golang.org/protobuf/proto"
11+
"google.golang.org/protobuf/types/known/timestamppb"
12+
13+
agentproto"github.com/coder/coder/v2/agent/proto"
14+
)
15+
16+
const (
17+
defaultBatchSize=10
18+
defaultBatchTimerDuration=5*time.Second
19+
// DefaultAuditSocketPath is the well-known path for the boundary audit socket.
20+
// The expectation is the Coder agent listens on this socket to receive audit logs.
21+
DefaultAuditSocketPath="/tmp/boundary-audit.sock"
22+
)
23+
24+
// SocketAuditor implements the Auditor interface. It sends logs to the
25+
// workspace agent's boundary log proxy socket. It queues logs and sends
26+
// them in batches using a batch size and timer. The internal queue operates
27+
// as a FIFO i.e., logs are sent in the order they are received and dropped
28+
// if the queue is full.
29+
typeSocketAuditorstruct {
30+
socketPathstring
31+
logger*slog.Logger
32+
logChchan*agentproto.BoundaryLog
33+
batchSizeint
34+
batchTimerDuration time.Duration
35+
36+
// onFlushAttempt is called after each flush attempt (intended for testing).
37+
onFlushAttemptfunc()
38+
}
39+
40+
// NewSocketAuditor creates a new SocketAuditor that sends logs to the agent's
41+
// boundary log proxy socket at DefaultAuditSocketPath after SocketAuditor.Loop
42+
// is called.
43+
funcNewSocketAuditor(logger*slog.Logger)*SocketAuditor {
44+
return&SocketAuditor{
45+
socketPath:DefaultAuditSocketPath,
46+
logger:logger,
47+
logCh:make(chan*agentproto.BoundaryLog,2*defaultBatchSize),
48+
batchSize:defaultBatchSize,
49+
batchTimerDuration:defaultBatchTimerDuration,
50+
}
51+
}
52+
53+
// AuditRequest implements the Auditor interface. It queues the log to be sent to the
54+
// agent in a batch.
55+
func (s*SocketAuditor)AuditRequest(reqRequest) {
56+
httpReq:=&agentproto.BoundaryLog_HttpRequest{
57+
Method:req.Method,
58+
Url:req.URL,
59+
}
60+
// Only include the matched rule for denied requests, as documented in
61+
// the proto schema.
62+
if!req.Allowed {
63+
httpReq.MatchedRule=req.Rule
64+
}
65+
66+
log:=&agentproto.BoundaryLog{
67+
Allowed:req.Allowed,
68+
Time:timestamppb.Now(),
69+
Resource:&agentproto.BoundaryLog_HttpRequest_{HttpRequest:httpReq},
70+
}
71+
72+
select {
73+
cases.logCh<-log:
74+
default:
75+
s.logger.Warn("audit log dropped, channel full")
76+
}
77+
}
78+
79+
// flushErr represents an error from flush, distinguishing between
80+
// permanent errors (bad data) and transient errors (network issues).
81+
typeflushErrstruct {
82+
errerror
83+
permanentbool
84+
}
85+
86+
func (e*flushErr)Error()string {returne.err.Error() }
87+
88+
// flush sends the current batch of logs to the given connection.
89+
funcflush(conn net.Conn,logs []*agentproto.BoundaryLog)*flushErr {
90+
iflen(logs)==0 {
91+
returnnil
92+
}
93+
94+
req:=&agentproto.ReportBoundaryLogsRequest{
95+
Logs:logs,
96+
}
97+
98+
data,err:=proto.Marshal(req)
99+
iferr!=nil {
100+
return&flushErr{err:err,permanent:true}
101+
}
102+
103+
iferr:=binary.Write(conn,binary.BigEndian,uint32(len(data)));err!=nil {
104+
return&flushErr{err:err}
105+
}
106+
if_,err:=conn.Write(data);err!=nil {
107+
return&flushErr{err:err}
108+
}
109+
returnnil
110+
}
111+
112+
// Loop handles the I/O to send audit logs to the agent.
113+
func (s*SocketAuditor)Loop(ctx context.Context) {
114+
varconn net.Conn
115+
batch:=make([]*agentproto.BoundaryLog,0,s.batchSize)
116+
t:=time.NewTimer(0)
117+
t.Stop()
118+
119+
// connect attempts to establish a connection to the socket.
120+
connect:=func() {
121+
ifconn!=nil {
122+
return
123+
}
124+
varerrerror
125+
conn,err=net.Dial("unix",s.socketPath)
126+
iferr!=nil {
127+
s.logger.Warn("failed to connect to audit socket","path",s.socketPath,"error",err)
128+
conn=nil
129+
}
130+
}
131+
132+
// closeConn closes the current connection if open.
133+
closeConn:=func() {
134+
ifconn!=nil {
135+
_=conn.Close()
136+
conn=nil
137+
}
138+
}
139+
140+
// clearBatch resets the length of the batch and frees memory while preserving
141+
// the batch slice backing array.
142+
clearBatch:=func() {
143+
fori:=rangelen(batch) {
144+
batch[i]=nil
145+
}
146+
batch=batch[:0]
147+
}
148+
149+
// doFlush flushes the batch and handles errors by reconnecting.
150+
doFlush:=func() {
151+
t.Stop()
152+
deferfunc() {
153+
ifs.onFlushAttempt!=nil {
154+
s.onFlushAttempt()
155+
}
156+
}()
157+
iflen(batch)==0 {
158+
return
159+
}
160+
connect()
161+
ifconn==nil {
162+
// No connection: logs will be retried on next flush.
163+
return
164+
}
165+
166+
iferr:=flush(conn,batch);err!=nil {
167+
s.logger.Warn("failed to flush audit logs","error",err)
168+
iferr.permanent {
169+
// Data error: discard batch to avoid infinite retries.
170+
clearBatch()
171+
}else {
172+
// Network error: close connection but keep batch for a future retry.
173+
closeConn()
174+
}
175+
return
176+
}
177+
178+
clearBatch()
179+
}
180+
181+
connect()
182+
183+
for {
184+
select {
185+
case<-ctx.Done():
186+
// Drain any pending logs before the last flush. Not concerned about
187+
// growing the batch slice here since we're exiting.
188+
drain:
189+
for {
190+
select {
191+
caselog:=<-s.logCh:
192+
batch=append(batch,log)
193+
default:
194+
break drain
195+
}
196+
}
197+
198+
doFlush()
199+
closeConn()
200+
return
201+
case<-t.C:
202+
doFlush()
203+
caselog:=<-s.logCh:
204+
// If batch is at capacity, attempt flushing first and drop the log if
205+
// the batch still full.
206+
iflen(batch)>=s.batchSize {
207+
doFlush()
208+
iflen(batch)>=s.batchSize {
209+
s.logger.Warn("audit log dropped, batch full")
210+
continue
211+
}
212+
}
213+
214+
batch=append(batch,log)
215+
216+
iflen(batch)==1 {
217+
t.Reset(s.batchTimerDuration)
218+
}
219+
220+
iflen(batch)>=s.batchSize {
221+
doFlush()
222+
}
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp