Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: integrate agentAPI with resources monitoring logic #16438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
defelmnq merged 32 commits into main from agent_res_mon
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit. Hold shift + click to select a range
9d42cad
work on new agent version
defelmnqFeb 4, 2025
7b2d19e
improve function for resources monitoring
defelmnqFeb 10, 2025
0124d60
add missing files
defelmnqFeb 10, 2025
3661e8c
work on resources monitor tests
defelmnqFeb 11, 2025
a5a788e
apply fmt and lint
defelmnqFeb 11, 2025
91d1515
work on dbauthz tests
defelmnqFeb 11, 2025
0bc7632
work on dbauthz
defelmnqFeb 11, 2025
3085041
work on rbac
defelmnqFeb 11, 2025
120a37b
continue to iterate
defelmnqFeb 11, 2025
dd8ed40
continue to iterate
defelmnqFeb 11, 2025
0a8941b
continue to iterate
defelmnqFeb 11, 2025
f3388b4
work on tests
defelmnqFeb 11, 2025
523f6fd
improve testing
defelmnqFeb 11, 2025
06adbf7
improve error messages
defelmnqFeb 11, 2025
c7b03d0
rework architecture of resources monitor
defelmnqFeb 12, 2025
2c3d171
improve resourcesmonitor struct
defelmnqFeb 12, 2025
18b65e0
improve resourcesmonitor struct
defelmnqFeb 12, 2025
c95b05a
change proto payload for get resources monitoring config
defelmnqFeb 12, 2025
c79b6cb
change proto payload for get resources monitoring config
defelmnqFeb 12, 2025
b28d4fa
rework fetcher and tests
defelmnqFeb 13, 2025
7701624
fix tests
defelmnqFeb 13, 2025
5fad903
fix tests
defelmnqFeb 13, 2025
b611ae5
fix tests
defelmnqFeb 13, 2025
3c65b8a
fix logic
defelmnqFeb 13, 2025
63c5869
improve testing fetcher and rename struct
defelmnqFeb 13, 2025
2d3eeb5
lint
defelmnqFeb 13, 2025
e17aafc
work on dbauthz
defelmnqFeb 13, 2025
c5a4201
improve dbauthz for fetching
defelmnqFeb 13, 2025
262a672
change dbauthz permissions
defelmnqFeb 13, 2025
dbca96e
finalise tests
defelmnqFeb 13, 2025
3145eab
fix comments from github
defelmnqFeb 13, 2025
3bec324
add collectedAt
defelmnqFeb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 45 additions & 23 deletions agent/agent.go
View file
Open in desktop
Original file line number | Diff line number | Diff line change
Expand Up@@ -37,15 +37,18 @@ import (
"github.com/coder/coder/v2/agent/agentscripts"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
"github.com/coder/coder/v2/agent/reconnectingpty"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/clistat"
"github.com/coder/coder/v2/cli/gitauth"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/quartz"
"github.com/coder/retry"
)

Expand DownExpand Up@@ -85,8 +88,8 @@ type Options struct {
}

type Client interface {
ConnectRPC23(ctx context.Context) (
proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error,
ConnectRPC24(ctx context.Context) (
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient24, error,
)
RewriteDERPMap(derpMap *tailcfg.DERPMap)
}
Expand DownExpand Up@@ -399,7 +402,7 @@ func (t *trySingleflight) Do(key string, fn func()) {
fn()
}

func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
tickerDone := make(chan struct{})
collectDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
Expand DownExpand Up@@ -615,7 +618,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23

// reportLifecycle reports the current lifecycle state once. All state
// changes are reported in order.
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
for {
select {
case <-a.lifecycleUpdate:
Expand DownExpand Up@@ -697,7 +700,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
// fetchServiceBannerLoop fetches the service banner on an interval. It will
// not be fetched immediately; the expectation is that it is primed elsewhere
// (and must be done before the session actually starts).
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
ticker := time.NewTicker(a.announcementBannersRefreshInterval)
defer ticker.Stop()
for {
Expand DownExpand Up@@ -733,7 +736,7 @@ func (a *agent) run() (retErr error) {
a.sessionToken.Store(&sessionToken)

// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx)
aAPI, tAPI, err := a.client.ConnectRPC24(a.hardCtx)
if err != nil {
return err
}
Expand All@@ -750,7 +753,7 @@ func (a *agent) run() (retErr error) {
connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI)

connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{})
if err != nil {
return xerrors.Errorf("fetch service banner: %w", err)
Expand All@@ -767,7 +770,7 @@ func (a *agent) run() (retErr error) {
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
// shutdown scripts.
connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
err := a.logSender.SendLoop(ctx, aAPI)
if xerrors.Is(err, agentsdk.LogLimitExceededError) {
// we don't want this error to tear down the API connection and propagate to the
Expand All@@ -785,6 +788,25 @@ func (a *agent) run() (retErr error) {
// metadata reporting can cease as soon as we start gracefully shutting down
connMan.startAgentAPI("report metadata", gracefulShutdownBehaviorStop, a.reportMetadata)

// resources monitor can cease as soon as we start gracefully shutting down.
//
// The routine first asks the Agent API for the resources monitoring
// configuration, then builds a stat-backed fetcher and hands both to the
// resources monitor, whose Start result becomes the routine's result.
connMan.startAgentAPI("resources monitor", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
	logger := a.logger.Named("resources_monitor")
	clk := quartz.NewReal()
	config, err := aAPI.GetResourcesMonitoringConfiguration(ctx, &proto.GetResourcesMonitoringConfigurationRequest{})
	if err != nil {
		return xerrors.Errorf("failed to get resources monitoring configuration: %w", err)
	}

	statfetcher, err := clistat.New()
	if err != nil {
		return xerrors.Errorf("failed to create resources fetcher: %w", err)
	}
	resourcesFetcher := resourcesmonitor.NewFetcher(statfetcher)

	// Named monitor (not resourcesmonitor) so the local variable does not
	// shadow the imported resourcesmonitor package inside this closure.
	monitor := resourcesmonitor.NewResourcesMonitor(logger, clk, config, resourcesFetcher, aAPI)
	return monitor.Start(ctx)
})

// channels to sync goroutines below
// handle manifest
// |
Expand All@@ -807,7 +829,7 @@ func (a *agent) run() (retErr error) {
connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK))

connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop,
func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err)
}
Expand All@@ -822,7 +844,7 @@ func (a *agent) run() (retErr error) {
a.createOrUpdateNetwork(manifestOK, networkOK))

connMan.startTailnetAPI("coordination", gracefulShutdownBehaviorStop,
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error {
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient24) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
Expand All@@ -831,7 +853,7 @@ func (a *agent) run() (retErr error) {
)

connMan.startTailnetAPI("derp map subscriber", gracefulShutdownBehaviorStop,
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient23) error {
func(ctx context.Context, tAPI tailnetproto.DRPCTailnetClient24) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
Expand All@@ -840,7 +862,7 @@ func (a *agent) run() (retErr error) {

connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop)

connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
if err := networkOK.wait(ctx); err != nil {
return xerrors.Errorf("no network: %w", err)
}
Expand All@@ -851,8 +873,8 @@ func (a *agent) run() (retErr error) {
}

// handleManifest returns a function that fetches and processes the manifest
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient23) error {
func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
return func(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
var (
sentResult = false
err error
Expand DownExpand Up@@ -961,8 +983,8 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,

// createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates
// the tailnet using the information in the manifest
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23) error {
return func(ctx context.Context, _ proto.DRPCAgentClient23) (retErr error) {
func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient24) error {
return func(ctx context.Context, _ proto.DRPCAgentClient24) (retErr error) {
if err := manifestOK.wait(ctx); err != nil {
return xerrors.Errorf("no manifest: %w", err)
}
Expand DownExpand Up@@ -1266,7 +1288,7 @@ func (a *agent) createTailnet(ctx context.Context, agentID uuid.UUID, derpMap *t

// runCoordinator runs a coordinator and returns whether a reconnect
// should occur.
func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error {
func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTailnetClient24, network *tailnet.Conn) error {
defer a.logger.Debug(ctx, "disconnected from coordination RPC")
// we run the RPC on the hardCtx so that we have a chance to send the disconnect message if we
// gracefully shut down.
Expand DownExpand Up@@ -1313,7 +1335,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
}

// runDERPMapSubscriber runs a coordinator and returns if a reconnect should occur.
func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient23, network *tailnet.Conn) error {
func (a *agent) runDERPMapSubscriber(ctx context.Context, tClient tailnetproto.DRPCTailnetClient24, network *tailnet.Conn) error {
defer a.logger.Debug(ctx, "disconnected from derp map RPC")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand DownExpand Up@@ -1683,16 +1705,16 @@ const (

type apiConnRoutineManager struct {
logger slog.Logger
aAPI proto.DRPCAgentClient23
tAPI tailnetproto.DRPCTailnetClient23
aAPI proto.DRPCAgentClient24
tAPI tailnetproto.DRPCTailnetClient24
eg *errgroup.Group
stopCtx context.Context
remainCtx context.Context
}

func newAPIConnRoutineManager(
gracefulCtx, hardCtx context.Context, logger slog.Logger,
aAPI proto.DRPCAgentClient23, tAPI tailnetproto.DRPCTailnetClient23,
aAPI proto.DRPCAgentClient24, tAPI tailnetproto.DRPCTailnetClient24,
) *apiConnRoutineManager {
// routines that remain in operation during graceful shutdown use the remainCtx. They'll still
// exit if the errgroup hits an error, which usually means a problem with the conn.
Expand DownExpand Up@@ -1725,7 +1747,7 @@ func newAPIConnRoutineManager(
// but for Tailnet.
func (a *apiConnRoutineManager) startAgentAPI(
name string, behavior gracefulShutdownBehavior,
f func(context.Context, proto.DRPCAgentClient23) error,
f func(context.Context, proto.DRPCAgentClient24) error,
) {
logger := a.logger.With(slog.F("name", name))
var ctx context.Context
Expand DownExpand Up@@ -1762,7 +1784,7 @@ func (a *apiConnRoutineManager) startAgentAPI(
// but for the Agent API.
func (a *apiConnRoutineManager) startTailnetAPI(
name string, behavior gracefulShutdownBehavior,
f func(context.Context, tailnetproto.DRPCTailnetClient23) error,
f func(context.Context, tailnetproto.DRPCTailnetClient24) error,
) {
logger := a.logger.With(slog.F("name", name))
var ctx context.Context
Expand Down
35 changes: 32 additions & 3 deletions agent/agenttest/client.go
View file
Open in desktop
Original file line number | Diff line number | Diff line change
Expand Up@@ -96,8 +96,8 @@ func (c *Client) Close() {
c.derpMapOnce.Do(func() { close(c.derpMapUpdates) })
}

func (c *Client)ConnectRPC23(ctx context.Context) (
agentproto.DRPCAgentClient23, proto.DRPCTailnetClient23, error,
func (c *Client)ConnectRPC24(ctx context.Context) (
agentproto.DRPCAgentClient24, proto.DRPCTailnetClient24, error,
) {
conn, lis := drpcsdk.MemTransportPipe()
c.LastWorkspaceAgent = func() {
Expand DownExpand Up@@ -171,7 +171,9 @@ type FakeAgentAPI struct {
metadata map[string]agentsdk.Metadata
timings []*agentproto.Timing

getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
getAnnouncementBannersFunc func() ([]codersdk.BannerConfig, error)
getResourcesMonitoringConfigurationFunc func() (*agentproto.GetResourcesMonitoringConfigurationResponse, error)
pushResourcesMonitoringUsageFunc func(*agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error)
}

func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
Expand DownExpand Up@@ -212,6 +214,33 @@ func (f *FakeAgentAPI) GetAnnouncementBanners(context.Context, *agentproto.GetAn
return &agentproto.GetAnnouncementBannersResponse{AnnouncementBanners: bannersProto}, nil
}

// GetResourcesMonitoringConfiguration serves the resources monitoring
// configuration from the fake. If getResourcesMonitoringConfigurationFunc is
// set, its result is returned unchanged; otherwise a fixed default is returned
// (collection interval of 10 seconds, 20 datapoints). The fake's lock is held
// for the whole call, including while the override function runs.
func (f *FakeAgentAPI) GetResourcesMonitoringConfiguration(_ context.Context, _ *agentproto.GetResourcesMonitoringConfigurationRequest) (*agentproto.GetResourcesMonitoringConfigurationResponse, error) {
	f.Lock()
	defer f.Unlock()

	// An override, when present, fully replaces the default response.
	if override := f.getResourcesMonitoringConfigurationFunc; override != nil {
		return override()
	}

	defaultConfig := &agentproto.GetResourcesMonitoringConfigurationResponse_Config{
		CollectionIntervalSeconds: 10,
		NumDatapoints:             20,
	}
	return &agentproto.GetResourcesMonitoringConfigurationResponse{Config: defaultConfig}, nil
}

// PushResourcesMonitoringUsage accepts a usage report from the agent. If
// pushResourcesMonitoringUsageFunc is set, the request is forwarded to it and
// its result is returned unchanged; otherwise an empty response and a nil
// error are returned. The fake's lock is held for the whole call, including
// while the override function runs.
func (f *FakeAgentAPI) PushResourcesMonitoringUsage(_ context.Context, req *agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error) {
	f.Lock()
	defer f.Unlock()

	// An override, when present, decides the response for this request.
	if override := f.pushResourcesMonitoringUsageFunc; override != nil {
		return override(req)
	}

	emptyResp := &agentproto.PushResourcesMonitoringUsageResponse{}
	return emptyResp, nil
}

func (f *FakeAgentAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsRequest) (*agentproto.UpdateStatsResponse, error) {
f.logger.Debug(ctx, "update stats called", slog.F("req", req))
// empty request is sent to get the interval; but our tests don't want empty stats requests
Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp