Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1cf4b62

Browse files
authored
feat: change agent to use v2 API for reporting stats (#12024)
Modifies the agent to use the v2 API to report its statistics, using the `statsReporter` subcomponent.
1 parent70ad833 commit1cf4b62

File tree

8 files changed

+135
-301
lines changed

8 files changed

+135
-301
lines changed

‎agent/agent.go

Lines changed: 82 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ type Options struct {
8989

9090
typeClientinterface {
9191
ConnectRPC(ctx context.Context) (drpc.Conn,error)
92-
ReportStats(ctx context.Context,log slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error)
9392
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
9493
PostMetadata(ctx context.Context,req agentsdk.PostMetadataRequest)error
9594
PatchLogs(ctx context.Context,req agentsdk.PatchLogs)error
@@ -158,7 +157,6 @@ func New(options Options) Agent {
158157
lifecycleStates: []agentsdk.PostLifecycleRequest{{State:codersdk.WorkspaceAgentLifecycleCreated}},
159158
ignorePorts:options.IgnorePorts,
160159
portCacheDuration:options.PortCacheDuration,
161-
connStatsChan:make(chan*agentsdk.Stats,1),
162160
reportMetadataInterval:options.ReportMetadataInterval,
163161
serviceBannerRefreshInterval:options.ServiceBannerRefreshInterval,
164162
sshMaxTimeout:options.SSHMaxTimeout,
@@ -216,8 +214,7 @@ type agent struct {
216214

217215
network*tailnet.Conn
218216
addresses []netip.Prefix
219-
connStatsChanchan*agentsdk.Stats
220-
latestStat atomic.Pointer[agentsdk.Stats]
217+
statsReporter*statsReporter
221218

222219
connCountReconnectingPTY atomic.Int64
223220

@@ -822,14 +819,13 @@ func (a *agent) run(ctx context.Context) error {
822819
closed:=a.isClosed()
823820
if!closed {
824821
a.network=network
822+
a.statsReporter=newStatsReporter(a.logger,network,a)
825823
}
826824
a.closeMutex.Unlock()
827825
ifclosed {
828826
_=network.Close()
829827
returnxerrors.New("agent is closed")
830828
}
831-
832-
a.startReportingConnectionStats(ctx)
833829
}else {
834830
// Update the wireguard IPs if the agent ID changed.
835831
err:=network.SetAddresses(a.wireguardAddresses(manifest.AgentID))
@@ -871,6 +867,15 @@ func (a *agent) run(ctx context.Context) error {
871867
returnnil
872868
})
873869

870+
eg.Go(func()error {
871+
a.logger.Debug(egCtx,"running stats report loop")
872+
err:=a.statsReporter.reportLoop(egCtx,aAPI)
873+
iferr!=nil {
874+
returnxerrors.Errorf("report stats loop: %w",err)
875+
}
876+
returnnil
877+
})
878+
874879
returneg.Wait()
875880
}
876881

@@ -1218,115 +1223,83 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
12181223
returnrpty.Attach(ctx,connectionID,conn,msg.Height,msg.Width,connLogger)
12191224
}
12201225

1221-
// startReportingConnectionStats runs the connection stats reporting goroutine.
1222-
func (a*agent)startReportingConnectionStats(ctx context.Context) {
1223-
reportStats:=func(networkStatsmap[netlogtype.Connection]netlogtype.Counts) {
1224-
a.logger.Debug(ctx,"computing stats report")
1225-
stats:=&agentsdk.Stats{
1226-
ConnectionCount:int64(len(networkStats)),
1227-
ConnectionsByProto:map[string]int64{},
1228-
}
1229-
forconn,counts:=rangenetworkStats {
1230-
stats.ConnectionsByProto[conn.Proto.String()]++
1231-
stats.RxBytes+=int64(counts.RxBytes)
1232-
stats.RxPackets+=int64(counts.RxPackets)
1233-
stats.TxBytes+=int64(counts.TxBytes)
1234-
stats.TxPackets+=int64(counts.TxPackets)
1235-
}
1236-
1237-
// The count of active sessions.
1238-
sshStats:=a.sshServer.ConnStats()
1239-
stats.SessionCountSSH=sshStats.Sessions
1240-
stats.SessionCountVSCode=sshStats.VSCode
1241-
stats.SessionCountJetBrains=sshStats.JetBrains
1242-
1243-
stats.SessionCountReconnectingPTY=a.connCountReconnectingPTY.Load()
1244-
1245-
// Compute the median connection latency!
1246-
a.logger.Debug(ctx,"starting peer latency measurement for stats")
1247-
varwg sync.WaitGroup
1248-
varmu sync.Mutex
1249-
status:=a.network.Status()
1250-
durations:= []float64{}
1251-
pingCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
1252-
defercancelFunc()
1253-
fornodeID,peer:=rangestatus.Peer {
1254-
if!peer.Active {
1255-
continue
1256-
}
1257-
addresses,found:=a.network.NodeAddresses(nodeID)
1258-
if!found {
1259-
continue
1260-
}
1261-
iflen(addresses)==0 {
1262-
continue
1263-
}
1264-
wg.Add(1)
1265-
gofunc() {
1266-
deferwg.Done()
1267-
duration,_,_,err:=a.network.Ping(pingCtx,addresses[0].Addr())
1268-
iferr!=nil {
1269-
return
1270-
}
1271-
mu.Lock()
1272-
durations=append(durations,float64(duration.Microseconds()))
1273-
mu.Unlock()
1274-
}()
1226+
// Collect collects additional stats from the agent
1227+
func (a*agent)Collect(ctx context.Context,networkStatsmap[netlogtype.Connection]netlogtype.Counts)*proto.Stats {
1228+
a.logger.Debug(context.Background(),"computing stats report")
1229+
stats:=&proto.Stats{
1230+
ConnectionCount:int64(len(networkStats)),
1231+
ConnectionsByProto:map[string]int64{},
1232+
}
1233+
forconn,counts:=rangenetworkStats {
1234+
stats.ConnectionsByProto[conn.Proto.String()]++
1235+
stats.RxBytes+=int64(counts.RxBytes)
1236+
stats.RxPackets+=int64(counts.RxPackets)
1237+
stats.TxBytes+=int64(counts.TxBytes)
1238+
stats.TxPackets+=int64(counts.TxPackets)
1239+
}
1240+
1241+
// The count of active sessions.
1242+
sshStats:=a.sshServer.ConnStats()
1243+
stats.SessionCountSsh=sshStats.Sessions
1244+
stats.SessionCountVscode=sshStats.VSCode
1245+
stats.SessionCountJetbrains=sshStats.JetBrains
1246+
1247+
stats.SessionCountReconnectingPty=a.connCountReconnectingPTY.Load()
1248+
1249+
// Compute the median connection latency!
1250+
a.logger.Debug(ctx,"starting peer latency measurement for stats")
1251+
varwg sync.WaitGroup
1252+
varmu sync.Mutex
1253+
status:=a.network.Status()
1254+
durations:= []float64{}
1255+
pingCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
1256+
defercancelFunc()
1257+
fornodeID,peer:=rangestatus.Peer {
1258+
if!peer.Active {
1259+
continue
12751260
}
1276-
wg.Wait()
1277-
sort.Float64s(durations)
1278-
durationsLength:=len(durations)
1279-
ifdurationsLength==0 {
1280-
stats.ConnectionMedianLatencyMS=-1
1281-
}elseifdurationsLength%2==0 {
1282-
stats.ConnectionMedianLatencyMS= (durations[durationsLength/2-1]+durations[durationsLength/2])/2
1283-
}else {
1284-
stats.ConnectionMedianLatencyMS=durations[durationsLength/2]
1261+
addresses,found:=a.network.NodeAddresses(nodeID)
1262+
if!found {
1263+
continue
12851264
}
1286-
// Convert from microseconds to milliseconds.
1287-
stats.ConnectionMedianLatencyMS/=1000
1288-
1289-
// Collect agent metrics.
1290-
// Agent metrics are changing all the time, so there is no need to perform
1291-
// reflect.DeepEqual to see if stats should be transferred.
1292-
1293-
metricsCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
1294-
defercancelFunc()
1295-
a.logger.Debug(ctx,"collecting agent metrics for stats")
1296-
stats.Metrics=a.collectMetrics(metricsCtx)
1297-
1298-
a.latestStat.Store(stats)
1299-
1300-
a.logger.Debug(ctx,"about to send stats")
1301-
select {
1302-
casea.connStatsChan<-stats:
1303-
a.logger.Debug(ctx,"successfully sent stats")
1304-
case<-a.closed:
1305-
a.logger.Debug(ctx,"didn't send stats because we are closed")
1265+
iflen(addresses)==0 {
1266+
continue
13061267
}
1268+
wg.Add(1)
1269+
gofunc() {
1270+
deferwg.Done()
1271+
duration,_,_,err:=a.network.Ping(pingCtx,addresses[0].Addr())
1272+
iferr!=nil {
1273+
return
1274+
}
1275+
mu.Lock()
1276+
defermu.Unlock()
1277+
durations=append(durations,float64(duration.Microseconds()))
1278+
}()
13071279
}
1308-
1309-
// Report statistics from the created network.
1310-
cl,err:=a.client.ReportStats(ctx,a.logger,a.connStatsChan,func(d time.Duration) {
1311-
a.network.SetConnStatsCallback(d,2048,
1312-
func(_,_ time.Time,virtual,_map[netlogtype.Connection]netlogtype.Counts) {
1313-
reportStats(virtual)
1314-
},
1315-
)
1316-
})
1317-
iferr!=nil {
1318-
a.logger.Error(ctx,"agent failed to report stats",slog.Error(err))
1280+
wg.Wait()
1281+
sort.Float64s(durations)
1282+
durationsLength:=len(durations)
1283+
ifdurationsLength==0 {
1284+
stats.ConnectionMedianLatencyMs=-1
1285+
}elseifdurationsLength%2==0 {
1286+
stats.ConnectionMedianLatencyMs= (durations[durationsLength/2-1]+durations[durationsLength/2])/2
13191287
}else {
1320-
iferr=a.trackConnGoroutine(func() {
1321-
// This is OK because the agent never re-creates the tailnet
1322-
// and the only shutdown indicator is agent.Close().
1323-
<-a.closed
1324-
_=cl.Close()
1325-
});err!=nil {
1326-
a.logger.Debug(ctx,"report stats goroutine",slog.Error(err))
1327-
_=cl.Close()
1328-
}
1288+
stats.ConnectionMedianLatencyMs=durations[durationsLength/2]
13291289
}
1290+
// Convert from microseconds to milliseconds.
1291+
stats.ConnectionMedianLatencyMs/=1000
1292+
1293+
// Collect agent metrics.
1294+
// Agent metrics are changing all the time, so there is no need to perform
1295+
// reflect.DeepEqual to see if stats should be transferred.
1296+
1297+
metricsCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
1298+
defercancelFunc()
1299+
a.logger.Debug(ctx,"collecting agent metrics for stats")
1300+
stats.Metrics=a.collectMetrics(metricsCtx)
1301+
1302+
returnstats
13301303
}
13311304

13321305
varprioritizedProcs= []string{"coder agent"}

‎agent/agent_test.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"github.com/coder/coder/v2/agent/agentproc/agentproctest"
5353
"github.com/coder/coder/v2/agent/agentssh"
5454
"github.com/coder/coder/v2/agent/agenttest"
55+
"github.com/coder/coder/v2/agent/proto"
5556
"github.com/coder/coder/v2/codersdk"
5657
"github.com/coder/coder/v2/codersdk/agentsdk"
5758
"github.com/coder/coder/v2/pty/ptytest"
@@ -85,11 +86,11 @@ func TestAgent_Stats_SSH(t *testing.T) {
8586
err=session.Shell()
8687
require.NoError(t,err)
8788

88-
vars*agentsdk.Stats
89+
vars*proto.Stats
8990
require.Eventuallyf(t,func()bool {
9091
varokbool
9192
s,ok=<-stats
92-
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountSSH==1
93+
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountSsh==1
9394
},testutil.WaitLong,testutil.IntervalFast,
9495
"never saw stats: %+v",s,
9596
)
@@ -118,11 +119,11 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
118119
_,err=ptyConn.Write(data)
119120
require.NoError(t,err)
120121

121-
vars*agentsdk.Stats
122+
vars*proto.Stats
122123
require.Eventuallyf(t,func()bool {
123124
varokbool
124125
s,ok=<-stats
125-
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountReconnectingPTY==1
126+
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountReconnectingPty==1
126127
},testutil.WaitLong,testutil.IntervalFast,
127128
"never saw stats: %+v",s,
128129
)
@@ -177,14 +178,14 @@ func TestAgent_Stats_Magic(t *testing.T) {
177178
require.Eventuallyf(t,func()bool {
178179
s,ok:=<-stats
179180
t.Logf("got stats: ok=%t, ConnectionCount=%d, RxBytes=%d, TxBytes=%d, SessionCountVSCode=%d, ConnectionMedianLatencyMS=%f",
180-
ok,s.ConnectionCount,s.RxBytes,s.TxBytes,s.SessionCountVSCode,s.ConnectionMedianLatencyMS)
181+
ok,s.ConnectionCount,s.RxBytes,s.TxBytes,s.SessionCountVscode,s.ConnectionMedianLatencyMs)
181182
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&
182183
// Ensure that the connection didn't count as a "normal" SSH session.
183184
// This was a special one, so it should be labeled specially in the stats!
184-
s.SessionCountVSCode==1&&
185+
s.SessionCountVscode==1&&
185186
// Ensure that connection latency is being counted!
186187
// If it isn't, it's set to -1.
187-
s.ConnectionMedianLatencyMS>=0
188+
s.ConnectionMedianLatencyMs>=0
188189
},testutil.WaitLong,testutil.IntervalFast,
189190
"never saw stats",
190191
)
@@ -243,9 +244,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
243244
require.Eventuallyf(t,func()bool {
244245
s,ok:=<-stats
245246
t.Logf("got stats with conn open: ok=%t, ConnectionCount=%d, SessionCountJetBrains=%d",
246-
ok,s.ConnectionCount,s.SessionCountJetBrains)
247+
ok,s.ConnectionCount,s.SessionCountJetbrains)
247248
returnok&&s.ConnectionCount>0&&
248-
s.SessionCountJetBrains==1
249+
s.SessionCountJetbrains==1
249250
},testutil.WaitLong,testutil.IntervalFast,
250251
"never saw stats with conn open",
251252
)
@@ -258,9 +259,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
258259
require.Eventuallyf(t,func()bool {
259260
s,ok:=<-stats
260261
t.Logf("got stats after disconnect %t, %d",
261-
ok,s.SessionCountJetBrains)
262+
ok,s.SessionCountJetbrains)
262263
returnok&&
263-
s.SessionCountJetBrains==0
264+
s.SessionCountJetbrains==0
264265
},testutil.WaitLong,testutil.IntervalFast,
265266
"never saw stats after conn closes",
266267
)
@@ -1346,7 +1347,7 @@ func TestAgent_Lifecycle(t *testing.T) {
13461347
RunOnStop:true,
13471348
}},
13481349
},
1349-
make(chan*agentsdk.Stats,50),
1350+
make(chan*proto.Stats,50),
13501351
tailnet.NewCoordinator(logger),
13511352
)
13521353
deferclient.Close()
@@ -1667,7 +1668,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
16671668
_=coordinator.Close()
16681669
})
16691670
agentID:=uuid.New()
1670-
statsCh:=make(chan*agentsdk.Stats,50)
1671+
statsCh:=make(chan*proto.Stats,50)
16711672
fs:=afero.NewMemMapFs()
16721673
client:=agenttest.NewClient(t,
16731674
logger.Named("agent"),
@@ -1816,7 +1817,7 @@ func TestAgent_Reconnect(t *testing.T) {
18161817
defercoordinator.Close()
18171818

18181819
agentID:=uuid.New()
1819-
statsCh:=make(chan*agentsdk.Stats,50)
1820+
statsCh:=make(chan*proto.Stats,50)
18201821
derpMap,_:=tailnettest.RunDERPAndSTUN(t)
18211822
client:=agenttest.NewClient(t,
18221823
logger,
@@ -1861,7 +1862,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
18611862
GitAuthConfigs:1,
18621863
DERPMap:&tailcfg.DERPMap{},
18631864
},
1864-
make(chan*agentsdk.Stats,50),
1865+
make(chan*proto.Stats,50),
18651866
coordinator,
18661867
)
18671868
deferclient.Close()
@@ -2018,7 +2019,7 @@ func setupSSHSession(
20182019
funcsetupAgent(t*testing.T,metadata agentsdk.Manifest,ptyTimeout time.Duration,opts...func(*agenttest.Client,*agent.Options)) (
20192020
*codersdk.WorkspaceAgentConn,
20202021
*agenttest.Client,
2021-
<-chan*agentsdk.Stats,
2022+
<-chan*proto.Stats,
20222023
afero.Fs,
20232024
agent.Agent,
20242025
) {
@@ -2046,7 +2047,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
20462047
t.Cleanup(func() {
20472048
_=coordinator.Close()
20482049
})
2049-
statsCh:=make(chan*agentsdk.Stats,50)
2050+
statsCh:=make(chan*proto.Stats,50)
20502051
fs:=afero.NewMemMapFs()
20512052
c:=agenttest.NewClient(t,logger.Named("agent"),metadata.AgentID,metadata,statsCh,coordinator)
20522053
t.Cleanup(c.Close)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp