@@ -62,13 +62,15 @@ func newStatsCache(prefix string, getKeys storage.KeysFunc) *statsCache {
62
62
// thus we run a background goroutine to periodically cleanup keys if needed.
63
63
type statsCache struct {
64
64
prefix string
65
- getKeys storage.KeysFunc
66
65
stop chan struct {}
67
66
wg sync.WaitGroup
68
67
lastKeyCleanup atomic.Pointer [time.Time ]
69
68
70
- lock sync.Mutex
71
- keys map [string ]sizeRevision
69
+ getKeysLock sync.Mutex
70
+ getKeys storage.KeysFunc
71
+
72
+ keysLock sync.Mutex
73
+ keys map [string ]sizeRevision
72
74
}
73
75
74
76
type sizeRevision struct {
@@ -77,23 +79,34 @@ type sizeRevision struct {
77
79
}
78
80
79
81
func (sc * statsCache )Stats (ctx context.Context ) (storage.Stats ,error ) {
80
- keys ,err := sc .getKeys (ctx )
82
+ keys ,err := sc .GetKeys (ctx )
81
83
if err != nil {
82
84
return storage.Stats {},err
83
85
}
84
86
stats := storage.Stats {
85
87
ObjectCount :int64 (len (keys )),
86
88
}
87
- sc .lock .Lock ()
88
- defer sc .lock .Unlock ()
89
+ sc .keysLock .Lock ()
90
+ defer sc .keysLock .Unlock ()
89
91
sc .cleanKeys (keys )
90
92
if len (sc .keys )!= 0 {
91
93
stats .EstimatedAverageObjectSizeBytes = sc .keySizes ()/ int64 (len (sc .keys ))
92
94
}
93
95
return stats ,nil
94
96
}
95
97
98
+ func (sc * statsCache )GetKeys (ctx context.Context ) ([]string ,error ) {
99
+ sc .getKeysLock .Lock ()
100
+ getKeys := sc .getKeys
101
+ sc .getKeysLock .Unlock ()
102
+
103
+ // Don't execute getKeys under lock.
104
+ return getKeys (ctx )
105
+ }
106
+
96
107
func (sc * statsCache )SetKeysFunc (keys storage.KeysFunc ) {
108
+ sc .getKeysLock .Lock ()
109
+ defer sc .getKeysLock .Unlock ()
97
110
sc .getKeys = keys
98
111
}
99
112
@@ -103,27 +116,27 @@ func (sc *statsCache) Close() {
103
116
}
104
117
105
118
func (sc * statsCache )run () {
106
- err := wait . PollUntilContextCancel ( wait . ContextForChannel ( sc . stop ), sizerRefreshInterval , false , func ( ctx context. Context ) ( done bool , err error ) {
107
- sc . cleanKeysIfNeeded ( ctx )
108
- return false , nil
109
- })
110
- if err != nil {
111
- klog . InfoS ( "Sizer exiting" )
119
+ jitter := 0.5 // Period between [interval, interval * (1.0 + jitter)]
120
+ sliding := true
121
+ // wait.JitterUntilWithContext starts work immediately, so wait first.
122
+ select {
123
+ case <- time . After ( wait . Jitter ( sizerRefreshInterval , jitter )):
124
+ case <- sc . stop :
112
125
}
126
+ wait .JitterUntilWithContext (wait .ContextForChannel (sc .stop ),sc .cleanKeysIfNeeded ,sizerRefreshInterval ,jitter ,sliding )
113
127
}
114
128
115
129
func (sc * statsCache )cleanKeysIfNeeded (ctx context.Context ) {
116
130
lastKeyCleanup := sc .lastKeyCleanup .Load ()
117
131
if lastKeyCleanup != nil && time .Since (* lastKeyCleanup )< sizerRefreshInterval {
118
132
return
119
133
}
120
- // Don't execute getKeys under lock.
121
- keys ,err := sc .getKeys (ctx )
134
+ keys ,err := sc .GetKeys (ctx )
122
135
if err != nil {
123
136
klog .InfoS ("Error getting keys" ,"err" ,err )
124
137
}
125
- sc .lock .Lock ()
126
- defer sc .lock .Unlock ()
138
+ sc .keysLock .Lock ()
139
+ defer sc .keysLock .Unlock ()
127
140
sc .cleanKeys (keys )
128
141
}
129
142
@@ -157,16 +170,16 @@ func (sc *statsCache) keySizes() (totalSize int64) {
157
170
}
158
171
159
172
func (sc * statsCache )Update (kvs []* mvccpb.KeyValue ) {
160
- sc .lock .Lock ()
161
- defer sc .lock .Unlock ()
173
+ sc .keysLock .Lock ()
174
+ defer sc .keysLock .Unlock ()
162
175
for _ ,kv := range kvs {
163
176
sc .updateKey (kv )
164
177
}
165
178
}
166
179
167
180
func (sc * statsCache )UpdateKey (kv * mvccpb.KeyValue ) {
168
- sc .lock .Lock ()
169
- defer sc .lock .Unlock ()
181
+ sc .keysLock .Lock ()
182
+ defer sc .keysLock .Unlock ()
170
183
171
184
sc .updateKey (kv )
172
185
}
@@ -185,8 +198,8 @@ func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) {
185
198
}
186
199
187
200
func (sc * statsCache )DeleteKey (kv * mvccpb.KeyValue ) {
188
- sc .lock .Lock ()
189
- defer sc .lock .Unlock ()
201
+ sc .keysLock .Lock ()
202
+ defer sc .keysLock .Unlock ()
190
203
191
204
key := string (kv .Key )
192
205
keySizeRevision := sc .keys [key ]