Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit782e1dd

Browse files
committed
feat: add a storage pool rotation (#217)
1 parentdbabb6c commit782e1dd

File tree

12 files changed

+105
-164
lines changed

12 files changed

+105
-164
lines changed

‎configs/config.example.logical_generic.yml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ provision:
9797
# Custom parameters for clone containers, see
9898
# https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources
9999
containerConfig:
100-
"shm-size":256MB
100+
"shm-size":1gb
101101

102102
# Data retrieval flow. This section defines both initial retrieval, and rules
103103
# to keep the data directory in a synchronized state with the source. Both are optional:

‎configs/config.example.logical_rds_iam.yml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ provision:
9696

9797
# Custom parameters for clone containers (https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources).
9898
containerConfig:
99-
"shm-size":256MB
99+
"shm-size":1gb
100100

101101
# Data retrieval flow. This section defines both initial retrieval, and rules
102102
# to keep the data directory in a synchronized state with the source. Both are optional:

‎configs/config.example.physical_generic.yml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ provision:
9696

9797
# Custom parameters for clone containers (https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources).
9898
containerConfig:
99-
"shm-size":256MB
99+
"shm-size":1gb
100100

101101
# Data retrieval flow. This section defines both initial retrieval, and rules
102102
# to keep the data directory in a synchronized state with the source. Both are optional:

‎configs/config.example.physical_walg.yml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ provision:
9696

9797
# Custom parameters for clone containers (https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources).
9898
containerConfig:
99-
"shm-size":256MB
99+
"shm-size":1gb
100100

101101
# Data retrieval flow. This section defines both initial retrieval, and rules
102102
# to keep the data directory in a synchronized state with the source. Both are optional:

‎pkg/retrieval/engine/postgres/logical/restore.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (r *RestoreJob) retrieveDataStateAt(ctx context.Context, contID string) (st
285285
returndataStateAt,nil
286286
}
287287

288-
// updateDataStateAt updates dataStateAt for in-memory representation of afilesystem pool.
288+
// updateDataStateAt updates dataStateAt for in-memory representation of astorage pool.
289289
func (r*RestoreJob)updateDataStateAt() {
290290
dsaTime,err:=time.Parse(util.DataStateAtFormat,r.dbMark.DataStateAt)
291291
iferr!=nil {

‎pkg/retrieval/engine/postgres/snapshot/physical.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ func (p *PhysicalInitial) markDatabaseData() error {
671671
returnp.dbMarker.SaveConfig(p.dbMark)
672672
}
673673

674-
// updateDataStateAt updates dataStateAt for in-memory representation of afilesystem pool.
674+
// updateDataStateAt updates dataStateAt for in-memory representation of astorage pool.
675675
func (p*PhysicalInitial)updateDataStateAt() {
676676
dsaTime,err:=time.Parse(util.DataStateAtFormat,p.dbMark.DataStateAt)
677677
iferr!=nil {

‎pkg/retrieval/retrieval.go‎

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func (r *Retrieval) refreshFunc(ctx context.Context) func() {
240240
}
241241
}
242242

243-
// fullRefreshmakes afull refresh for anold filesystem pool.
243+
// fullRefreshperformsfull refresh for anunused storage pool and makes it active.
244244
func (r*Retrieval)fullRefresh(ctx context.Context)error {
245245
// Stop previous runs and snapshot schedulers.
246246
ifr.ctxCancel!=nil {
@@ -249,13 +249,18 @@ func (r *Retrieval) fullRefresh(ctx context.Context) error {
249249

250250
runCtx,cancel:=context.WithCancel(ctx)
251251
r.ctxCancel=cancel
252-
poolToUpdate:=r.poolManager.Oldest()
252+
elementToUpdate:=r.poolManager.GetPoolToUpdate()
253253

254-
ifpoolToUpdate==nil {
255-
log.Msg("Pool toa full refresh not found. Skip refreshing.")
254+
ifelementToUpdate==nil||elementToUpdate.Value==nil {
255+
log.Msg("Pool toperform full refresh not found. Skip refreshing")
256256
returnnil
257257
}
258258

259+
poolToUpdate,err:=r.poolManager.GetFSManager(elementToUpdate.Value.(string))
260+
iferr!=nil {
261+
returnerrors.Wrap(err,"failed to get FSManager")
262+
}
263+
259264
log.Msg("Pool to a full refresh: ",poolToUpdate.Pool())
260265

261266
iferr:=preparePoolToRefresh(poolToUpdate);err!=nil {
@@ -269,14 +274,11 @@ func (r *Retrieval) fullRefresh(ctx context.Context) error {
269274
returncleanUpErr
270275
}
271276

272-
current:=r.poolManager.Active()
273-
274277
iferr:=r.run(runCtx,poolToUpdate);err!=nil {
275278
returnerr
276279
}
277280

278-
r.poolManager.SetOldest(current)
279-
r.poolManager.SetActive(poolToUpdate)
281+
r.poolManager.SetActive(elementToUpdate)
280282

281283
returnnil
282284
}

‎pkg/services/provision/pool/pool_manager.go‎

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package pool
66

77
import (
8+
"container/list"
89
"io/ioutil"
910
"os"
1011
"path"
@@ -34,9 +35,8 @@ const (
3435
typeManagerstruct {
3536
cfg*Config
3637
mu*sync.Mutex
38+
fsManagerList*list.List
3739
fsManagerPoolmap[string]FSManager
38-
fsManagerFSManager
39-
oldFsManagerFSManager
4040
runner runners.Runner
4141
blockDeviceTypesmap[string]string
4242
}
@@ -58,6 +58,7 @@ func NewPoolManager(cfg *Config, runner runners.Runner) *Manager {
5858
fsManagerPool:make(map[string]FSManager),
5959
runner:runner,
6060
blockDeviceTypes:make(map[string]string),
61+
fsManagerList:list.New(),
6162
}
6263
}
6364

@@ -68,24 +69,56 @@ func (pm *Manager) Reload(cfg Config) error {
6869
returnpm.ReloadPools()
6970
}
7071

71-
//Active returns theactivefilesystempool manager.
72-
func (pm*Manager)Active()FSManager {
73-
returnpm.fsManager
72+
//SetActive sets a newactive pool manager element.
73+
func (pm*Manager)SetActive(element*list.Element) {
74+
pm.fsManagerList.MoveToFront(element)
7475
}
7576

76-
// SetActive sets a new active pool manager.
77-
func (pm*Manager)SetActive(activeFSManager) {
78-
pm.fsManager=active
77+
// Active returns the active storage pool manager.
78+
func (pm*Manager)Active()FSManager {
79+
active:=pm.fsManagerList.Front()
80+
81+
ifactive==nil||active.Value==nil {
82+
returnnil
83+
}
84+
85+
returnpm.getFSManager(active.Value.(string))
7986
}
8087

81-
// Oldest returns the oldest filesystem pool manager.
82-
func (pm*Manager)Oldest()FSManager {
83-
returnpm.oldFsManager
88+
func (pm*Manager)getFSManager(poolstring)FSManager {
89+
pm.mu.Lock()
90+
fsm:=pm.fsManagerPool[pool]
91+
pm.mu.Unlock()
92+
93+
returnfsm
8494
}
8595

86-
// SetOldest sets a pool manager to update.
87-
func (pm*Manager)SetOldest(poolFSManager) {
88-
pm.oldFsManager=pool
96+
// GetPoolToUpdate returns the element to update.
97+
func (pm*Manager)GetPoolToUpdate()*list.Element {
98+
forelement:=pm.fsManagerList.Back();element!=nil;element=element.Prev() {
99+
ifelement.Value==nil {
100+
returnnil
101+
}
102+
103+
// The active pool cannot be updated as it leads to downtime.
104+
ifelement==pm.fsManagerList.Front() {
105+
returnnil
106+
}
107+
108+
fsm:=pm.getFSManager(element.Value.(string))
109+
110+
clones,err:=fsm.ListClonesNames()
111+
iferr!=nil {
112+
log.Err("failed to list clones",err)
113+
returnnil
114+
}
115+
116+
iflen(clones)==0 {
117+
returnelement
118+
}
119+
}
120+
121+
returnnil
89122
}
90123

91124
// GetFSManager returns a filesystem manager by name if exists.
@@ -130,24 +163,14 @@ func (pm *Manager) ReloadPools() error {
130163
fsPools:=pm.examineEntries(entries)
131164

132165
iflen(fsPools)==0 {
133-
returnerrors.New("no available filesystem pools")
134-
}
135-
136-
active,old:=pm.detectWorkingPools(fsPools)
137-
138-
ifactive==nil {
139-
returnerrors.New("active pool not found: make sure it exists")
166+
returnerrors.New("no available pools")
140167
}
141168

142169
pm.mu.Lock()
143-
144170
pm.fsManagerPool=fsPools
145-
pm.SetActive(active)
146-
pm.SetOldest(old)
147-
148171
pm.mu.Unlock()
149172

150-
log.Msg("AvailableFS pools: ",pm.describeAvailablePools())
173+
log.Msg("Availablestorage pools: ",pm.describeAvailablePools())
151174
log.Msg("Active pool: ",pm.Active().Pool().Name)
152175

153176
returnnil
@@ -208,6 +231,13 @@ func (pm *Manager) examineEntries(entries []os.FileInfo) map[string]FSManager {
208231

209232
// TODO(akartasov): extract pool name.
210233
fsManagers[entry.Name()]=fsm
234+
235+
ifpm.Active()==nil||pm.Active().Pool().DSA.Before(pool.DSA) {
236+
pm.fsManagerList.PushFront(fsm.Pool().Name)
237+
continue
238+
}
239+
240+
pm.fsManagerList.PushBack(fsm.Pool().Name)
211241
}
212242

213243
returnfsManagers
@@ -226,33 +256,6 @@ func (pm *Manager) reloadBlockDevices() error {
226256
returnnil
227257
}
228258

229-
func (pm*Manager)detectWorkingPools(fsmmap[string]FSManager) (FSManager,FSManager) {
230-
varfsManager,oldFSManager
231-
232-
for_,manager:=rangefsm {
233-
iffsManager==nil {
234-
fsManager=manager
235-
continue
236-
}
237-
238-
iffsManager.Pool().DSA.Before(manager.Pool().DSA) {
239-
ifold==nil {
240-
old=fsManager
241-
}
242-
243-
fsManager=manager
244-
245-
continue
246-
}
247-
248-
ifold==nil||manager.Pool().DSA.Before(old.Pool().DSA) {
249-
old=manager
250-
}
251-
}
252-
253-
returnfsManager,old
254-
}
255-
256259
funcextractDataStateAt(dataPathstring) (*time.Time,error) {
257260
marker:=dbmarker.NewMarker(dataPath)
258261

@@ -294,13 +297,14 @@ func (pm *Manager) getFSInfo(path string) (string, error) {
294297
func (pm*Manager)describeAvailablePools() []string {
295298
availablePools:= []string{}
296299

297-
pm.mu.Lock()
300+
forel:=pm.fsManagerList.Front();el!=nil;el=el.Next() {
301+
ifel.Value==nil {
302+
log.Err("empty element: skip listing")
303+
continue
304+
}
298305

299-
for_,fsm:=rangepm.fsManagerPool {
300-
availablePools=append(availablePools,fsm.Pool().DataDir())
306+
availablePools=append(availablePools,el.Value.(string))
301307
}
302308

303-
pm.mu.Unlock()
304-
305309
returnavailablePools
306310
}

‎pkg/services/provision/pool/pool_manager_test.go‎

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp