Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7205f3c

Browse files
committed
Refactor network telemetry batching
1 parentfb9d5aa commit7205f3c

File tree

12 files changed

+228
-310
lines changed

12 files changed

+228
-310
lines changed

‎coderd/agentapi/api.go‎

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ type API struct {
4646

4747
mu sync.Mutex
4848
cachedWorkspaceID uuid.UUID
49-
drpcServiceClosefunc()
5049
}
5150

5251
var_ agentproto.DRPCAgentServer=&API{}
@@ -64,18 +63,16 @@ type Options struct {
6463
AppearanceFetcher*atomic.Pointer[appearance.Fetcher]
6564
PublishWorkspaceUpdateFnfunc(ctx context.Context,workspaceID uuid.UUID)
6665
PublishWorkspaceAgentLogsUpdateFnfunc(ctx context.Context,workspaceAgentID uuid.UUID,msg agentsdk.LogsNotifyMessage)
67-
NetworkTelemetryBatchFnfunc(batch []*tailnetproto.TelemetryEvent)
68-
69-
AccessURL*url.URL
70-
AppHostnamestring
71-
AgentStatsRefreshInterval time.Duration
72-
DisableDirectConnectionsbool
73-
DerpForceWebSocketsbool
74-
DerpMapUpdateFrequency time.Duration
75-
NetworkTelemetryBatchFrequency time.Duration
76-
NetworkTelemetryBatchMaxSizeint
77-
ExternalAuthConfigs []*externalauth.Config
78-
Experiments codersdk.Experiments
66+
NetworkTelemetryHandlerfunc(batch []*tailnetproto.TelemetryEvent)
67+
68+
AccessURL*url.URL
69+
AppHostnamestring
70+
AgentStatsRefreshInterval time.Duration
71+
DisableDirectConnectionsbool
72+
DerpForceWebSocketsbool
73+
DerpMapUpdateFrequency time.Duration
74+
ExternalAuthConfigs []*externalauth.Config
75+
Experiments codersdk.Experiments
7976

8077
// Optional:
8178
// WorkspaceID avoids a future lookup to find the workspace ID by setting
@@ -156,24 +153,16 @@ func New(opts Options) *API {
156153
}
157154

158155
api.DRPCService=&tailnet.DRPCService{
159-
CoordPtr:opts.TailnetCoordinator,
160-
Logger:opts.Log,
161-
DerpMapUpdateFrequency:opts.DerpMapUpdateFrequency,
162-
DerpMapFn:opts.DerpMapFn,
163-
NetworkTelemetryBatchFrequency:opts.NetworkTelemetryBatchFrequency,
164-
NetworkTelemetryBatchMaxSize:opts.NetworkTelemetryBatchMaxSize,
165-
NetworkTelemetryBatchFn:opts.NetworkTelemetryBatchFn,
156+
CoordPtr:opts.TailnetCoordinator,
157+
Logger:opts.Log,
158+
DerpMapUpdateFrequency:opts.DerpMapUpdateFrequency,
159+
DerpMapFn:opts.DerpMapFn,
160+
NetworkTelemetryHandler:opts.NetworkTelemetryHandler,
166161
}
167-
api.drpcServiceClose=api.DRPCService.PeriodicTelemetryBatcher()
168162

169163
returnapi
170164
}
171165

172-
func (a*API)Close()error {
173-
a.drpcServiceClose()
174-
returnnil
175-
}
176-
177166
func (a*API)Server(ctx context.Context) (*drpcserver.Server,error) {
178167
mux:=drpcmux.New()
179168
err:=agentproto.DRPCRegisterAgent(mux,a)

‎coderd/agentapi/api_test.go‎

Lines changed: 0 additions & 106 deletions
This file was deleted.

‎coderd/coderd.go‎

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -547,12 +547,18 @@ func New(options *Options) *API {
547547
ifoptions.DeploymentValues.Prometheus.Enable {
548548
options.PrometheusRegistry.MustRegister(stn)
549549
}
550-
api.TailnetClientService,err=tailnet.NewClientService(
551-
api.Logger.Named("tailnetclient"),
552-
&api.TailnetCoordinator,
553-
api.Options.DERPMapUpdateFrequency,
554-
api.DERPMap,
550+
api.NetworkTelemetryBatcher=tailnet.NewNetworkTelemetryBatcher(
551+
api.Options.NetworkTelemetryBatchFrequency,
552+
api.Options.NetworkTelemetryBatchMaxSize,
553+
api.handleNetworkTelemetry,
555554
)
555+
api.TailnetClientService,err=tailnet.NewClientService(tailnet.ClientServiceOptions{
556+
Logger:api.Logger.Named("tailnetclient"),
557+
CoordPtr:&api.TailnetCoordinator,
558+
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
559+
DERPMapFn:api.DERPMap,
560+
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
561+
})
556562
iferr!=nil {
557563
api.Logger.Fatal(api.ctx,"failed to initialize tailnet client service",slog.Error(err))
558564
}
@@ -1263,6 +1269,7 @@ type API struct {
12631269
Auditor atomic.Pointer[audit.Auditor]
12641270
WorkspaceClientCoordinateOverride atomic.Pointer[func(rw http.ResponseWriter)bool]
12651271
TailnetCoordinator atomic.Pointer[tailnet.Coordinator]
1272+
NetworkTelemetryBatcher*tailnet.NetworkTelemetryBatcher
12661273
TailnetClientService*tailnet.ClientService
12671274
QuotaCommitter atomic.Pointer[proto.QuotaCommitter]
12681275
AppearanceFetcher atomic.Pointer[appearance.Fetcher]
@@ -1356,6 +1363,7 @@ func (api *API) Close() error {
13561363
}
13571364
_=api.agentProvider.Close()
13581365
_=api.statsReporter.Close()
1366+
_=api.NetworkTelemetryBatcher.Close()
13591367
returnnil
13601368
}
13611369

‎coderd/workspaceagentsrpc.go‎

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -136,38 +136,21 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
136136
StatsReporter:api.statsReporter,
137137
PublishWorkspaceUpdateFn:api.publishWorkspaceUpdate,
138138
PublishWorkspaceAgentLogsUpdateFn:api.publishWorkspaceAgentLogsUpdate,
139-
NetworkTelemetryBatchFn:func(batch []*tailnetproto.TelemetryEvent) {
140-
telemetryEvents:=make([]telemetry.NetworkEvent,0,len(batch))
141-
for_,pEvent:=rangebatch {
142-
tEvent,err:=telemetry.NetworkEventFromProto(pEvent)
143-
iferr!=nil {
144-
// Events that fail to be converted get discarded for now.
145-
continue
146-
}
147-
telemetryEvents=append(telemetryEvents,tEvent)
148-
}
149-
150-
api.Telemetry.Report(&telemetry.Snapshot{
151-
NetworkEvents:telemetryEvents,
152-
})
153-
},
139+
NetworkTelemetryHandler:api.NetworkTelemetryBatcher.Handler,
154140

155-
AccessURL:api.AccessURL,
156-
AppHostname:api.AppHostname,
157-
AgentStatsRefreshInterval:api.AgentStatsRefreshInterval,
158-
DisableDirectConnections:api.DeploymentValues.DERP.Config.BlockDirect.Value(),
159-
DerpForceWebSockets:api.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
160-
DerpMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
161-
NetworkTelemetryBatchFrequency:api.Options.NetworkTelemetryBatchFrequency,
162-
NetworkTelemetryBatchMaxSize:api.Options.NetworkTelemetryBatchMaxSize,
163-
ExternalAuthConfigs:api.ExternalAuthConfigs,
164-
Experiments:api.Experiments,
141+
AccessURL:api.AccessURL,
142+
AppHostname:api.AppHostname,
143+
AgentStatsRefreshInterval:api.AgentStatsRefreshInterval,
144+
DisableDirectConnections:api.DeploymentValues.DERP.Config.BlockDirect.Value(),
145+
DerpForceWebSockets:api.DeploymentValues.DERP.Config.ForceWebSockets.Value(),
146+
DerpMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
147+
ExternalAuthConfigs:api.ExternalAuthConfigs,
148+
Experiments:api.Experiments,
165149

166150
// Optional:
167151
WorkspaceID:build.WorkspaceID,// saves the extra lookup later
168152
UpdateAgentMetricsFn:api.UpdateAgentMetrics,
169153
})
170-
deferagentAPI.Close()
171154

172155
streamID:= tailnet.StreamID{
173156
Name:fmt.Sprintf("%s-%s-%s",owner.Username,workspace.Name,workspaceAgent.Name),
@@ -184,6 +167,22 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
184167
}
185168
}
186169

170+
func (api*API)handleNetworkTelemetry(batch []*tailnetproto.TelemetryEvent) {
171+
telemetryEvents:=make([]telemetry.NetworkEvent,0,len(batch))
172+
for_,pEvent:=rangebatch {
173+
tEvent,err:=telemetry.NetworkEventFromProto(pEvent)
174+
iferr!=nil {
175+
// Events that fail to be converted get discarded for now.
176+
continue
177+
}
178+
telemetryEvents=append(telemetryEvents,tEvent)
179+
}
180+
181+
api.Telemetry.Report(&telemetry.Snapshot{
182+
NetworkEvents:telemetryEvents,
183+
})
184+
}
185+
187186
typeyamuxPingerCloserstruct {
188187
mux*yamux.Session
189188
}

‎codersdk/workspacesdk/connector_internal_test.go‎

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,13 @@ func TestTailnetAPIConnector_Disconnects(t *testing.T) {
5050
coordPtr.Store(&coord)
5151
derpMapCh:=make(chan*tailcfg.DERPMap)
5252
deferclose(derpMapCh)
53-
svc,err:=tailnet.NewClientService(
54-
logger,&coordPtr,
55-
time.Millisecond,func()*tailcfg.DERPMap {return<-derpMapCh },
56-
)
53+
svc,err:=tailnet.NewClientService(tailnet.ClientServiceOptions{
54+
Logger:logger,
55+
CoordPtr:&coordPtr,
56+
DERPMapUpdateFrequency:time.Millisecond,
57+
DERPMapFn:func()*tailcfg.DERPMap {return<-derpMapCh },
58+
NetworkTelemetryHandler:func(batch []*proto.TelemetryEvent) {},
59+
})
5760
require.NoError(t,err)
5861

5962
svr:=httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,r*http.Request) {

‎enterprise/coderd/coderd.go‎

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,13 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
138138
}
139139
returnapi.fetchRegions(ctx)
140140
}
141-
api.tailnetService,err=tailnet.NewClientService(
142-
api.Logger.Named("tailnetclient"),
143-
&api.AGPL.TailnetCoordinator,
144-
api.Options.DERPMapUpdateFrequency,
145-
api.AGPL.DERPMap,
146-
)
141+
api.tailnetService,err=tailnet.NewClientService(agpltailnet.ClientServiceOptions{
142+
Logger:api.Logger.Named("tailnetclient"),
143+
CoordPtr:&api.AGPL.TailnetCoordinator,
144+
DERPMapUpdateFrequency:api.Options.DERPMapUpdateFrequency,
145+
DERPMapFn:api.AGPL.DERPMap,
146+
NetworkTelemetryHandler:api.AGPL.NetworkTelemetryBatcher.Handler,
147+
})
147148
iferr!=nil {
148149
api.Logger.Fatal(api.ctx,"failed to initialize tailnet client service",slog.Error(err))
149150
}

‎enterprise/tailnet/workspaceproxy.go‎

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ import (
66
"encoding/json"
77
"errors"
88
"net"
9-
"sync/atomic"
109
"time"
1110

1211
"github.com/google/uuid"
1312
"golang.org/x/xerrors"
14-
"tailscale.com/tailcfg"
1513

1614
"cdr.dev/slog"
1715
"github.com/coder/coder/v2/apiversion"
@@ -25,15 +23,14 @@ type ClientService struct {
2523

2624
// NewClientService returns a ClientService based on the given Coordinator pointer. The pointer is
2725
// loaded on each processed connection.
28-
funcNewClientService(
29-
logger slog.Logger,
30-
coordPtr*atomic.Pointer[agpl.Coordinator],
31-
derpMapUpdateFrequency time.Duration,
32-
derpMapFnfunc()*tailcfg.DERPMap,
33-
) (
34-
*ClientService,error,
35-
) {
36-
s,err:=agpl.NewClientService(logger,coordPtr,derpMapUpdateFrequency,derpMapFn)
26+
funcNewClientService(options agpl.ClientServiceOptions) (*ClientService,error) {
27+
s,err:=agpl.NewClientService(agpl.ClientServiceOptions{
28+
Logger:options.Logger,
29+
CoordPtr:options.CoordPtr,
30+
DERPMapUpdateFrequency:options.DERPMapUpdateFrequency,
31+
DERPMapFn:options.DERPMapFn,
32+
NetworkTelemetryHandler:options.NetworkTelemetryHandler,
33+
})
3734
iferr!=nil {
3835
returnnil,err
3936
}

‎enterprise/wsproxy/wsproxysdk/wsproxysdk_test.go‎

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,13 @@ func TestDialCoordinator(t *testing.T) {
171171

172172
coordPtr:= atomic.Pointer[agpl.Coordinator]{}
173173
coordPtr.Store(&coord)
174-
cSrv,err:=tailnet.NewClientService(
175-
logger,&coordPtr,
176-
time.Hour,
177-
func()*tailcfg.DERPMap {panic("not implemented") },
178-
)
174+
cSrv,err:=tailnet.NewClientService(agpl.ClientServiceOptions{
175+
Logger:logger,
176+
CoordPtr:&coordPtr,
177+
DERPMapUpdateFrequency:time.Hour,
178+
DERPMapFn:func()*tailcfg.DERPMap {panic("not implemented") },
179+
NetworkTelemetryHandler:func(batch []*proto.TelemetryEvent) {panic("not implemented") },
180+
})
179181
require.NoError(t,err)
180182

181183
// buffer the channels here, so we don't need to read and write in goroutines to

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp