Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: add statsReporter for reporting stats on agent v2 API#11920

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/10534-stats-reporter
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletionsagent/stats.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
package agent

import (
"context"
"sync"
"time"

"golang.org/x/xerrors"
"tailscale.com/types/netlogtype"

"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
)

const maxConns = 2048

type networkStatsSource interface {
SetConnStatsCallback(maxPeriod time.Duration, maxConns int, dump func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts))
}

type statsCollector interface {
Collect(ctx context.Context, networkStats map[netlogtype.Connection]netlogtype.Counts) *proto.Stats
}

type statsDest interface {
UpdateStats(ctx context.Context, req *proto.UpdateStatsRequest) (*proto.UpdateStatsResponse, error)
}

// statsReporter is a subcomponent of the agent that handles registering the stats callback on the
// networkStatsSource (tailnet.Conn in prod), handling the callback, calling back to the
// statsCollector (agent in prod) to collect additional stats, then sending the update to the
// statsDest (agent API in prod)
type statsReporter struct {
*sync.Cond
networkStats *map[netlogtype.Connection]netlogtype.Counts
unreported bool
lastInterval time.Duration

source networkStatsSource
collector statsCollector
logger slog.Logger
}

func newStatsReporter(logger slog.Logger, source networkStatsSource, collector statsCollector) *statsReporter {
return &statsReporter{
Cond: sync.NewCond(&sync.Mutex{}),
logger: logger,
source: source,
collector: collector,
}
}

func (s *statsReporter) callback(_, _ time.Time, virtual, _ map[netlogtype.Connection]netlogtype.Counts) {
s.L.Lock()
defer s.L.Unlock()
s.logger.Debug(context.Background(), "got stats callback")
s.networkStats = &virtual
s.unreported = true
s.Broadcast()
}

// reportLoop programs the source (tailnet.Conn) to send it stats via the
// callback, then reports them to the dest.
//
// It's intended to be called within the larger retry loop that establishes a
// connection to the agent API, then passes that connection to go routines like
// this that use it. There is no retry and we fail on the first error since
// this will be inside a larger retry loop.
func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
// send an initial, blank report to get the interval
resp, err := dest.UpdateStats(ctx, &proto.UpdateStatsRequest{})
if err != nil {
return xerrors.Errorf("initial update: %w", err)
}
s.lastInterval = resp.ReportInterval.AsDuration()
s.source.SetConnStatsCallback(s.lastInterval, maxConns, s.callback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Would it make sense to also unset the callback on exit? Mostly a safety precaution in casecallback ever has code that may block without a consumer.


// use a separate goroutine to monitor the context so that we notice immediately, rather than
// waiting for the next callback (which might never come if we are closing!)
ctxDone := false
go func() {
<-ctx.Done()
s.L.Lock()
defer s.L.Unlock()
ctxDone = true
s.Broadcast()
}()
defer s.logger.Debug(ctx, "reportLoop exiting")

s.L.Lock()
defer s.L.Unlock()
for {
for !s.unreported && !ctxDone {
s.Wait()
}
if ctxDone {
return nil
}
networkStats := *s.networkStats
s.unreported = false
if err = s.reportLocked(ctx, dest, networkStats); err != nil {
return xerrors.Errorf("report stats: %w", err)
}
}
}

func (s *statsReporter) reportLocked(
ctx context.Context, dest statsDest, networkStats map[netlogtype.Connection]netlogtype.Counts,
) error {
// here we want to do our collecting/reporting while it is unlocked, but then relock
// when we return to reportLoop.
s.L.Unlock()
defer s.L.Lock()
stats := s.collector.Collect(ctx, networkStats)
resp, err := dest.UpdateStats(ctx, &proto.UpdateStatsRequest{Stats: stats})
if err != nil {
return err
}
interval := resp.GetReportInterval().AsDuration()
if interval != s.lastInterval {
s.logger.Info(ctx, "new stats report interval", slog.F("interval", interval))
s.lastInterval = interval
s.source.SetConnStatsCallback(s.lastInterval, maxConns, s.callback)
}
return nil
}
212 changes: 212 additions & 0 deletionsagent/stats_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
package agent

import (
"context"
"net/netip"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
"tailscale.com/types/ipproto"

"tailscale.com/types/netlogtype"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/testutil"
)

funcTestStatsReporter(t*testing.T) {
t.Parallel()
ctx:=testutil.Context(t,testutil.WaitShort)
logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug)
fSource:=newFakeNetworkStatsSource(ctx,t)
fCollector:=newFakeCollector(t)
fDest:=newFakeStatsDest()
uut:=newStatsReporter(logger,fSource,fCollector)

loopErr:=make(chanerror,1)
loopCtx,loopCancel:=context.WithCancel(ctx)
gofunc() {
err:=uut.reportLoop(loopCtx,fDest)
loopErr<-err
}()

// initial request to get duration
req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
require.NotNil(t,req)
require.Nil(t,req.Stats)
interval:=time.Second*34
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Is there a special meaning to this magic value?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I just wanted something that's not likely to be some internal constant and proves that we actually use the interval in the response.

testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.UpdateStatsResponse{ReportInterval:durationpb.New(interval)})

// call to source to set the callback and interval
gotInterval:=testutil.RequireRecvCtx(ctx,t,fSource.period)
require.Equal(t,interval,gotInterval)

// callback returning netstats
netStats:=map[netlogtype.Connection]netlogtype.Counts{
{
Proto:ipproto.TCP,
Src:netip.MustParseAddrPort("192.168.1.33:4887"),
Dst:netip.MustParseAddrPort("192.168.2.99:9999"),
}: {
TxPackets:22,
TxBytes:23,
RxPackets:24,
RxBytes:25,
},
}
fSource.callback(time.Now(),time.Now(),netStats,nil)

// collector called to complete the stats
gotNetStats:=testutil.RequireRecvCtx(ctx,t,fCollector.calls)
require.Equal(t,netStats,gotNetStats)

// while we are collecting the stats, send in two new netStats to simulate
// what happens if we don't keep up. Only the latest should be kept.
netStats0:=map[netlogtype.Connection]netlogtype.Counts{
{
Proto:ipproto.TCP,
Src:netip.MustParseAddrPort("192.168.1.33:4887"),
Dst:netip.MustParseAddrPort("192.168.2.99:9999"),
}: {
TxPackets:10,
TxBytes:10,
RxPackets:10,
RxBytes:10,
},
}
fSource.callback(time.Now(),time.Now(),netStats0,nil)
netStats1:=map[netlogtype.Connection]netlogtype.Counts{
{
Proto:ipproto.TCP,
Src:netip.MustParseAddrPort("192.168.1.33:4887"),
Dst:netip.MustParseAddrPort("192.168.2.99:9999"),
}: {
TxPackets:11,
TxBytes:11,
RxPackets:11,
RxBytes:11,
},
}
fSource.callback(time.Now(),time.Now(),netStats1,nil)

// complete first collection
stats:=&proto.Stats{SessionCountJetbrains:55}
testutil.RequireSendCtx(ctx,t,fCollector.stats,stats)

// destination called to report the first stats
update:=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
require.NotNil(t,update)
require.Equal(t,stats,update.Stats)
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.UpdateStatsResponse{ReportInterval:durationpb.New(interval)})

// second update -- only netStats1 is reported
gotNetStats=testutil.RequireRecvCtx(ctx,t,fCollector.calls)
require.Equal(t,netStats1,gotNetStats)
stats=&proto.Stats{SessionCountJetbrains:66}
testutil.RequireSendCtx(ctx,t,fCollector.stats,stats)
update=testutil.RequireRecvCtx(ctx,t,fDest.reqs)
require.NotNil(t,update)
require.Equal(t,stats,update.Stats)
interval2:=27*time.Second
testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.UpdateStatsResponse{ReportInterval:durationpb.New(interval2)})

// set the new interval
gotInterval=testutil.RequireRecvCtx(ctx,t,fSource.period)
require.Equal(t,interval2,gotInterval)

loopCancel()
err:=testutil.RequireRecvCtx(ctx,t,loopErr)
require.NoError(t,err)
}

typefakeNetworkStatsSourcestruct {
sync.Mutex
ctx context.Context
t testing.TB
callbackfunc(start,end time.Time,virtual,physicalmap[netlogtype.Connection]netlogtype.Counts)
periodchan time.Duration
}

func (f*fakeNetworkStatsSource)SetConnStatsCallback(maxPeriod time.Duration,_int,dumpfunc(start time.Time,end time.Time,virtualmap[netlogtype.Connection]netlogtype.Counts,physicalmap[netlogtype.Connection]netlogtype.Counts)) {
f.Lock()
deferf.Unlock()
f.callback=dump
select {
case<-f.ctx.Done():
f.t.Error("timeout")
casef.period<-maxPeriod:
// OK
}
}

funcnewFakeNetworkStatsSource(ctx context.Context,t testing.TB)*fakeNetworkStatsSource {
f:=&fakeNetworkStatsSource{
ctx:ctx,
t:t,
period:make(chan time.Duration),
}
returnf
}

typefakeCollectorstruct {
t testing.TB
callschanmap[netlogtype.Connection]netlogtype.Counts
statschan*proto.Stats
}

func (f*fakeCollector)Collect(ctx context.Context,networkStatsmap[netlogtype.Connection]netlogtype.Counts)*proto.Stats {
select {
case<-ctx.Done():
f.t.Error("timeout on collect")
returnnil
casef.calls<-networkStats:
// ok
}
select {
case<-ctx.Done():
f.t.Error("timeout on collect")
returnnil
cases:=<-f.stats:
returns
}
}

funcnewFakeCollector(t testing.TB)*fakeCollector {
return&fakeCollector{
t:t,
calls:make(chanmap[netlogtype.Connection]netlogtype.Counts),
stats:make(chan*proto.Stats),
}
}

typefakeStatsDeststruct {
reqschan*proto.UpdateStatsRequest
respschan*proto.UpdateStatsResponse
}

func (f*fakeStatsDest)UpdateStats(ctx context.Context,req*proto.UpdateStatsRequest) (*proto.UpdateStatsResponse,error) {
select {
case<-ctx.Done():
returnnil,ctx.Err()
casef.reqs<-req:
// OK
}
select {
case<-ctx.Done():
returnnil,ctx.Err()
caseresp:=<-f.resps:
returnresp,nil
}
}

funcnewFakeStatsDest()*fakeStatsDest {
return&fakeStatsDest{
reqs:make(chan*proto.UpdateStatsRequest),
resps:make(chan*proto.UpdateStatsResponse),
}
}

[8]ページ先頭

©2009-2025 Movatter.jp