Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6c94dd4

Browse files
authored
chore: add DRPC server implementation for network telemetry (#13675)
1 parent2fde054 commit6c94dd4

File tree

14 files changed

+1190
-555
lines changed

14 files changed

+1190
-555
lines changed

‎coderd/agentapi/api.go‎

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/coder/coder/v2/coderd/database/pubsub"
2323
"github.com/coder/coder/v2/coderd/externalauth"
2424
"github.com/coder/coder/v2/coderd/prometheusmetrics"
25-
"github.com/coder/coder/v2/coderd/schedule"
2625
"github.com/coder/coder/v2/coderd/tracing"
2726
"github.com/coder/coder/v2/coderd/workspacestats"
2827
"github.com/coder/coder/v2/codersdk"
@@ -60,11 +59,11 @@ type Options struct {
6059
Pubsub pubsub.Pubsub
6160
DerpMapFnfunc()*tailcfg.DERPMap
6261
TailnetCoordinator*atomic.Pointer[tailnet.Coordinator]
63-
TemplateScheduleStore*atomic.Pointer[schedule.TemplateScheduleStore]
6462
StatsReporter*workspacestats.Reporter
6563
AppearanceFetcher*atomic.Pointer[appearance.Fetcher]
6664
PublishWorkspaceUpdateFnfunc(ctx context.Context,workspaceID uuid.UUID)
6765
PublishWorkspaceAgentLogsUpdateFnfunc(ctx context.Context,workspaceAgentID uuid.UUID,msg agentsdk.LogsNotifyMessage)
66+
NetworkTelemetryHandlerfunc(batch []*tailnetproto.TelemetryEvent)
6867

6968
AccessURL*url.URL
7069
AppHostnamestring
@@ -154,10 +153,11 @@ func New(opts Options) *API {
154153
}
155154

156155
api.DRPCService=&tailnet.DRPCService{
157-
CoordPtr:opts.TailnetCoordinator,
158-
Logger:opts.Log,
159-
DerpMapUpdateFrequency:opts.DerpMapUpdateFrequency,
160-
DerpMapFn:opts.DerpMapFn,
156+
CoordPtr:opts.TailnetCoordinator,
157+
Logger:opts.Log,
158+
DerpMapUpdateFrequency:opts.DerpMapUpdateFrequency,
159+
DerpMapFn:opts.DerpMapFn,
160+
NetworkTelemetryHandler:opts.NetworkTelemetryHandler,
161161
}
162162

163163
returnapi

‎coderd/coderd.go‎

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"cdr.dev/slog"
4040
agentproto"github.com/coder/coder/v2/agent/proto"
4141
"github.com/coder/coder/v2/buildinfo"
42+
"github.com/coder/coder/v2/clock"
4243
_"github.com/coder/coder/v2/coderd/apidoc"// Used for swagger docs.
4344
"github.com/coder/coder/v2/coderd/appearance"
4445
"github.com/coder/coder/v2/coderd/audit"
@@ -142,14 +143,16 @@ type Options struct {
142143
DERPServer*derp.Server
143144
// BaseDERPMap is used as the base DERP map for all clients and agents.
144145
// Proxies are added to this list.
145-
BaseDERPMap*tailcfg.DERPMap
146-
DERPMapUpdateFrequency time.Duration
147-
SwaggerEndpointbool
148-
SetUserGroupsfunc(ctx context.Context,logger slog.Logger,tx database.Store,userID uuid.UUID,orgGroupNamesmap[uuid.UUID][]string,createMissingGroupsbool)error
149-
SetUserSiteRolesfunc(ctx context.Context,logger slog.Logger,tx database.Store,userID uuid.UUID,roles []string)error
150-
TemplateScheduleStore*atomic.Pointer[schedule.TemplateScheduleStore]
151-
UserQuietHoursScheduleStore*atomic.Pointer[schedule.UserQuietHoursScheduleStore]
152-
AccessControlStore*atomic.Pointer[dbauthz.AccessControlStore]
146+
BaseDERPMap*tailcfg.DERPMap
147+
DERPMapUpdateFrequency time.Duration
148+
NetworkTelemetryBatchFrequency time.Duration
149+
NetworkTelemetryBatchMaxSizeint
150+
SwaggerEndpointbool
151+
SetUserGroupsfunc(ctx context.Context,logger slog.Logger,tx database.Store,userID uuid.UUID,orgGroupNamesmap[uuid.UUID][]string,createMissingGroupsbool)error
152+
SetUserSiteRolesfunc(ctx context.Context,logger slog.Logger,tx database.Store,userID uuid.UUID,roles []string)error
153+
TemplateScheduleStore*atomic.Pointer[schedule.TemplateScheduleStore]
154+
UserQuietHoursScheduleStore*atomic.Pointer[schedule.UserQuietHoursScheduleStore]
155+
AccessControlStore*atomic.Pointer[dbauthz.AccessControlStore]
153156
// AppSecurityKey is the crypto key used to sign and encrypt tokens related to
154157
// workspace applications. It consists of both a signing and encryption key.
155158
AppSecurityKey workspaceapps.SecurityKey
@@ -305,6 +308,12 @@ func New(options *Options) *API {
305308
ifoptions.DERPMapUpdateFrequency==0 {
306309
options.DERPMapUpdateFrequency=5*time.Second
307310
}
311+
ifoptions.NetworkTelemetryBatchFrequency==0 {
312+
options.NetworkTelemetryBatchFrequency=1*time.Minute
313+
}
314+
ifoptions.NetworkTelemetryBatchMaxSize==0 {
315+
options.NetworkTelemetryBatchMaxSize=1_000
316+
}
308317
ifoptions.TailnetCoordinator==nil {
309318
options.TailnetCoordinator=tailnet.NewCoordinator(options.Logger)
310319
}
@@ -539,12 +548,19 @@ func New(options *Options) *API {
539548
ifoptions.DeploymentValues.Prometheus.Enable {
540549
options.PrometheusRegistry.MustRegister(stn)
541550
}
542-
api.TailnetClientService,err=tailnet.NewClientService(
543-
api.Logger.Named("tailnetclient"),
544-
&api.TailnetCoordinator,
545-
api.Options.DERPMapUpdateFrequency,
546-
api.DERPMap,
551+
api.NetworkTelemetryBatcher=tailnet.NewNetworkTelemetryBatcher(
552+
clock.NewReal(),
553+
api.Options.NetworkTelemetryBatchFrequency,
554+
api.Options.NetworkTelemetryBatchMaxSize,
555+
api.handleNetworkTelemetry,
547556
)
557+
api.TailnetClientService,err=tailnet.NewClientService(tailnet.ClientServiceOptions{
558+
Logger:api.Logger.Named("tailnetclient"),
559+
CoordPtr:&api.TailnetCoordinator,
560+
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
561+
DERPMapFn:api.DERPMap,
562+
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
563+
})
548564
iferr!=nil {
549565
api.Logger.Fatal(api.ctx,"failed to initialize tailnet client service",slog.Error(err))
550566
}
@@ -1255,6 +1271,7 @@ type API struct {
12551271
Auditor atomic.Pointer[audit.Auditor]
12561272
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter)bool]
12571273
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
1274+
NetworkTelemetryBatcher*tailnet.NetworkTelemetryBatcher
12581275
TailnetClientService*tailnet.ClientService
12591276
QuotaCommitter atomic.Pointer[proto.QuotaCommitter]
12601277
AppearanceFetcher atomic.Pointer[appearance.Fetcher]
@@ -1313,7 +1330,12 @@ type API struct {
13131330

13141331
// Close waits for all WebSocket connections to drain before returning.
13151332
func (api*API)Close()error {
1316-
api.cancel()
1333+
select {
1334+
case<-api.ctx.Done():
1335+
returnxerrors.New("API already closed")
1336+
default:
1337+
api.cancel()
1338+
}
13171339
ifapi.derpCloseFunc!=nil {
13181340
api.derpCloseFunc()
13191341
}
@@ -1348,6 +1370,7 @@ func (api *API) Close() error {
13481370
}
13491371
_=api.agentProvider.Close()
13501372
_=api.statsReporter.Close()
1373+
_=api.NetworkTelemetryBatcher.Close()
13511374
returnnil
13521375
}
13531376

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp