Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc4b57b9

Browse files
committed
derpmap telemetry clean
1 parent4a2e18c commitc4b57b9

File tree

4 files changed

+148
-41
lines changed

4 files changed

+148
-41
lines changed

‎tailnet/configmaps.go‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,15 +283,17 @@ func (c *configMaps) getBlockEndpoints() bool {
283283

284284
// setDERPMap sets the DERP map, triggering a configuration of the engine if it has changed.
285285
// c.L MUST NOT be held.
286-
func (c*configMaps)setDERPMap(derpMap*tailcfg.DERPMap) {
286+
// Returns if the derpMap is dirty.
287+
func (c*configMaps)setDERPMap(derpMap*tailcfg.DERPMap)bool {
287288
c.L.Lock()
288289
deferc.L.Unlock()
289290
ifCompareDERPMaps(c.derpMap,derpMap) {
290-
return
291+
returnfalse
291292
}
292293
c.derpMap=derpMap
293294
c.derpMapDirty=true
294295
c.Broadcast()
296+
returntrue
295297
}
296298

297299
// derMapLocked returns the current DERPMap. c.L must be held

‎tailnet/conn.go‎

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,11 @@ func NewConn(options *Options) (conn *Conn, err error) {
138138

139139
var (
140140
logger=newMultiLogger(options.Logger)
141-
telemetryLogSink*bufferLogSink
141+
telemetryLogSink*TelemetryStore
142142
)
143143
ifoptions.TelemetrySink!=nil {
144144
varerrerror
145-
telemetryLogSink,err=newBufferLogSink()
145+
telemetryLogSink,err=newTelemetryStore()
146146
iferr!=nil {
147147
returnnil,xerrors.Errorf("create telemetry log sink: %w",err)
148148
}
@@ -352,7 +352,7 @@ type Conn struct {
352352

353353
telemetrySinkTelemetrySink
354354
// telemetryLogs will be nil if telemetrySink is nil.
355-
telemetryLogs*bufferLogSink
355+
telemetryLogs*TelemetryStore
356356
telemetryWg sync.WaitGroup
357357

358358
trafficStats*connstats.Statistics
@@ -388,7 +388,9 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
388388

389389
// SetDERPMap updates the DERPMap of a connection.
390390
func (c*Conn)SetDERPMap(derpMap*tailcfg.DERPMap) {
391-
c.configMaps.setDERPMap(derpMap)
391+
ifc.configMaps.setDERPMap(derpMap)&&c.telemetryLogs!=nil {
392+
c.telemetryLogs.updateDerpMap(derpMap)
393+
}
392394
}
393395

394396
func (c*Conn)SetDERPForceWebSockets(vbool) {
@@ -512,6 +514,8 @@ func (c *Conn) AwaitReachable(ctx context.Context, ip netip.Addr) bool {
512514
for {
513515
select {
514516
case<-completedCtx.Done():
517+
// TODO(ethanndickson): For now, I'm interpreting 'connected' as when the
518+
// agent is reachable.
515519
_=c.connectedTelemetryEvent()
516520
returntrue
517521
case<-t.C:
@@ -719,6 +723,7 @@ func (c *Conn) connectedTelemetryEvent() error {
719723
returnnil
720724
}
721725

726+
// The returned telemetry event will not have it's status set.
722727
func (c*Conn)newTelemetryEvent() (*proto.TelemetryEvent,error) {
723728
id,err:=c.id.MarshalBinary()
724729
iferr!=nil {
@@ -728,22 +733,22 @@ func (c *Conn) newTelemetryEvent() (*proto.TelemetryEvent, error) {
728733
node:=c.nodeUpdater.nodeLocked()
729734
c.nodeUpdater.L.Unlock()
730735

731-
logs,ips:=c.telemetryLogs.getLogs()
736+
logs,ips,dm:=c.telemetryLogs.getStore()
732737
return&proto.TelemetryEvent{
733738
Id:id,
734739
Time:timestamppb.Now(),
735740
ClientType:c.clientType,
736741
NodeIdSelf:uint64(node.ID),
737742
Logs:logs,
738743
LogIpHashes:ips,
744+
DerpMap:DERPMapToProto(dm),
739745

740746
// TODO:
741747
Application:"",
742748
NodeIdRemote:0,
743749
P2PEndpoint:&proto.TelemetryEvent_P2PEndpoint{},
744750
ThroughputMbits:&wrapperspb.FloatValue{},
745751
HomeDerp:"",
746-
DerpMap:&proto.DERPMap{},
747752
LatestNetcheck:&proto.Netcheck{},
748753
ConnectionAge:&durationpb.Duration{},
749754
ConnectionSetup:&durationpb.Duration{},

‎tailnet/logger_internal_test.go‎

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,27 @@ import (
66
"testing"
77

88
"github.com/stretchr/testify/require"
9+
"tailscale.com/tailcfg"
910

1011
"cdr.dev/slog"
1112
"github.com/coder/coder/v2/tailnet/proto"
1213
)
1314

14-
funcTestBufferLogSink(t*testing.T) {
15+
funcTestTelemetryStore(t*testing.T) {
1516
t.Parallel()
1617

1718
t.Run("NoIP",func(t*testing.T) {
1819
t.Parallel()
1920
ctx:=context.Background()
20-
sink,err:=newBufferLogSink()
21+
sink,err:=newTelemetryStore()
2122
require.NoError(t,err)
2223
logger:=slog.Make(sink).Leveled(slog.LevelDebug)
2324

2425
logger.Debug(ctx,"line1")
2526
logger.Debug(ctx,"line2 fe80")
2627
logger.Debug(ctx,"line3 xxxx::x")
2728

28-
logs,hashes:=sink.getLogs()
29+
logs,hashes,_:=sink.getStore()
2930
require.Len(t,logs,3)
3031
require.Len(t,hashes,0)
3132
require.Contains(t,logs[0],"line1")
@@ -103,7 +104,7 @@ func TestBufferLogSink(t *testing.T) {
103104
t.Run(c.name,func(t*testing.T) {
104105
t.Parallel()
105106
ctx:=context.Background()
106-
sink,err:=newBufferLogSink()
107+
sink,err:=newTelemetryStore()
107108
require.NoError(t,err)
108109
logger:=slog.Make(sink).Leveled(slog.LevelDebug)
109110

@@ -116,15 +117,15 @@ func TestBufferLogSink(t *testing.T) {
116117
logger.Debug(ctx,fmt.Sprintf("line2: %s/24",c.ip))
117118
logger.Debug(ctx,fmt.Sprintf("line3: %s foo (%s)",ipWithPort,c.ip))
118119

119-
logs,hashes:=sink.getLogs()
120+
logs,ips,_:=sink.getStore()
120121
require.Len(t,logs,3)
121-
require.Len(t,hashes,1)
122+
require.Len(t,ips,1)
122123
for_,log:=rangelogs {
123124
t.Log(log)
124125
}
125126

126127
// This only runs once since we only processed a single IP.
127-
forexpectedHash,ipFields:=rangehashes {
128+
forexpectedHash,ipFields:=rangeips {
128129
hashedIPWithPort:=expectedHash+":8080"
129130
ifc.expectedVersion==6 {
130131
hashedIPWithPort=fmt.Sprintf("[%s]:8080",expectedHash)
@@ -141,4 +142,55 @@ func TestBufferLogSink(t *testing.T) {
141142
})
142143
}
143144
})
145+
146+
t.Run("DerpMapClean",func(t*testing.T) {
147+
t.Parallel()
148+
ctx:=context.Background()
149+
telemetry,err:=newTelemetryStore()
150+
require.NoError(t,err)
151+
logger:=slog.Make(telemetry).Leveled(slog.LevelDebug)
152+
153+
derpMap:=&tailcfg.DERPMap{
154+
Regions:make(map[int]*tailcfg.DERPRegion),
155+
}
156+
// Add a region and node that uses every single field.
157+
derpMap.Regions[999]=&tailcfg.DERPRegion{
158+
RegionID:999,
159+
EmbeddedRelay:true,
160+
RegionCode:"zzz",
161+
RegionName:"Cool Region",
162+
Avoid:true,
163+
164+
Nodes: []*tailcfg.DERPNode{
165+
{
166+
Name:"zzz1",
167+
RegionID:999,
168+
HostName:"coolderp.com",
169+
CertName:"coolderpcert",
170+
IPv4:"1.2.3.4",
171+
IPv6:"2001:db8::1",
172+
STUNTestIP:"5.6.7.8",
173+
},
174+
},
175+
}
176+
telemetry.updateDerpMap(derpMap)
177+
178+
logger.Debug(ctx,"line1 coolderp.com qwerty")
179+
logger.Debug(ctx,"line2 1.2.3.4 asdf")
180+
logger.Debug(ctx,"line3 2001:db8::1 foo")
181+
182+
logs,ips,dm:=telemetry.getStore()
183+
require.Len(t,logs,3)
184+
require.Len(t,ips,3)
185+
require.Len(t,dm.Regions[999].Nodes,1)
186+
node:=dm.Regions[999].Nodes[0]
187+
require.NotContains(t,node.HostName,"coolderp.com")
188+
require.NotContains(t,node.IPv4,"1.2.3.4")
189+
require.NotContains(t,node.IPv6,"2001:db8::1")
190+
require.NotContains(t,node.STUNTestIP,"5.6.7.8")
191+
require.Contains(t,logs[0],node.HostName)
192+
require.Contains(t,ips,node.STUNTestIP)
193+
require.Contains(t,ips,node.IPv6)
194+
require.Contains(t,ips,node.IPv4)
195+
})
144196
}

‎tailnet/logger.go‎renamed to ‎tailnet/telemetry.go‎

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync"
1212

1313
"golang.org/x/xerrors"
14+
"tailscale.com/tailcfg"
1415

1516
"cdr.dev/slog"
1617
"cdr.dev/slog/sloggers/sloghuman"
@@ -87,45 +88,81 @@ func (m multiLogger) With(fields ...slog.Field) multiLogger {
8788

8889
// A logger sink that extracts (anonymized) IP addresses from logs for building
8990
// network telemetry events
90-
typebufferLogSinkstruct {
91+
typeTelemetryStorestruct {
92+
// Always self-referential
9193
sink slog.Sink
9294
mu sync.Mutex
9395
// TODO: Store only useful logs
94-
logs []string
95-
// We use the same salt so the same IP hashes to the same value.
96+
logs []string
9697
hashSaltstring
97-
// A cache to avoid hashing the same IP multiple times.
98-
ipToHashmap[string]string
98+
// A cache to avoid hashing the same IPor hostnamemultiple times.
99+
hashCachemap[string]string
99100
hashedIPsmap[string]*proto.IPFields
101+
102+
cleanDerpMap*tailcfg.DERPMap
103+
derpMapFilter*regexp.Regexp
100104
}
101105

102-
var_ slog.Sink=&bufferLogSink{}
106+
var_ slog.Sink=&TelemetryStore{}
103107

104-
var_ io.Writer=&bufferLogSink{}
108+
var_ io.Writer=&TelemetryStore{}
105109

106-
funcnewBufferLogSink() (*bufferLogSink,error) {
110+
funcnewTelemetryStore() (*TelemetryStore,error) {
107111
hashSalt,err:=cryptorand.String(16)
108112
iferr!=nil {
109113
returnnil,err
110114
}
111-
out:=&bufferLogSink{
112-
logs: []string{},
113-
hashSalt:hashSalt,
114-
ipToHash:make(map[string]string),
115-
hashedIPs:make(map[string]*proto.IPFields),
115+
out:=&TelemetryStore{
116+
logs: []string{},
117+
hashSalt:hashSalt,
118+
hashCache:make(map[string]string),
119+
hashedIPs:make(map[string]*proto.IPFields),
120+
derpMapFilter:regexp.MustCompile(`^$`),
116121
}
117122
out.sink=sloghuman.Sink(out)
118123
returnout,nil
119124
}
120125

121-
func (b*bufferLogSink)getLogs() ([]string,map[string]*proto.IPFields) {
126+
func (b*TelemetryStore)getStore() ([]string,map[string]*proto.IPFields,*tailcfg.DERPMap) {
122127
b.mu.Lock()
123128
deferb.mu.Unlock()
124-
returnappend([]string{},b.logs...),b.hashedIPs
129+
returnappend([]string{},b.logs...),b.hashedIPs,b.cleanDerpMap.Clone()
130+
}
131+
132+
// Given a DERPMap, anonymise all IPs and hostnames.
133+
// Keep track of seen hostnames/cert names to anonymize them from future logs.
134+
// b.mu must NOT be held.
135+
func (b*TelemetryStore)updateDerpMap(cur*tailcfg.DERPMap) {
136+
b.mu.Lock()
137+
deferb.mu.Unlock()
138+
varnames []string
139+
cleanMap:=cur.Clone()
140+
for_,r:=rangecleanMap.Regions {
141+
for_,n:=ranger.Nodes {
142+
escapedName:=regexp.QuoteMeta(n.HostName)
143+
escapedCertName:=regexp.QuoteMeta(n.CertName)
144+
names=append(names,escapedName,escapedCertName)
145+
146+
ipv4,_:=b.processIPLocked(n.IPv4)
147+
n.IPv4=ipv4
148+
ipv6,_:=b.processIPLocked(n.IPv6)
149+
n.IPv6=ipv6
150+
stunIP,_:=b.processIPLocked(n.STUNTestIP)
151+
n.STUNTestIP=stunIP
152+
hn:=b.hashAddr(n.HostName)
153+
n.HostName=hn
154+
cn:=b.hashAddr(n.CertName)
155+
n.CertName=cn
156+
}
157+
}
158+
iflen(names)!=0 {
159+
b.derpMapFilter=regexp.MustCompile((strings.Join(names,"|")))
160+
}
161+
b.cleanDerpMap=cleanMap
125162
}
126163

127164
// Write implements io.Writer.
128-
func (b*bufferLogSink)Write(p []byte) (nint,errerror) {
165+
func (b*TelemetryStore)Write(p []byte) (nint,errerror) {
129166
b.mu.Lock()
130167
deferb.mu.Unlock()
131168

@@ -138,37 +175,39 @@ func (b *bufferLogSink) Write(p []byte) (n int, err error) {
138175
iflen(logLineAfterLevel)==2 {
139176
logLineAfterLevel=logLineSplit[1]
140177
}
178+
// Anonymize IP addresses
141179
for_,match:=rangeipv4And6Regex.FindAllString(logLineAfterLevel,-1) {
142180
hash,err:=b.processIPLocked(match)
143181
iferr==nil {
144182
logLine=strings.ReplaceAll(logLine,match,hash)
145183
}
146184
}
185+
// Anonymize derp map host names
186+
for_,match:=rangeb.derpMapFilter.FindAllString(logLineAfterLevel,-1) {
187+
hash:=b.hashAddr(match)
188+
logLine=strings.ReplaceAll(logLine,match,hash)
189+
}
147190

148191
b.logs=append(b.logs,logLine)
149192
returnlen(p),nil
150193
}
151194

152195
// LogEntry implements slog.Sink.
153-
func (b*bufferLogSink)LogEntry(ctx context.Context,e slog.SinkEntry) {
196+
func (b*TelemetryStore)LogEntry(ctx context.Context,e slog.SinkEntry) {
154197
// This will call (*bufferLogSink).Write
155198
b.sink.LogEntry(ctx,e)
156199
}
157200

158201
// Sync implements slog.Sink.
159-
func (b*bufferLogSink)Sync() {
202+
func (b*TelemetryStore)Sync() {
160203
b.sink.Sync()
161204
}
162205

163206
// processIPLocked will look up the IP in the cache, or hash and salt it and add
164207
// to the cache. It will also add it to hashedIPs.
165208
//
166209
// b.mu must be held.
167-
func (b*bufferLogSink)processIPLocked(ipstring) (string,error) {
168-
ifhashStr,ok:=b.ipToHash[ip];ok {
169-
returnhashStr,nil
170-
}
171-
210+
func (b*TelemetryStore)processIPLocked(ipstring) (string,error) {
172211
addr,err:=netip.ParseAddr(ip)
173212
iferr!=nil {
174213
return"",xerrors.Errorf("failed to parse IP %q: %w",ip,err)
@@ -190,12 +229,21 @@ func (b *bufferLogSink) processIPLocked(ip string) (string, error) {
190229
class=proto.IPFields_PRIVATE
191230
}
192231

193-
hash:=sha256.Sum256([]byte(b.hashSalt+ip))
194-
hashStr:=hex.EncodeToString(hash[:])
195-
b.ipToHash[ip]=hashStr
232+
hashStr:=b.hashAddr(ip)
196233
b.hashedIPs[hashStr]=&proto.IPFields{
197234
Version:version,
198235
Class:class,
199236
}
200237
returnhashStr,nil
201238
}
239+
240+
func (b*TelemetryStore)hashAddr(addrstring)string {
241+
ifhashStr,ok:=b.hashCache[addr];ok {
242+
returnhashStr
243+
}
244+
245+
hash:=sha256.Sum256([]byte(b.hashSalt+addr))
246+
hashStr:=hex.EncodeToString(hash[:])
247+
b.hashCache[addr]=hashStr
248+
returnhashStr
249+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp