Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

chore: add files cache for reading template tar archives from db#17141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
aslilac merged 7 commits intomainfromlilac/files-cache
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletionsarchive/fs/tar.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
package archivefs

import (
"archive/tar"
"io"
"io/fs"

"github.com/spf13/afero"
"github.com/spf13/afero/tarfs"
)

func FromTarReader(r io.Reader) fs.FS {
tr := tar.NewReader(r)
tfs := tarfs.New(tr)
rofs := afero.NewReadOnlyFs(tfs)
return afero.NewIOFS(rofs)
}
110 changes: 110 additions & 0 deletionscoderd/files/cache.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
package files

import (
"bytes"
"context"
"io/fs"
"sync"

"github.com/google/uuid"
"golang.org/x/xerrors"

archivefs "github.com/coder/coder/v2/archive/fs"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/util/lazy"
)

// NewFromStore returns a file cache that will fetch files from the provided
// database.
func NewFromStore(store database.Store) Cache {
fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
file, err := store.GetFileByID(ctx, fileID)
if err != nil {
return nil, xerrors.Errorf("failed to read file from database: %w", err)
}

content := bytes.NewBuffer(file.Data)
return archivefs.FromTarReader(content), nil
}

return Cache{
lock: sync.Mutex{},
data: make(map[uuid.UUID]*cacheEntry),
fetcher: fetcher,
}
}

// Cache persists the files for template versions, and is used by dynamic
// parameters to deduplicate the files in memory. When any number of users opens
// the workspace creation form for a given template version, it's files are
// loaded into memory exactly once. We hold those files until there are no
// longer any open connections, and then we remove the value from the map.
type Cache struct {
lock sync.Mutex
data map[uuid.UUID]*cacheEntry
fetcher
}

type cacheEntry struct {
// refCount must only be accessed while the Cache lock is held.
refCount int
value *lazy.ValueWithError[fs.FS]
}

type fetcher func(context.Context, uuid.UUID) (fs.FS, error)

// Acquire will load the fs.FS for the given file. It guarantees that parallel
// calls for the same fileID will only result in one fetch, and that parallel
// calls for distinct fileIDs will fetch in parallel.
//
// Every call to Acquire must have a matching call to Release.
func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
// It's important that this `Load` call occurs outside of `prepare`, after the
// mutex has been released, or we would continue to hold the lock until the
// entire file has been fetched, which may be slow, and would prevent other
// files from being fetched in parallel.
return c.prepare(ctx, fileID).Load()
}

func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
c.lock.Lock()
defer c.lock.Unlock()

entry, ok := c.data[fileID]
if !ok {
value := lazy.NewWithError(func() (fs.FS, error) {
return c.fetcher(ctx, fileID)
})

entry = &cacheEntry{
value: value,
refCount: 0,
}
c.data[fileID] = entry
}

entry.refCount++
return entry.value
}

// Release decrements the reference count for the given fileID, and frees the
// backing data if there are no further references being held.
func (c *Cache) Release(fileID uuid.UUID) {
c.lock.Lock()
defer c.lock.Unlock()

entry, ok := c.data[fileID]
if !ok {
// If we land here, it's almost certainly because a bug already happened,
// and we're freeing something that's already been freed, or we're calling
// this function with an incorrect ID. Should this function return an error?
return
}

entry.refCount--
if entry.refCount > 0 {
return
}

delete(c.data, fileID)
}
104 changes: 104 additions & 0 deletionscoderd/files/cache_internal_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
package files

import (
"context"
"io/fs"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/coder/coder/v2/testutil"
)

func TestConcurrency(t *testing.T) {
t.Parallel()

emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
var fetches atomic.Int64
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
fetches.Add(1)
// Wait long enough before returning to make sure that all of the goroutines
// will be waiting in line, ensuring that no one duplicated a fetch.
time.Sleep(testutil.IntervalMedium)
return emptyFS, nil
})

batches := 1000
groups := make([]*errgroup.Group, 0, batches)
for range batches {
groups = append(groups, new(errgroup.Group))
}

// Call Acquire with a unique ID per batch, many times per batch, with many
// batches all in parallel. This is pretty much the worst-case scenario:
// thousands of concurrent reads, with both warm and cold loads happening.
batchSize := 10
for _, g := range groups {
id := uuid.New()
for range batchSize {
g.Go(func() error {
// We don't bother to Release these references because the Cache will be
// released at the end of the test anyway.
_, err := c.Acquire(t.Context(), id)
return err
})
}
}

for _, g := range groups {
require.NoError(t, g.Wait())
}
require.Equal(t, int64(batches), fetches.Load())
}

func TestRelease(t *testing.T) {
t.Parallel()

emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
return emptyFS, nil
})

batches := 100
ids := make([]uuid.UUID, 0, batches)
for range batches {
ids = append(ids, uuid.New())
}

// Acquire a bunch of references
batchSize := 10
for _, id := range ids {
for range batchSize {
it, err := c.Acquire(t.Context(), id)
require.NoError(t, err)
require.Equal(t, emptyFS, it)
}
}

// Make sure cache is fully loaded
require.Equal(t, len(c.data), batches)

// Now release all of the references
for _, id := range ids {
for range batchSize {
c.Release(id)
}
}

// ...and make sure that the cache has emptied itself.
require.Equal(t, len(c.data), 0)
}

func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
return Cache{
lock: sync.Mutex{},
data: make(map[uuid.UUID]*cacheEntry),
fetcher: fetcher,
}
}
25 changes: 25 additions & 0 deletionscoderd/util/lazy/valuewitherror.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
package lazy

type ValueWithError[T any] struct {
inner Value[result[T]]
}

type result[T any] struct {
value T
err error
}

// NewWithError allows you to provide a lazy initializer that can fail.
func NewWithError[T any](fn func() (T, error)) *ValueWithError[T] {
return &ValueWithError[T]{
inner: Value[result[T]]{fn: func() result[T] {
value, err := fn()
return result[T]{value: value, err: err}
}},
}
}

func (v *ValueWithError[T]) Load() (T, error) {
result := v.inner.Load()
return result.value, result.err
}
52 changes: 52 additions & 0 deletionscoderd/util/lazy/valuewitherror_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
package lazy_test

import (
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/util/lazy"
)

func TestLazyWithErrorOK(t *testing.T) {
t.Parallel()

l := lazy.NewWithError(func() (int, error) {
return 1, nil
})

i, err := l.Load()
require.NoError(t, err)
require.Equal(t, 1, i)
}

func TestLazyWithErrorErr(t *testing.T) {
t.Parallel()

l := lazy.NewWithError(func() (int, error) {
return 0, xerrors.New("oh no! everything that could went horribly wrong!")
})

i, err := l.Load()
require.Error(t, err)
require.Equal(t, 0, i)
}

func TestLazyWithErrorPointers(t *testing.T) {
t.Parallel()

a := 1
l := lazy.NewWithError(func() (*int, error) {
return &a, nil
})

b, err := l.Load()
require.NoError(t, err)
c, err := l.Load()
require.NoError(t, err)

*b++
*c++
require.Equal(t, 3, a)
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp