Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitac7ea08

Browse files
authored
chore: add files cache for reading template tar archives from db (#17141)
1 parentc062942 commitac7ea08

File tree

5 files changed

+308
-0
lines changed

5 files changed

+308
-0
lines changed

‎archive/fs/tar.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package archivefs
2+
3+
import (
4+
"archive/tar"
5+
"io"
6+
"io/fs"
7+
8+
"github.com/spf13/afero"
9+
"github.com/spf13/afero/tarfs"
10+
)
11+
12+
funcFromTarReader(r io.Reader) fs.FS {
13+
tr:=tar.NewReader(r)
14+
tfs:=tarfs.New(tr)
15+
rofs:=afero.NewReadOnlyFs(tfs)
16+
returnafero.NewIOFS(rofs)
17+
}

‎coderd/files/cache.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package files
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"io/fs"
7+
"sync"
8+
9+
"github.com/google/uuid"
10+
"golang.org/x/xerrors"
11+
12+
archivefs"github.com/coder/coder/v2/archive/fs"
13+
"github.com/coder/coder/v2/coderd/database"
14+
"github.com/coder/coder/v2/coderd/util/lazy"
15+
)
16+
17+
// NewFromStore returns a file cache that will fetch files from the provided
18+
// database.
19+
funcNewFromStore(store database.Store)Cache {
20+
fetcher:=func(ctx context.Context,fileID uuid.UUID) (fs.FS,error) {
21+
file,err:=store.GetFileByID(ctx,fileID)
22+
iferr!=nil {
23+
returnnil,xerrors.Errorf("failed to read file from database: %w",err)
24+
}
25+
26+
content:=bytes.NewBuffer(file.Data)
27+
returnarchivefs.FromTarReader(content),nil
28+
}
29+
30+
returnCache{
31+
lock: sync.Mutex{},
32+
data:make(map[uuid.UUID]*cacheEntry),
33+
fetcher:fetcher,
34+
}
35+
}
36+
37+
// Cache persists the files for template versions, and is used by dynamic
38+
// parameters to deduplicate the files in memory. When any number of users opens
39+
// the workspace creation form for a given template version, it's files are
40+
// loaded into memory exactly once. We hold those files until there are no
41+
// longer any open connections, and then we remove the value from the map.
42+
typeCachestruct {
43+
lock sync.Mutex
44+
datamap[uuid.UUID]*cacheEntry
45+
fetcher
46+
}
47+
48+
typecacheEntrystruct {
49+
// refCount must only be accessed while the Cache lock is held.
50+
refCountint
51+
value*lazy.ValueWithError[fs.FS]
52+
}
53+
54+
typefetcherfunc(context.Context, uuid.UUID) (fs.FS,error)
55+
56+
// Acquire will load the fs.FS for the given file. It guarantees that parallel
57+
// calls for the same fileID will only result in one fetch, and that parallel
58+
// calls for distinct fileIDs will fetch in parallel.
59+
//
60+
// Every call to Acquire must have a matching call to Release.
61+
func (c*Cache)Acquire(ctx context.Context,fileID uuid.UUID) (fs.FS,error) {
62+
// It's important that this `Load` call occurs outside of `prepare`, after the
63+
// mutex has been released, or we would continue to hold the lock until the
64+
// entire file has been fetched, which may be slow, and would prevent other
65+
// files from being fetched in parallel.
66+
returnc.prepare(ctx,fileID).Load()
67+
}
68+
69+
func (c*Cache)prepare(ctx context.Context,fileID uuid.UUID)*lazy.ValueWithError[fs.FS] {
70+
c.lock.Lock()
71+
deferc.lock.Unlock()
72+
73+
entry,ok:=c.data[fileID]
74+
if!ok {
75+
value:=lazy.NewWithError(func() (fs.FS,error) {
76+
returnc.fetcher(ctx,fileID)
77+
})
78+
79+
entry=&cacheEntry{
80+
value:value,
81+
refCount:0,
82+
}
83+
c.data[fileID]=entry
84+
}
85+
86+
entry.refCount++
87+
returnentry.value
88+
}
89+
90+
// Release decrements the reference count for the given fileID, and frees the
91+
// backing data if there are no further references being held.
92+
func (c*Cache)Release(fileID uuid.UUID) {
93+
c.lock.Lock()
94+
deferc.lock.Unlock()
95+
96+
entry,ok:=c.data[fileID]
97+
if!ok {
98+
// If we land here, it's almost certainly because a bug already happened,
99+
// and we're freeing something that's already been freed, or we're calling
100+
// this function with an incorrect ID. Should this function return an error?
101+
return
102+
}
103+
104+
entry.refCount--
105+
ifentry.refCount>0 {
106+
return
107+
}
108+
109+
delete(c.data,fileID)
110+
}

‎coderd/files/cache_internal_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package files
2+
3+
import (
4+
"context"
5+
"io/fs"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/spf13/afero"
13+
"github.com/stretchr/testify/require"
14+
"golang.org/x/sync/errgroup"
15+
16+
"github.com/coder/coder/v2/testutil"
17+
)
18+
19+
funcTestConcurrency(t*testing.T) {
20+
t.Parallel()
21+
22+
emptyFS:=afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
23+
varfetches atomic.Int64
24+
c:=newTestCache(func(_ context.Context,_ uuid.UUID) (fs.FS,error) {
25+
fetches.Add(1)
26+
// Wait long enough before returning to make sure that all of the goroutines
27+
// will be waiting in line, ensuring that no one duplicated a fetch.
28+
time.Sleep(testutil.IntervalMedium)
29+
returnemptyFS,nil
30+
})
31+
32+
batches:=1000
33+
groups:=make([]*errgroup.Group,0,batches)
34+
forrangebatches {
35+
groups=append(groups,new(errgroup.Group))
36+
}
37+
38+
// Call Acquire with a unique ID per batch, many times per batch, with many
39+
// batches all in parallel. This is pretty much the worst-case scenario:
40+
// thousands of concurrent reads, with both warm and cold loads happening.
41+
batchSize:=10
42+
for_,g:=rangegroups {
43+
id:=uuid.New()
44+
forrangebatchSize {
45+
g.Go(func()error {
46+
// We don't bother to Release these references because the Cache will be
47+
// released at the end of the test anyway.
48+
_,err:=c.Acquire(t.Context(),id)
49+
returnerr
50+
})
51+
}
52+
}
53+
54+
for_,g:=rangegroups {
55+
require.NoError(t,g.Wait())
56+
}
57+
require.Equal(t,int64(batches),fetches.Load())
58+
}
59+
60+
funcTestRelease(t*testing.T) {
61+
t.Parallel()
62+
63+
emptyFS:=afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
64+
c:=newTestCache(func(_ context.Context,_ uuid.UUID) (fs.FS,error) {
65+
returnemptyFS,nil
66+
})
67+
68+
batches:=100
69+
ids:=make([]uuid.UUID,0,batches)
70+
forrangebatches {
71+
ids=append(ids,uuid.New())
72+
}
73+
74+
// Acquire a bunch of references
75+
batchSize:=10
76+
for_,id:=rangeids {
77+
forrangebatchSize {
78+
it,err:=c.Acquire(t.Context(),id)
79+
require.NoError(t,err)
80+
require.Equal(t,emptyFS,it)
81+
}
82+
}
83+
84+
// Make sure cache is fully loaded
85+
require.Equal(t,len(c.data),batches)
86+
87+
// Now release all of the references
88+
for_,id:=rangeids {
89+
forrangebatchSize {
90+
c.Release(id)
91+
}
92+
}
93+
94+
// ...and make sure that the cache has emptied itself.
95+
require.Equal(t,len(c.data),0)
96+
}
97+
98+
funcnewTestCache(fetcherfunc(context.Context, uuid.UUID) (fs.FS,error))Cache {
99+
returnCache{
100+
lock: sync.Mutex{},
101+
data:make(map[uuid.UUID]*cacheEntry),
102+
fetcher:fetcher,
103+
}
104+
}

‎coderd/util/lazy/valuewitherror.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package lazy
2+
3+
typeValueWithError[Tany]struct {
4+
innerValue[result[T]]
5+
}
6+
7+
typeresult[Tany]struct {
8+
valueT
9+
errerror
10+
}
11+
12+
// NewWithError allows you to provide a lazy initializer that can fail.
13+
funcNewWithError[Tany](fnfunc() (T,error))*ValueWithError[T] {
14+
return&ValueWithError[T]{
15+
inner:Value[result[T]]{fn:func()result[T] {
16+
value,err:=fn()
17+
returnresult[T]{value:value,err:err}
18+
}},
19+
}
20+
}
21+
22+
func (v*ValueWithError[T])Load() (T,error) {
23+
result:=v.inner.Load()
24+
returnresult.value,result.err
25+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package lazy_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"golang.org/x/xerrors"
8+
9+
"github.com/coder/coder/v2/coderd/util/lazy"
10+
)
11+
12+
funcTestLazyWithErrorOK(t*testing.T) {
13+
t.Parallel()
14+
15+
l:=lazy.NewWithError(func() (int,error) {
16+
return1,nil
17+
})
18+
19+
i,err:=l.Load()
20+
require.NoError(t,err)
21+
require.Equal(t,1,i)
22+
}
23+
24+
funcTestLazyWithErrorErr(t*testing.T) {
25+
t.Parallel()
26+
27+
l:=lazy.NewWithError(func() (int,error) {
28+
return0,xerrors.New("oh no! everything that could went horribly wrong!")
29+
})
30+
31+
i,err:=l.Load()
32+
require.Error(t,err)
33+
require.Equal(t,0,i)
34+
}
35+
36+
funcTestLazyWithErrorPointers(t*testing.T) {
37+
t.Parallel()
38+
39+
a:=1
40+
l:=lazy.NewWithError(func() (*int,error) {
41+
return&a,nil
42+
})
43+
44+
b,err:=l.Load()
45+
require.NoError(t,err)
46+
c,err:=l.Load()
47+
require.NoError(t,err)
48+
49+
*b++
50+
*c++
51+
require.Equal(t,3,a)
52+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp