Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: change agent to use v2 API for reporting stats#12024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/10534-report-stats
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 82 additions & 109 deletionsagent/agent.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -89,7 +89,6 @@ type Options struct {

typeClientinterface {
ConnectRPC(ctx context.Context) (drpc.Conn,error)
ReportStats(ctx context.Context,log slog.Logger,statsChan<-chan*agentsdk.Stats,setIntervalfunc(time.Duration)) (io.Closer,error)
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
PostMetadata(ctx context.Context,req agentsdk.PostMetadataRequest)error
PatchLogs(ctx context.Context,req agentsdk.PatchLogs)error
Expand DownExpand Up@@ -158,7 +157,6 @@ func New(options Options) Agent {
lifecycleStates: []agentsdk.PostLifecycleRequest{{State:codersdk.WorkspaceAgentLifecycleCreated}},
ignorePorts:options.IgnorePorts,
portCacheDuration:options.PortCacheDuration,
connStatsChan:make(chan*agentsdk.Stats,1),
reportMetadataInterval:options.ReportMetadataInterval,
serviceBannerRefreshInterval:options.ServiceBannerRefreshInterval,
sshMaxTimeout:options.SSHMaxTimeout,
Expand DownExpand Up@@ -216,8 +214,7 @@ type agent struct {

network*tailnet.Conn
addresses []netip.Prefix
connStatsChanchan*agentsdk.Stats
latestStat atomic.Pointer[agentsdk.Stats]
statsReporter*statsReporter

connCountReconnectingPTY atomic.Int64

Expand DownExpand Up@@ -822,14 +819,13 @@ func (a *agent) run(ctx context.Context) error {
closed:=a.isClosed()
if!closed {
a.network=network
a.statsReporter=newStatsReporter(a.logger,network,a)
}
a.closeMutex.Unlock()
ifclosed {
_=network.Close()
returnxerrors.New("agent is closed")
}

a.startReportingConnectionStats(ctx)
}else {
// Update the wireguard IPs if the agent ID changed.
err:=network.SetAddresses(a.wireguardAddresses(manifest.AgentID))
Expand DownExpand Up@@ -871,6 +867,15 @@ func (a *agent) run(ctx context.Context) error {
returnnil
})

eg.Go(func()error {
a.logger.Debug(egCtx,"running stats report loop")
err:=a.statsReporter.reportLoop(egCtx,aAPI)
iferr!=nil {
returnxerrors.Errorf("report stats loop: %w",err)
}
returnnil
})

returneg.Wait()
}

Expand DownExpand Up@@ -1218,115 +1223,83 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
returnrpty.Attach(ctx,connectionID,conn,msg.Height,msg.Width,connLogger)
}

// startReportingConnectionStats runs the connection stats reporting goroutine.
func (a*agent)startReportingConnectionStats(ctx context.Context) {
reportStats:=func(networkStatsmap[netlogtype.Connection]netlogtype.Counts) {
a.logger.Debug(ctx,"computing stats report")
stats:=&agentsdk.Stats{
ConnectionCount:int64(len(networkStats)),
ConnectionsByProto:map[string]int64{},
}
forconn,counts:=rangenetworkStats {
stats.ConnectionsByProto[conn.Proto.String()]++
stats.RxBytes+=int64(counts.RxBytes)
stats.RxPackets+=int64(counts.RxPackets)
stats.TxBytes+=int64(counts.TxBytes)
stats.TxPackets+=int64(counts.TxPackets)
}

// The count of active sessions.
sshStats:=a.sshServer.ConnStats()
stats.SessionCountSSH=sshStats.Sessions
stats.SessionCountVSCode=sshStats.VSCode
stats.SessionCountJetBrains=sshStats.JetBrains

stats.SessionCountReconnectingPTY=a.connCountReconnectingPTY.Load()

// Compute the median connection latency!
a.logger.Debug(ctx,"starting peer latency measurement for stats")
varwg sync.WaitGroup
varmu sync.Mutex
status:=a.network.Status()
durations:= []float64{}
pingCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
defercancelFunc()
fornodeID,peer:=rangestatus.Peer {
if!peer.Active {
continue
}
addresses,found:=a.network.NodeAddresses(nodeID)
if!found {
continue
}
iflen(addresses)==0 {
continue
}
wg.Add(1)
gofunc() {
deferwg.Done()
duration,_,_,err:=a.network.Ping(pingCtx,addresses[0].Addr())
iferr!=nil {
return
}
mu.Lock()
durations=append(durations,float64(duration.Microseconds()))
mu.Unlock()
}()
// Collect collects additional stats from the agent
func (a*agent)Collect(ctx context.Context,networkStatsmap[netlogtype.Connection]netlogtype.Counts)*proto.Stats {
a.logger.Debug(context.Background(),"computing stats report")
stats:=&proto.Stats{
ConnectionCount:int64(len(networkStats)),
ConnectionsByProto:map[string]int64{},
}
forconn,counts:=rangenetworkStats {
stats.ConnectionsByProto[conn.Proto.String()]++
stats.RxBytes+=int64(counts.RxBytes)
stats.RxPackets+=int64(counts.RxPackets)
stats.TxBytes+=int64(counts.TxBytes)
stats.TxPackets+=int64(counts.TxPackets)
}

// The count of active sessions.
sshStats:=a.sshServer.ConnStats()
stats.SessionCountSsh=sshStats.Sessions
stats.SessionCountVscode=sshStats.VSCode
stats.SessionCountJetbrains=sshStats.JetBrains

stats.SessionCountReconnectingPty=a.connCountReconnectingPTY.Load()

// Compute the median connection latency!
a.logger.Debug(ctx,"starting peer latency measurement for stats")
varwg sync.WaitGroup
varmu sync.Mutex
status:=a.network.Status()
durations:= []float64{}
pingCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
defercancelFunc()
fornodeID,peer:=rangestatus.Peer {
if!peer.Active {
continue
}
wg.Wait()
sort.Float64s(durations)
durationsLength:=len(durations)
ifdurationsLength==0 {
stats.ConnectionMedianLatencyMS=-1
}elseifdurationsLength%2==0 {
stats.ConnectionMedianLatencyMS= (durations[durationsLength/2-1]+durations[durationsLength/2])/2
}else {
stats.ConnectionMedianLatencyMS=durations[durationsLength/2]
addresses,found:=a.network.NodeAddresses(nodeID)
if!found {
continue
}
// Convert from microseconds to milliseconds.
stats.ConnectionMedianLatencyMS/=1000

// Collect agent metrics.
// Agent metrics are changing all the time, so there is no need to perform
// reflect.DeepEqual to see if stats should be transferred.

metricsCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
defercancelFunc()
a.logger.Debug(ctx,"collecting agent metrics for stats")
stats.Metrics=a.collectMetrics(metricsCtx)

a.latestStat.Store(stats)

a.logger.Debug(ctx,"about to send stats")
select {
casea.connStatsChan<-stats:
a.logger.Debug(ctx,"successfully sent stats")
case<-a.closed:
a.logger.Debug(ctx,"didn't send stats because we are closed")
iflen(addresses)==0 {
continue
}
wg.Add(1)
gofunc() {
deferwg.Done()
duration,_,_,err:=a.network.Ping(pingCtx,addresses[0].Addr())
iferr!=nil {
return
}
mu.Lock()
defermu.Unlock()
durations=append(durations,float64(duration.Microseconds()))
}()
}

// Report statistics from the created network.
cl,err:=a.client.ReportStats(ctx,a.logger,a.connStatsChan,func(d time.Duration) {
a.network.SetConnStatsCallback(d,2048,
func(_,_ time.Time,virtual,_map[netlogtype.Connection]netlogtype.Counts) {
reportStats(virtual)
},
)
})
iferr!=nil {
a.logger.Error(ctx,"agent failed to report stats",slog.Error(err))
wg.Wait()
sort.Float64s(durations)
durationsLength:=len(durations)
ifdurationsLength==0 {
stats.ConnectionMedianLatencyMs=-1
}elseifdurationsLength%2==0 {
stats.ConnectionMedianLatencyMs= (durations[durationsLength/2-1]+durations[durationsLength/2])/2
}else {
iferr=a.trackConnGoroutine(func() {
// This is OK because the agent never re-creates the tailnet
// and the only shutdown indicator is agent.Close().
<-a.closed
_=cl.Close()
});err!=nil {
a.logger.Debug(ctx,"report stats goroutine",slog.Error(err))
_=cl.Close()
}
stats.ConnectionMedianLatencyMs=durations[durationsLength/2]
}
// Convert from microseconds to milliseconds.
stats.ConnectionMedianLatencyMs/=1000

// Collect agent metrics.
// Agent metrics are changing all the time, so there is no need to perform
// reflect.DeepEqual to see if stats should be transferred.

metricsCtx,cancelFunc:=context.WithTimeout(ctx,5*time.Second)
defercancelFunc()
a.logger.Debug(ctx,"collecting agent metrics for stats")
stats.Metrics=a.collectMetrics(metricsCtx)

returnstats
}

varprioritizedProcs= []string{"coder agent"}
Expand Down
35 changes: 18 additions & 17 deletionsagent/agent_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -52,6 +52,7 @@ import (
"github.com/coder/coder/v2/agent/agentproc/agentproctest"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/agenttest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/pty/ptytest"
Expand DownExpand Up@@ -85,11 +86,11 @@ func TestAgent_Stats_SSH(t *testing.T) {
err=session.Shell()
require.NoError(t,err)

vars*agentsdk.Stats
vars*proto.Stats
require.Eventuallyf(t,func()bool {
varokbool
s,ok=<-stats
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountSSH==1
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountSsh==1
},testutil.WaitLong,testutil.IntervalFast,
"never saw stats: %+v",s,
)
Expand DownExpand Up@@ -118,11 +119,11 @@ func TestAgent_Stats_ReconnectingPTY(t *testing.T) {
_,err=ptyConn.Write(data)
require.NoError(t,err)

vars*agentsdk.Stats
vars*proto.Stats
require.Eventuallyf(t,func()bool {
varokbool
s,ok=<-stats
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountReconnectingPTY==1
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&s.SessionCountReconnectingPty==1
},testutil.WaitLong,testutil.IntervalFast,
"never saw stats: %+v",s,
)
Expand DownExpand Up@@ -177,14 +178,14 @@ func TestAgent_Stats_Magic(t *testing.T) {
require.Eventuallyf(t,func()bool {
s,ok:=<-stats
t.Logf("got stats: ok=%t, ConnectionCount=%d, RxBytes=%d, TxBytes=%d, SessionCountVSCode=%d, ConnectionMedianLatencyMS=%f",
ok,s.ConnectionCount,s.RxBytes,s.TxBytes,s.SessionCountVSCode,s.ConnectionMedianLatencyMS)
ok,s.ConnectionCount,s.RxBytes,s.TxBytes,s.SessionCountVscode,s.ConnectionMedianLatencyMs)
Comment on lines -180 to +181
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

VSCode vs Vscode
PTY vs Pty

are these renamed on purpose?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

The SDK type fields were written by humans, the proto type fields are machine translated from lowercase_with_underscores so it doesn't get capitalization correct in all cases. Unfortunate from a style perspective, but I haven't chased down how to fix it yet.

mtojek reacted with thumbs up emoji
returnok&&s.ConnectionCount>0&&s.RxBytes>0&&s.TxBytes>0&&
// Ensure that the connection didn't count as a "normal" SSH session.
// This was a special one, so it should be labeled specially in the stats!
s.SessionCountVSCode==1&&
s.SessionCountVscode==1&&
// Ensure that connection latency is being counted!
// If it isn't, it's set to -1.
s.ConnectionMedianLatencyMS>=0
s.ConnectionMedianLatencyMs>=0
},testutil.WaitLong,testutil.IntervalFast,
"never saw stats",
)
Expand DownExpand Up@@ -243,9 +244,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
require.Eventuallyf(t,func()bool {
s,ok:=<-stats
t.Logf("got stats with conn open: ok=%t, ConnectionCount=%d, SessionCountJetBrains=%d",
ok,s.ConnectionCount,s.SessionCountJetBrains)
ok,s.ConnectionCount,s.SessionCountJetbrains)
returnok&&s.ConnectionCount>0&&
s.SessionCountJetBrains==1
s.SessionCountJetbrains==1
},testutil.WaitLong,testutil.IntervalFast,
"never saw stats with conn open",
)
Expand All@@ -258,9 +259,9 @@ func TestAgent_Stats_Magic(t *testing.T) {
require.Eventuallyf(t,func()bool {
s,ok:=<-stats
t.Logf("got stats after disconnect %t, %d",
ok,s.SessionCountJetBrains)
ok,s.SessionCountJetbrains)
returnok&&
s.SessionCountJetBrains==0
s.SessionCountJetbrains==0
},testutil.WaitLong,testutil.IntervalFast,
"never saw stats after conn closes",
)
Expand DownExpand Up@@ -1346,7 +1347,7 @@ func TestAgent_Lifecycle(t *testing.T) {
RunOnStop:true,
}},
},
make(chan*agentsdk.Stats,50),
make(chan*proto.Stats,50),
tailnet.NewCoordinator(logger),
)
deferclient.Close()
Expand DownExpand Up@@ -1667,7 +1668,7 @@ func TestAgent_UpdatedDERP(t *testing.T) {
_=coordinator.Close()
})
agentID:=uuid.New()
statsCh:=make(chan*agentsdk.Stats,50)
statsCh:=make(chan*proto.Stats,50)
fs:=afero.NewMemMapFs()
client:=agenttest.NewClient(t,
logger.Named("agent"),
Expand DownExpand Up@@ -1816,7 +1817,7 @@ func TestAgent_Reconnect(t *testing.T) {
defercoordinator.Close()

agentID:=uuid.New()
statsCh:=make(chan*agentsdk.Stats,50)
statsCh:=make(chan*proto.Stats,50)
derpMap,_:=tailnettest.RunDERPAndSTUN(t)
client:=agenttest.NewClient(t,
logger,
Expand DownExpand Up@@ -1861,7 +1862,7 @@ func TestAgent_WriteVSCodeConfigs(t *testing.T) {
GitAuthConfigs:1,
DERPMap:&tailcfg.DERPMap{},
},
make(chan*agentsdk.Stats,50),
make(chan*proto.Stats,50),
coordinator,
)
deferclient.Close()
Expand DownExpand Up@@ -2018,7 +2019,7 @@ func setupSSHSession(
funcsetupAgent(t*testing.T,metadata agentsdk.Manifest,ptyTimeout time.Duration,opts...func(*agenttest.Client,*agent.Options)) (
*codersdk.WorkspaceAgentConn,
*agenttest.Client,
<-chan*agentsdk.Stats,
<-chan*proto.Stats,
afero.Fs,
agent.Agent,
) {
Expand DownExpand Up@@ -2046,7 +2047,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
t.Cleanup(func() {
_=coordinator.Close()
})
statsCh:=make(chan*agentsdk.Stats,50)
statsCh:=make(chan*proto.Stats,50)
fs:=afero.NewMemMapFs()
c:=agenttest.NewClient(t,logger.Named("agent"),metadata.AgentID,metadata,statsCh,coordinator)
t.Cleanup(c.Close)
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp