Expand Up @@ -25,60 +25,61 @@ type FileAcquirer interface { // New returns a file cache that will fetch files from a database func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache { return (&Cache{ lock: sync.Mutex{}, data: make(map[uuid.UUID]*cacheEntry), authz: authz, }).registerMetrics(registerer) return &Cache{ lock: sync.Mutex{}, data: make(map[uuid.UUID]*cacheEntry), authz: authz, cacheMetrics: newCacheMetrics(registerer), } } func(c *Cache) registerMetrics( registerer prometheus.Registerer)*Cache { funcnewCacheMetrics( registerer prometheus.Registerer)cacheMetrics { subsystem := "file_cache" f := promauto.With(registerer) c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts {Namespace: "coderd", Subsystem: subsystem ,Name: "open_files_size_bytes_current" ,Help : "The current amount of memory of all files currently open in the file cache. ",}) c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{ Namespace: "coderd", Subsystem: subsystem ,Name: "open_files_size_bytes_total" ,Help : "The total amount of memory ever opened in the file cache. This number never decrements. ",}) c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: subsystem ,Name: "open_files_current" ,Help : "The count of unique files currently open in the file cache. ",}) c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{ Namespace: "coderd", Subsystem: subsystem ,Name: "open_files_total" ,Help : "The total count of unique files ever opened in the file cache. ",}) c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: subsystem ,Name: "open_file_refs_current" ,Help : "The count of file references currently open in the file cache. Multiple references can be held for the same file. ",}) c.totalOpenFileReferences = f.NewCounterVec(prometheus.CounterOpts{ Namespace: "coderd", Subsystem: subsystem ,Name: "open_file_refs_total" ,Help : "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache. ",}, []string{" hit"}) return c return cacheMetrics {currentCacheSize: f.NewGauge(prometheus.GaugeOpts{ Namespace: "coderd" ,Subsystem: subsystem ,Name : "open_files_size_bytes_current ",Help: "The current amount of memory of all files currently open in the file cache.", }), totalCacheSize: f.NewCounter(prometheus.CounterOpts{ Namespace: "coderd" ,Subsystem: subsystem ,Name : "open_files_size_bytes_total ",Help: "The total amount of memory ever opened in the file cache. This number never decrements.", }), currentOpenFiles: f.NewGauge(prometheus.GaugeOpts{ Namespace: "coderd" ,Subsystem: subsystem ,Name : "open_files_current ",Help: "The count of unique files currently open in the file cache.", }), totalOpenedFiles: f.NewCounter(prometheus.CounterOpts{ Namespace: "coderd" ,Subsystem: subsystem ,Name : "open_files_total ",Help: "The total count of unique files ever opened in the file cache.", }), currentOpenFileReferences: f.NewGauge(prometheus.GaugeOpts{ Namespace: "coderd" ,Subsystem: subsystem ,Name : "open_file_refs_current ",Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.", }), totalOpenFileReferences: f.NewCounterVec(prometheus.CounterOpts{ Namespace: "coderd" ,Subsystem: subsystem ,Name : "open_file_refs_total ",Help: "The total number of file references ever opened in the file cache. The ' hit' label indicates if the file was loaded from the cache.", }, []string{"hit"}), } } // Cache persists the files for template versions, and is used by dynamic Expand Down Expand Up @@ -106,18 +107,22 @@ type cacheMetrics struct { totalCacheSize prometheus.Counter } type cacheEntry struct { // refCount must only be accessed while the cacheEntry lock is held. lock sync.Mutex refCount int value *lazy.ValueWithError[CacheEntryValue] close func() purge func() } type CacheEntryValue struct { fs.FS Object rbac.Object Size int64 } type cacheEntry struct { // refCount must only be accessed while the Cache lock is held. refCount int value *lazy.ValueWithError[CacheEntryValue] } var _ fs.FS = (*CloseFS)(nil) // CloseFS is a wrapper around fs.FS that implements io.Closer. The Close() Expand All @@ -142,93 +147,121 @@ func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID // mutex has been released, or we would continue to hold the lock until the // entire file has been fetched, which may be slow, and would prevent other // files from being fetched in parallel. it, err := c.prepare(ctx, db, fileID).Load() e := c.prepare(db, fileID) ev, err := e.value.Load() if err != nil { c.release(fileID) e.close() e.purge() return nil, err } // We always run the fetch under a system context and actor, so we need to check the caller's // context manually before returning. // Check if the caller's context was canceled if err := ctx.Err(); err != nil { return nil, err } // Check that the caller is authorized to access the file subject, ok := dbauthz.ActorFromContext(ctx) if !ok { return nil, dbauthz.ErrNoActor } // Always check the caller can actually read the file. if err := c.authz.Authorize(ctx, subject, policy.ActionRead, it.Object); err != nil { c.release(fileID) if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil { e.close() return nil, err } varonce sync.Once varcloseOnce sync.Once return &CloseFS{ FS:it .FS, FS:ev .FS, close: func() { // sync.Once makes the Close() idempotent, so we can call it // multiple times without worrying about double-releasing. once.Do(func() { c.release(fileID) }) closeOnce.Do(func() { e.close() }) }, }, nil } func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *lazy.ValueWithError[CacheEntryValue] { func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry { c.lock.Lock() defer c.lock.Unlock() hitLabel := "true" entry, ok := c.data[fileID] if !ok { value := lazy.NewWithError(func() (CacheEntryValue, error) { val, err := fetch(ctx, db, fileID) hitLabel = "false" var purgeOnce sync.Once purge := func() { purgeOnce.Do(func() { c.purge(fileID) }) } entry = &cacheEntry{ refCount: 0, value: lazy.NewWithError(func() (CacheEntryValue, error) { val, err := fetch(db, fileID) if err != nil { return val, err } // Always add to the cache size the bytes of the file loaded. if err == nil { // Add the size of the file to the cache size metrics. c.currentCacheSize.Add(float64(val.Size)) c.totalCacheSize.Add(float64(val.Size)) } return val, err }) return val, err}), entry = &cacheEntry{ value: value, refCount: 0, close: func() { entry.lock.Lock() defer entry.lock.Unlock() entry.refCount-- c.currentOpenFileReferences.Dec() if entry.refCount > 0 { return } purge() }, purge: purge, } c.data[fileID] = entry c.currentOpenFiles.Inc() c.totalOpenedFiles.Inc() hitLabel = "false" } entry.lock.Lock() defer entry.lock.Unlock() c.currentOpenFileReferences.Inc() c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc() entry.refCount++ return entry.value return entry } // release decrements the reference count for the given fileID, and frees the // backing data if there are no further references being held. // // release should only be called after a successful call to Acquire using the Release() // method on the returned *CloseFS. func (c *Cache) release(fileID uuid.UUID) { // purge immediately removes an entry from the cache, even if it has open references. // It should only be called from the `close` function in a `cacheEntry`. func (c *Cache) purge(fileID uuid.UUID) { c.lock.Lock() defer c.lock.Unlock() entry, ok := c.data[fileID] if !ok { // If we land here, it's almost certainly because a bug already happened, // and we're freeing something that's already been freed, or we're calling // this function with an incorrect ID. Should this function return an error? return } c.currentOpenFileReferences.Dec() entry.refCount-- if entry.refCount > 0 { // If we land here, it's probably because of a fetch attempt that // resulted in an error, and got purged already. It may also be an // erroneous extra close, but we can't really distinguish between those // two cases currently. return } // Purge the file from the cache. c.currentOpenFiles.Dec() ev, err := entry.value.Load() if err == nil { c.currentCacheSize.Add(-1 * float64(ev.Size)) Expand All @@ -246,11 +279,18 @@ func (c *Cache) Count() int { return len(c.data) } func fetch(ctx context.Context, store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { // Make sure the read does not fail due to authorization issues. // Authz is checked on the Acquire call, so this is safe. func fetch(store database.Store, fileID uuid.UUID) (CacheEntryValue, error) { // Because many callers can be waiting on the same file fetch concurrently, we // want to prevent any failures that would cause them all to receive errors // because the caller who initiated the fetch would fail. // - We always run the fetch with an uncancelable context, and then check // context cancellation for each acquirer afterwards. // - We always run the fetch as a system user, and then check authorization // for each acquirer afterwards. // This prevents a canceled context or an unauthorized user from "holding up // the queue". //nolint:gocritic file, err := store.GetFileByID(dbauthz.AsFileReader(ctx ), fileID) file, err := store.GetFileByID(dbauthz.AsFileReader(context.Background() ), fileID) if err != nil { return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err) } Expand Down