Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit12e6fbf

Browse files
authored
feat(coderd/database): adddbrollup service to rollup insights (#12665)
Add `dbrollup` service that runs the `UpsertTemplateUsageStats` queryevery 5 minutes, on the minute. This allows us to have fairly real-timeinsights data when viewing "today".
1 parent04f0510 commit12e6fbf

File tree

7 files changed

+479
-5
lines changed

7 files changed

+479
-5
lines changed

‎coderd/coderd.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/coder/coder/v2/coderd/batchstats"
4848
"github.com/coder/coder/v2/coderd/database"
4949
"github.com/coder/coder/v2/coderd/database/dbauthz"
50+
"github.com/coder/coder/v2/coderd/database/dbrollup"
5051
"github.com/coder/coder/v2/coderd/database/dbtime"
5152
"github.com/coder/coder/v2/coderd/database/pubsub"
5253
"github.com/coder/coder/v2/coderd/externalauth"
@@ -192,6 +193,9 @@ type Options struct {
192193
// NewTicker is used for unit tests to replace "time.NewTicker".
193194
NewTickerfunc(duration time.Duration) (tick<-chan time.Time,donefunc())
194195

196+
// DatabaseRolluper rolls up template usage stats from raw agent and app
197+
// stats. This is used to provide insights in the WebUI.
198+
DatabaseRolluper*dbrollup.Rolluper
195199
// WorkspaceUsageTracker tracks workspace usage by the CLI.
196200
WorkspaceUsageTracker*workspaceusage.Tracker
197201
}
@@ -366,6 +370,10 @@ func New(options *Options) *API {
366370
OIDC:options.OIDCConfig,
367371
}
368372

373+
ifoptions.DatabaseRolluper==nil {
374+
options.DatabaseRolluper=dbrollup.New(options.Logger.Named("dbrollup"),options.Database)
375+
}
376+
369377
ifoptions.WorkspaceUsageTracker==nil {
370378
options.WorkspaceUsageTracker=workspaceusage.New(options.Database,
371379
workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")),
@@ -414,7 +422,9 @@ func New(options *Options) *API {
414422
ctx,
415423
options.Logger.Named("acquirer"),
416424
options.Database,
417-
options.Pubsub),
425+
options.Pubsub,
426+
),
427+
dbRolluper:options.DatabaseRolluper,
418428
workspaceUsageTracker:options.WorkspaceUsageTracker,
419429
}
420430

@@ -1197,7 +1207,9 @@ type API struct {
11971207
statsBatcher*batchstats.Batcher
11981208

11991209
Acquirer*provisionerdserver.Acquirer
1200-
1210+
// dbRolluper rolls up template usage stats from raw agent and app
1211+
// stats. This is used to provide insights in the WebUI.
1212+
dbRolluper*dbrollup.Rolluper
12011213
workspaceUsageTracker*workspaceusage.Tracker
12021214
}
12031215

@@ -1212,6 +1224,7 @@ func (api *API) Close() error {
12121224
api.WebsocketWaitGroup.Wait()
12131225
api.WebsocketWaitMutex.Unlock()
12141226

1227+
api.dbRolluper.Close()
12151228
api.metricsCache.Close()
12161229
ifapi.updateChecker!=nil {
12171230
api.updateChecker.Close()

‎coderd/coderdtest/coderdtest.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"github.com/coder/coder/v2/coderd/batchstats"
5858
"github.com/coder/coder/v2/coderd/database"
5959
"github.com/coder/coder/v2/coderd/database/dbauthz"
60+
"github.com/coder/coder/v2/coderd/database/dbrollup"
6061
"github.com/coder/coder/v2/coderd/database/dbtestutil"
6162
"github.com/coder/coder/v2/coderd/database/pubsub"
6263
"github.com/coder/coder/v2/coderd/externalauth"
@@ -147,6 +148,7 @@ type Options struct {
147148
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
148149
AllowWorkspaceRenamesbool
149150
NewTickerfunc(duration time.Duration) (<-chan time.Time,func())
151+
DatabaseRolluper*dbrollup.Rolluper
150152
WorkspaceUsageTrackerFlushchanint
151153
WorkspaceUsageTrackerTickchan time.Time
152154
}
@@ -491,6 +493,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
491493
WorkspaceAppsStatsCollectorOptions:options.WorkspaceAppsStatsCollectorOptions,
492494
AllowWorkspaceRenames:options.AllowWorkspaceRenames,
493495
NewTicker:options.NewTicker,
496+
DatabaseRolluper:options.DatabaseRolluper,
494497
WorkspaceUsageTracker:wuTracker,
495498
}
496499
}

‎coderd/database/dbgen/dbgen.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,38 @@ func WorkspaceApp(t testing.TB, db database.Store, orig database.WorkspaceApp) d
489489
returnresource
490490
}
491491

492+
funcWorkspaceAppStat(t testing.TB,db database.Store,orig database.WorkspaceAppStat) database.WorkspaceAppStat {
493+
// This is not going to be correct, but our query doesn't return the ID.
494+
id,err:=cryptorand.Int63()
495+
require.NoError(t,err,"generate id")
496+
497+
scheme:= database.WorkspaceAppStat{
498+
ID:takeFirst(orig.ID,id),
499+
UserID:takeFirst(orig.UserID,uuid.New()),
500+
WorkspaceID:takeFirst(orig.WorkspaceID,uuid.New()),
501+
AgentID:takeFirst(orig.AgentID,uuid.New()),
502+
AccessMethod:takeFirst(orig.AccessMethod,""),
503+
SlugOrPort:takeFirst(orig.SlugOrPort,""),
504+
SessionID:takeFirst(orig.SessionID,uuid.New()),
505+
SessionStartedAt:takeFirst(orig.SessionStartedAt,dbtime.Now().Add(-time.Minute)),
506+
SessionEndedAt:takeFirst(orig.SessionEndedAt,dbtime.Now()),
507+
Requests:takeFirst(orig.Requests,1),
508+
}
509+
err=db.InsertWorkspaceAppStats(genCtx, database.InsertWorkspaceAppStatsParams{
510+
UserID: []uuid.UUID{scheme.UserID},
511+
WorkspaceID: []uuid.UUID{scheme.WorkspaceID},
512+
AgentID: []uuid.UUID{scheme.AgentID},
513+
AccessMethod: []string{scheme.AccessMethod},
514+
SlugOrPort: []string{scheme.SlugOrPort},
515+
SessionID: []uuid.UUID{scheme.SessionID},
516+
SessionStartedAt: []time.Time{scheme.SessionStartedAt},
517+
SessionEndedAt: []time.Time{scheme.SessionEndedAt},
518+
Requests: []int32{scheme.Requests},
519+
})
520+
require.NoError(t,err,"insert workspace agent stat")
521+
returnscheme
522+
}
523+
492524
funcWorkspaceResource(t testing.TB,db database.Store,orig database.WorkspaceResource) database.WorkspaceResource {
493525
resource,err:=db.InsertWorkspaceResource(genCtx, database.InsertWorkspaceResourceParams{
494526
ID:takeFirst(orig.ID,uuid.New()),

‎coderd/database/dbpurge/dbpurge.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const (
2424
// This is for cleaning up old, unused resources from the database that take up space.
2525
funcNew(ctx context.Context,logger slog.Logger,db database.Store) io.Closer {
2626
closed:=make(chanstruct{})
27+
logger=logger.Named("dbpurge")
2728

2829
ctx,cancelFunc:=context.WithCancel(ctx)
2930
//nolint:gocritic // The system purges old db records without user input.

‎coderd/database/dbrollup/dbrollup.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package dbrollup
2+
3+
import (
4+
"context"
5+
"flag"
6+
"time"
7+
8+
"golang.org/x/sync/errgroup"
9+
10+
"cdr.dev/slog"
11+
12+
"github.com/coder/coder/v2/coderd/database"
13+
"github.com/coder/coder/v2/coderd/database/dbauthz"
14+
)
15+
16+
const (
17+
// DefaultInterval is the default time between rollups.
18+
// Rollups will be synchronized with the clock so that
19+
// they happen 13:00, 13:05, 13:10, etc.
20+
DefaultInterval=5*time.Minute
21+
)
22+
23+
typeEventstruct {
24+
TemplateUsageStatsbool
25+
}
26+
27+
typeRolluperstruct {
28+
cancel context.CancelFunc
29+
closedchanstruct{}
30+
db database.Store
31+
logger slog.Logger
32+
interval time.Duration
33+
eventchan<-Event
34+
}
35+
36+
typeOptionfunc(*Rolluper)
37+
38+
// WithInterval sets the interval between rollups.
39+
funcWithInterval(interval time.Duration)Option {
40+
returnfunc(r*Rolluper) {
41+
r.interval=interval
42+
}
43+
}
44+
45+
// WithEventChannel sets the event channel to use for rollup events.
46+
//
47+
// This is only used for testing.
48+
funcWithEventChannel(chchan<-Event)Option {
49+
ifflag.Lookup("test.v")==nil {
50+
panic("developer error: WithEventChannel is not to be used outside of tests")
51+
}
52+
returnfunc(r*Rolluper) {
53+
r.event=ch
54+
}
55+
}
56+
57+
// New creates a new DB rollup service that periodically runs rollup queries.
58+
// It is the caller's responsibility to call Close on the returned instance.
59+
//
60+
// This is for e.g. generating insights data (template_usage_stats) from
61+
// raw data (workspace_agent_stats, workspace_app_stats).
62+
funcNew(logger slog.Logger,db database.Store,opts...Option)*Rolluper {
63+
ctx,cancel:=context.WithCancel(context.Background())
64+
65+
r:=&Rolluper{
66+
cancel:cancel,
67+
closed:make(chanstruct{}),
68+
db:db,
69+
logger:logger,
70+
interval:DefaultInterval,
71+
}
72+
73+
for_,opt:=rangeopts {
74+
opt(r)
75+
}
76+
77+
//nolint:gocritic // The system rolls up database tables without user input.
78+
ctx=dbauthz.AsSystemRestricted(ctx)
79+
gor.start(ctx)
80+
81+
returnr
82+
}
83+
84+
func (r*Rolluper)start(ctx context.Context) {
85+
deferclose(r.closed)
86+
87+
do:=func() {
88+
vareg errgroup.Group
89+
90+
r.logger.Debug(ctx,"rolling up data")
91+
now:=time.Now()
92+
93+
// Track whether or not we performed a rollup (we got the advisory lock).
94+
varevEvent
95+
96+
eg.Go(func()error {
97+
returnr.db.InTx(func(tx database.Store)error {
98+
// Acquire a lock to ensure that only one instance of
99+
// the rollup is running at a time.
100+
ok,err:=tx.TryAcquireLock(ctx,database.LockIDDBRollup)
101+
iferr!=nil {
102+
returnerr
103+
}
104+
if!ok {
105+
returnnil
106+
}
107+
108+
ev.TemplateUsageStats=true
109+
returntx.UpsertTemplateUsageStats(ctx)
110+
},nil)
111+
})
112+
113+
err:=eg.Wait()
114+
iferr!=nil {
115+
ifdatabase.IsQueryCanceledError(err) {
116+
return
117+
}
118+
// Only log if Close hasn't been called.
119+
ifctx.Err()==nil {
120+
r.logger.Error(ctx,"failed to rollup data",slog.Error(err))
121+
}
122+
return
123+
}
124+
125+
r.logger.Debug(ctx,
126+
"rolled up data",
127+
slog.F("took",time.Since(now)),
128+
slog.F("event",ev),
129+
)
130+
131+
// For testing.
132+
ifr.event!=nil {
133+
select {
134+
case<-ctx.Done():
135+
return
136+
caser.event<-ev:
137+
}
138+
}
139+
}
140+
141+
// Perform do immediately and on every tick of the ticker,
142+
// disregarding the execution time of do. This ensure that
143+
// the rollup is performed every interval assuming do does
144+
// not take longer than the interval to execute.
145+
t:=time.NewTicker(time.Microsecond)
146+
defert.Stop()
147+
for {
148+
select {
149+
case<-ctx.Done():
150+
return
151+
case<-t.C:
152+
// Ensure we're on the interval.
153+
now:=time.Now()
154+
next:=now.Add(r.interval).Truncate(r.interval)// Ensure we're on the interval and synced with the clock.
155+
d:=next.Sub(now)
156+
// Safety check (shouldn't be possible).
157+
ifd<=0 {
158+
d=r.interval
159+
}
160+
t.Reset(d)
161+
162+
do()
163+
164+
r.logger.Debug(ctx,"next rollup at",slog.F("next",next))
165+
}
166+
}
167+
}
168+
169+
func (r*Rolluper)Close()error {
170+
r.cancel()
171+
<-r.closed
172+
returnnil
173+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp