Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat(coderd/database): adddbrollup service to rollup insights#12665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
mafredri merged 2 commits intomainfrommafredri/add-dbrollup-service-for-insights
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletionscoderd/coderd.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -47,6 +47,7 @@ import (
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
Expand DownExpand Up@@ -192,6 +193,9 @@ type Options struct {
// NewTicker is used for unit tests to replace "time.NewTicker".
NewTicker func(duration time.Duration) (tick <-chan time.Time, done func())

// DatabaseRolluper rolls up template usage stats from raw agent and app
// stats. This is used to provide insights in the WebUI.
DatabaseRolluper *dbrollup.Rolluper
// WorkspaceUsageTracker tracks workspace usage by the CLI.
WorkspaceUsageTracker *workspaceusage.Tracker
}
Expand DownExpand Up@@ -366,6 +370,10 @@ func New(options *Options) *API {
OIDC: options.OIDCConfig,
}

if options.DatabaseRolluper == nil {
options.DatabaseRolluper = dbrollup.New(options.Logger.Named("dbrollup"), options.Database)
}

if options.WorkspaceUsageTracker == nil {
options.WorkspaceUsageTracker = workspaceusage.New(options.Database,
workspaceusage.WithLogger(options.Logger.Named("workspace_usage_tracker")),
Expand DownExpand Up@@ -414,7 +422,9 @@ func New(options *Options) *API {
ctx,
options.Logger.Named("acquirer"),
options.Database,
options.Pubsub),
options.Pubsub,
),
dbRolluper: options.DatabaseRolluper,
workspaceUsageTracker: options.WorkspaceUsageTracker,
}

Expand DownExpand Up@@ -1197,7 +1207,9 @@ type API struct {
statsBatcher *batchstats.Batcher

Acquirer *provisionerdserver.Acquirer

// dbRolluper rolls up template usage stats from raw agent and app
// stats. This is used to provide insights in the WebUI.
dbRolluper *dbrollup.Rolluper
workspaceUsageTracker *workspaceusage.Tracker
}

Expand All@@ -1212,6 +1224,7 @@ func (api *API) Close() error {
api.WebsocketWaitGroup.Wait()
api.WebsocketWaitMutex.Unlock()

api.dbRolluper.Close()
api.metricsCache.Close()
if api.updateChecker != nil {
api.updateChecker.Close()
Expand Down
3 changes: 3 additions & 0 deletionscoderd/coderdtest/coderdtest.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -57,6 +57,7 @@ import (
"github.com/coder/coder/v2/coderd/batchstats"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbrollup"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/externalauth"
Expand DownExpand Up@@ -147,6 +148,7 @@ type Options struct {
WorkspaceAppsStatsCollectorOptions workspaceapps.StatsCollectorOptions
AllowWorkspaceRenames bool
NewTicker func(duration time.Duration) (<-chan time.Time, func())
DatabaseRolluper *dbrollup.Rolluper
WorkspaceUsageTrackerFlush chan int
WorkspaceUsageTrackerTick chan time.Time
}
Expand DownExpand Up@@ -491,6 +493,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
WorkspaceAppsStatsCollectorOptions: options.WorkspaceAppsStatsCollectorOptions,
AllowWorkspaceRenames: options.AllowWorkspaceRenames,
NewTicker: options.NewTicker,
DatabaseRolluper: options.DatabaseRolluper,
WorkspaceUsageTracker: wuTracker,
}
}
Expand Down
32 changes: 32 additions & 0 deletionscoderd/database/dbgen/dbgen.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -489,6 +489,38 @@ func WorkspaceApp(t testing.TB, db database.Store, orig database.WorkspaceApp) d
return resource
}

func WorkspaceAppStat(t testing.TB, db database.Store, orig database.WorkspaceAppStat) database.WorkspaceAppStat {
// This is not going to be correct, but our query doesn't return the ID.
id, err := cryptorand.Int63()
require.NoError(t, err, "generate id")

scheme := database.WorkspaceAppStat{
ID: takeFirst(orig.ID, id),
UserID: takeFirst(orig.UserID, uuid.New()),
WorkspaceID: takeFirst(orig.WorkspaceID, uuid.New()),
AgentID: takeFirst(orig.AgentID, uuid.New()),
AccessMethod: takeFirst(orig.AccessMethod, ""),
SlugOrPort: takeFirst(orig.SlugOrPort, ""),
SessionID: takeFirst(orig.SessionID, uuid.New()),
SessionStartedAt: takeFirst(orig.SessionStartedAt, dbtime.Now().Add(-time.Minute)),
SessionEndedAt: takeFirst(orig.SessionEndedAt, dbtime.Now()),
Requests: takeFirst(orig.Requests, 1),
}
err = db.InsertWorkspaceAppStats(genCtx, database.InsertWorkspaceAppStatsParams{
UserID: []uuid.UUID{scheme.UserID},
WorkspaceID: []uuid.UUID{scheme.WorkspaceID},
AgentID: []uuid.UUID{scheme.AgentID},
AccessMethod: []string{scheme.AccessMethod},
SlugOrPort: []string{scheme.SlugOrPort},
SessionID: []uuid.UUID{scheme.SessionID},
SessionStartedAt: []time.Time{scheme.SessionStartedAt},
SessionEndedAt: []time.Time{scheme.SessionEndedAt},
Requests: []int32{scheme.Requests},
})
require.NoError(t, err, "insert workspace agent stat")
return scheme
}

func WorkspaceResource(t testing.TB, db database.Store, orig database.WorkspaceResource) database.WorkspaceResource {
resource, err := db.InsertWorkspaceResource(genCtx, database.InsertWorkspaceResourceParams{
ID: takeFirst(orig.ID, uuid.New()),
Expand Down
1 change: 1 addition & 0 deletionscoderd/database/dbpurge/dbpurge.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -24,6 +24,7 @@ const (
// This is for cleaning up old, unused resources from the database that take up space.
func New(ctx context.Context, logger slog.Logger, db database.Store) io.Closer {
closed := make(chan struct{})
logger = logger.Named("dbpurge")

ctx, cancelFunc := context.WithCancel(ctx)
//nolint:gocritic // The system purges old db records without user input.
Expand Down
173 changes: 173 additions & 0 deletionscoderd/database/dbrollup/dbrollup.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
package dbrollup
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I see a potential abstraction here -- the PR I'm working on in parallel#12659 has a similar logic of "run a query every interval". There are definitely other areas of the codebase that could benefit from a similar framework.


import (
"context"
"flag"
"time"

"golang.org/x/sync/errgroup"

"cdr.dev/slog"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
)

const (
// DefaultInterval is the default time between rollups.
// Rollups will be synchronized with the clock so that
// they happen 13:00, 13:05, 13:10, etc.
DefaultInterval = 5 * time.Minute
)

type Event struct {
TemplateUsageStats bool
}

type Rolluper struct {
cancel context.CancelFunc
closed chan struct{}
db database.Store
logger slog.Logger
interval time.Duration
event chan<- Event
}

type Option func(*Rolluper)

// WithInterval sets the interval between rollups.
func WithInterval(interval time.Duration) Option {
return func(r *Rolluper) {
r.interval = interval
}
}

// WithEventChannel sets the event channel to use for rollup events.
//
// This is only used for testing.
func WithEventChannel(ch chan<- Event) Option {
if flag.Lookup("test.v") == nil {
panic("developer error: WithEventChannel is not to be used outside of tests")
}
return func(r *Rolluper) {
r.event = ch
}
}

// New creates a new DB rollup service that periodically runs rollup queries.
// It is the caller's responsibility to call Close on the returned instance.
//
// This is for e.g. generating insights data (template_usage_stats) from
// raw data (workspace_agent_stats, workspace_app_stats).
func New(logger slog.Logger, db database.Store, opts ...Option) *Rolluper {
ctx, cancel := context.WithCancel(context.Background())

r := &Rolluper{
cancel: cancel,
closed: make(chan struct{}),
db: db,
logger: logger,
interval: DefaultInterval,
}

for _, opt := range opts {
opt(r)
}

//nolint:gocritic // The system rolls up database tables without user input.
ctx = dbauthz.AsSystemRestricted(ctx)
go r.start(ctx)

return r
}

func (r *Rolluper) start(ctx context.Context) {
defer close(r.closed)

do := func() {
var eg errgroup.Group

r.logger.Debug(ctx, "rolling up data")
now := time.Now()

// Track whether or not we performed a rollup (we got the advisory lock).
var ev Event

eg.Go(func() error {
return r.db.InTx(func(tx database.Store) error {
// Acquire a lock to ensure that only one instance of
// the rollup is running at a time.
ok, err := tx.TryAcquireLock(ctx, database.LockIDDBRollup)
if err != nil {
return err
}
if !ok {
return nil
}

ev.TemplateUsageStats = true
return tx.UpsertTemplateUsageStats(ctx)
}, nil)
})

err := eg.Wait()
if err != nil {
if database.IsQueryCanceledError(err) {
return
}
// Only log if Close hasn't been called.
if ctx.Err() == nil {
r.logger.Error(ctx, "failed to rollup data", slog.Error(err))
}
return
}

r.logger.Debug(ctx,
"rolled up data",
slog.F("took", time.Since(now)),
slog.F("event", ev),
)

// For testing.
if r.event != nil {
select {
case <-ctx.Done():
return
case r.event <- ev:
}
}
}

// Perform do immediately and on every tick of the ticker,
// disregarding the execution time of do. This ensure that
// the rollup is performed every interval assuming do does
// not take longer than the interval to execute.
t := time.NewTicker(time.Microsecond)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
// Ensure we're on the interval.
now := time.Now()
next := now.Add(r.interval).Truncate(r.interval) // Ensure we're on the interval and synced with the clock.
d := next.Sub(now)
// Safety check (shouldn't be possible).
if d <= 0 {
d = r.interval
}
t.Reset(d)

do()

r.logger.Debug(ctx, "next rollup at", slog.F("next", next))
}
}
}

func (r *Rolluper) Close() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Is the error return value here to satisfy an interface? Do we need it?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Yeah, I feel the io.Closer interface is so common it's worth using this signature even if the error is not used at the moment or ever in the future.

r.cancel()
<-r.closed
return nil
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp