Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbfc6d87

Browse files
authored
Merge pull request#133053 from serathius/jitter
Add jitter to priodically executed processes in storage to avoid too many concurrent executions
2 parents0e9679d +79dc0b8 commitbfc6d87

File tree

3 files changed

+45
-31
lines changed

3 files changed

+45
-31
lines changed

‎staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,14 @@ type getLister interface {
321321

322322
func (cconsistencyChecker)startChecking(stopCh<-chanstruct{}) {
323323
klog.V(3).InfoS("Cache consistency check start","group",c.groupResource.Group,"resource",c.groupResource.Resource)
324-
err:=wait.PollUntilContextCancel(wait.ContextForChannel(stopCh),ConsistencyCheckPeriod,false,func(ctx context.Context) (donebool,errerror) {
325-
c.check(ctx)
326-
returnfalse,nil
327-
})
328-
iferr!=nil {
329-
klog.V(3).InfoS("Cache consistency check exiting","group",c.groupResource.Group,"resource",c.groupResource.Resource,"err",err)
330-
}
324+
jitter:=0.5// Period between [interval, interval * (1.0 + jitter)]
325+
sliding:=true
326+
// wait.JitterUntilWithContext starts work immediately, so wait first.
327+
select {
328+
case<-time.After(wait.Jitter(ConsistencyCheckPeriod,jitter)):
329+
case<-stopCh:
330+
}
331+
wait.JitterUntilWithContext(wait.ContextForChannel(stopCh),c.check,ConsistencyCheckPeriod,jitter,sliding)
331332
}
332333

333334
func (c*consistencyChecker)check(ctx context.Context) {

‎staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ func newStatsCache(prefix string, getKeys storage.KeysFunc) *statsCache {
6262
// thus we run a background goroutine to periodically cleanup keys if needed.
6363
typestatsCachestruct {
6464
prefixstring
65-
getKeys storage.KeysFunc
6665
stopchanstruct{}
6766
wg sync.WaitGroup
6867
lastKeyCleanup atomic.Pointer[time.Time]
6968

70-
lock sync.Mutex
71-
keysmap[string]sizeRevision
69+
getKeysLock sync.Mutex
70+
getKeys storage.KeysFunc
71+
72+
keysLock sync.Mutex
73+
keysmap[string]sizeRevision
7274
}
7375

7476
typesizeRevisionstruct {
@@ -77,23 +79,34 @@ type sizeRevision struct {
7779
}
7880

7981
func (sc*statsCache)Stats(ctx context.Context) (storage.Stats,error) {
80-
keys,err:=sc.getKeys(ctx)
82+
keys,err:=sc.GetKeys(ctx)
8183
iferr!=nil {
8284
return storage.Stats{},err
8385
}
8486
stats:= storage.Stats{
8587
ObjectCount:int64(len(keys)),
8688
}
87-
sc.lock.Lock()
88-
defersc.lock.Unlock()
89+
sc.keysLock.Lock()
90+
defersc.keysLock.Unlock()
8991
sc.cleanKeys(keys)
9092
iflen(sc.keys)!=0 {
9193
stats.EstimatedAverageObjectSizeBytes=sc.keySizes()/int64(len(sc.keys))
9294
}
9395
returnstats,nil
9496
}
9597

98+
func (sc*statsCache)GetKeys(ctx context.Context) ([]string,error) {
99+
sc.getKeysLock.Lock()
100+
getKeys:=sc.getKeys
101+
sc.getKeysLock.Unlock()
102+
103+
// Don't execute getKeys under lock.
104+
returngetKeys(ctx)
105+
}
106+
96107
func (sc*statsCache)SetKeysFunc(keys storage.KeysFunc) {
108+
sc.getKeysLock.Lock()
109+
defersc.getKeysLock.Unlock()
97110
sc.getKeys=keys
98111
}
99112

@@ -103,27 +116,27 @@ func (sc *statsCache) Close() {
103116
}
104117

105118
func (sc*statsCache)run() {
106-
err:=wait.PollUntilContextCancel(wait.ContextForChannel(sc.stop),sizerRefreshInterval,false,func(ctx context.Context) (donebool,errerror) {
107-
sc.cleanKeysIfNeeded(ctx)
108-
returnfalse,nil
109-
})
110-
iferr!=nil {
111-
klog.InfoS("Sizer exiting")
119+
jitter:=0.5// Period between [interval, interval * (1.0 + jitter)]
120+
sliding:=true
121+
// wait.JitterUntilWithContext starts work immediately, so wait first.
122+
select {
123+
case<-time.After(wait.Jitter(sizerRefreshInterval,jitter)):
124+
case<-sc.stop:
112125
}
126+
wait.JitterUntilWithContext(wait.ContextForChannel(sc.stop),sc.cleanKeysIfNeeded,sizerRefreshInterval,jitter,sliding)
113127
}
114128

115129
func (sc*statsCache)cleanKeysIfNeeded(ctx context.Context) {
116130
lastKeyCleanup:=sc.lastKeyCleanup.Load()
117131
iflastKeyCleanup!=nil&&time.Since(*lastKeyCleanup)<sizerRefreshInterval {
118132
return
119133
}
120-
// Don't execute getKeys under lock.
121-
keys,err:=sc.getKeys(ctx)
134+
keys,err:=sc.GetKeys(ctx)
122135
iferr!=nil {
123136
klog.InfoS("Error getting keys","err",err)
124137
}
125-
sc.lock.Lock()
126-
defersc.lock.Unlock()
138+
sc.keysLock.Lock()
139+
defersc.keysLock.Unlock()
127140
sc.cleanKeys(keys)
128141
}
129142

@@ -157,16 +170,16 @@ func (sc *statsCache) keySizes() (totalSize int64) {
157170
}
158171

159172
func (sc*statsCache)Update(kvs []*mvccpb.KeyValue) {
160-
sc.lock.Lock()
161-
defersc.lock.Unlock()
173+
sc.keysLock.Lock()
174+
defersc.keysLock.Unlock()
162175
for_,kv:=rangekvs {
163176
sc.updateKey(kv)
164177
}
165178
}
166179

167180
func (sc*statsCache)UpdateKey(kv*mvccpb.KeyValue) {
168-
sc.lock.Lock()
169-
defersc.lock.Unlock()
181+
sc.keysLock.Lock()
182+
defersc.keysLock.Unlock()
170183

171184
sc.updateKey(kv)
172185
}
@@ -185,8 +198,8 @@ func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) {
185198
}
186199

187200
func (sc*statsCache)DeleteKey(kv*mvccpb.KeyValue) {
188-
sc.lock.Lock()
189-
defersc.lock.Unlock()
201+
sc.keysLock.Lock()
202+
defersc.keysLock.Unlock()
190203

191204
key:=string(kv.Key)
192205
keySizeRevision:=sc.keys[key]

‎test/integration/metrics/metrics_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -645,8 +645,8 @@ func TestWatchCacheConsistencyCheckMetrics(t *testing.T) {
645645
iferr!=nil {
646646
t.Fatal(err)
647647
}
648-
//Do at least2 scrape cyclesto require 2 successes
649-
delay:=2*period
648+
//wait 3 periods to for2 scrape cycles(takes 1-1.5 period) and require 2 successes
649+
delay:=3*period
650650
time.Sleep(delay)
651651
resp,err:=rt.RoundTrip(req)
652652
iferr!=nil {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp