Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd90737a

Browse files
committed
Move locks from map file to transient array in shared memory
1 parent8d4046d commitd90737a

File tree

3 files changed

+222
-158
lines changed

3 files changed

+222
-158
lines changed

‎src/backend/storage/file/cfs.c

Lines changed: 121 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size)
392392
*/
393393
intcfs_shmem_size()
394394
{
395-
returnsizeof(CfsState);
395+
returnsizeof(CfsState)+sizeof(pg_atomic_uint32)*MaxBackends;
396396
}
397397

398398
voidcfs_initialize()
@@ -401,6 +401,8 @@ void cfs_initialize()
401401
cfs_state= (CfsState*)ShmemInitStruct("CFS Control",sizeof(CfsState),&found);
402402
if (!found)
403403
{
404+
inti;
405+
404406
memset(&cfs_state->gc_stat,0,sizeofcfs_state->gc_stat);
405407
pg_atomic_init_flag(&cfs_state->gc_started);
406408
pg_atomic_init_u32(&cfs_state->n_active_gc,0);
@@ -409,6 +411,9 @@ void cfs_initialize()
409411
cfs_state->gc_enabled= true;
410412
cfs_state->max_iterations=0;
411413

414+
for (i=0;i<MaxBackends;i++)
415+
pg_atomic_init_u32(&cfs_state->locks[i],0);
416+
412417
if (cfs_encryption)
413418
cfs_crypto_init();
414419

@@ -449,11 +454,6 @@ FileMap* cfs_mmap(int md)
449454
#else
450455
map= (FileMap*)mmap(NULL,sizeof(FileMap),PROT_WRITE |PROT_READ,MAP_SHARED,md,0);
451456
#endif
452-
if (map!=MAP_FAILED&&map->postmasterPid!=PostmasterPid)
453-
{
454-
map->postmasterPid=PostmasterPid;
455-
pg_atomic_write_u32(&map->lock,0);
456-
}
457457
returnmap;
458458
}
459459

@@ -606,65 +606,126 @@ static bool cfs_recover(FileMap* map, int md,
606606
unlink(file_bck_path);
607607
unlink(map_bck_path);
608608
}
609+
if (ok)
610+
pg_atomic_write_u32(&map->gc_active, false);/* clear the GC flag */
609611
returnok;
610612
}
611613

612614
/*
613-
* Protects file from GC
615+
* Get lock entry for this file.
616+
* Size of array of locks is equal to maximal number of backends, because there can't be more than MaxBackends active locks.
614617
*/
615-
voidcfs_lock_file(FileMap*map,intmd,charconst*file_path)
618+
staticpg_atomic_uint32*
619+
cfs_get_lock(charconst*file_path)
620+
{
621+
uint32hash=string_hash(file_path,0);
622+
return&cfs_state->locks[hash %MaxBackends];
623+
}
624+
625+
/*
626+
* Set GC exclusive lock preventing all backends from accessing this file
627+
*/
628+
staticvoid
629+
cfs_gc_lock(pg_atomic_uint32*lock)
616630
{
631+
uint32count=pg_atomic_fetch_or_u32(lock,CFS_GC_LOCK);
617632
longdelay=CFS_LOCK_MIN_TIMEOUT;
618633

619-
while (true)
634+
while ((count& ~CFS_GC_LOCK)!=1)
620635
{
621-
uint32count=pg_atomic_fetch_add_u32(&map->lock,1);
622-
623-
if (count<CFS_GC_LOCK)
636+
pg_usleep(delay);
637+
CHECK_FOR_INTERRUPTS();
638+
count=pg_atomic_read_u32(lock);
639+
if (delay<CFS_LOCK_MAX_TIMEOUT)
624640
{
625-
/* No GC is active for this segment */
626-
break;
641+
delay *=2;
627642
}
643+
}
644+
pg_memory_barrier();
645+
}
628646

629-
if (pg_atomic_read_u32(&cfs_state->n_active_gc)==0)
630-
{
631-
/* There is no active GC, so lock is set by crashed GC */
647+
/*
648+
* Release CFS GC lock
649+
*/
650+
staticvoidcfs_gc_unlock(pg_atomic_uint32*lock)
651+
{
652+
pg_write_barrier();
653+
pg_atomic_fetch_and_u32(lock, ~CFS_GC_LOCK);
654+
}
632655

633-
LWLockAcquire(CfsGcLock,LW_EXCLUSIVE);/* Prevent race condition with GC */
656+
/*
657+
* Set shared access lock, preventing GC of this file
658+
*/
659+
staticvoid
660+
cfs_access_lock(charconst*file_path)
661+
{
662+
pg_atomic_uint32*lock=cfs_get_lock(file_path);
663+
longdelay=CFS_LOCK_MIN_TIMEOUT;
634664

635-
/* Recheck under CfsGcLock that map->lock was not released */
636-
if (pg_atomic_read_u32(&map->lock) >=CFS_GC_LOCK)
637-
{
638-
/* Uhhh... looks like last GC was interrupted.
639-
* Try to recover the file.
640-
*/
641-
char*map_path=psprintf("%s.cfm",file_path);
642-
char*map_bck_path=psprintf("%s.cfm.bck",file_path);
643-
char*file_bck_path=psprintf("%s.bck",file_path);
644-
645-
if (!cfs_recover(map,md,file_path,map_path,file_bck_path,map_bck_path))
646-
{
647-
pg_atomic_fetch_sub_u32(&map->lock,1);
648-
LWLockRelease(CfsGcLock);
649-
elog(ERROR,"CFS found that file %s is completely destroyed",file_path);
650-
}
665+
/* Increment number of locks and wait until there is no active GC for this segment */
666+
while (true)
667+
{
668+
uint32count=pg_atomic_fetch_add_u32(lock,1);
651669

652-
count=pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK);/* revoke GC lock */
653-
Assert((int)count>0);
654-
pfree(file_bck_path);
655-
pfree(map_bck_path);
656-
pfree(map_path);
657-
}
658-
LWLockRelease(CfsGcLock);
670+
if (count<CFS_GC_LOCK)
671+
{
672+
/* No GC is active for this segment */
673+
return;
659674
}
660675
/* Wait until GC of segment is completed */
661-
pg_atomic_fetch_sub_u32(&map->lock,1);
676+
pg_atomic_fetch_sub_u32(lock,1);
662677
pg_usleep(delay);
678+
CHECK_FOR_INTERRUPTS();
663679
if (delay<CFS_LOCK_MAX_TIMEOUT)
664680
{
665681
delay *=2;
666682
}
667683
}
684+
}
685+
686+
/*
687+
* Protects file from GC and checks whether recovery of the file is needed
688+
*/
689+
voidcfs_lock_file(FileMap*map,intmd,charconst*file_path)
690+
{
691+
cfs_access_lock(file_path);
692+
693+
if (pg_atomic_read_u32(&map->gc_active))/* Non-zero value of map->gc_active indicates that GC was not successfully completed during previous Postges session */
694+
{
695+
LWLockAcquire(CfsGcLock,LW_EXCLUSIVE);/* Prevent race condition with GC */
696+
697+
/* Recheck under CfsGcLock that map->gc_active was not released */
698+
if (pg_atomic_read_u32(&map->gc_active))
699+
{
700+
/* Uhhh... looks like last GC was interrupted.
701+
* Try to recover the file.
702+
*/
703+
char*map_path=psprintf("%s.cfm",file_path);
704+
char*map_bck_path=psprintf("%s.cfm.bck",file_path);
705+
char*file_bck_path=psprintf("%s.bck",file_path);
706+
707+
if (!cfs_recover(map,md,file_path,map_path,file_bck_path,map_bck_path))
708+
{
709+
cfs_unlock_file(map,file_path);
710+
LWLockRelease(CfsGcLock);
711+
elog(ERROR,"CFS found that file %s is completely destroyed",file_path);
712+
}
713+
714+
pfree(file_bck_path);
715+
pfree(map_bck_path);
716+
pfree(map_path);
717+
}
718+
LWLockRelease(CfsGcLock);
719+
}
720+
}
721+
722+
/*
723+
* Start background GC workers if not started yet.
724+
* It is done lazily on first data file access.
725+
* Is there some better place to start background workers?
726+
*/
727+
voidcfs_start_background_workers(void)
728+
{
668729

669730
if (IsUnderPostmaster&&cfs_gc_workers!=0
670731
&&pg_atomic_test_set_flag(&cfs_state->gc_started))
@@ -676,9 +737,10 @@ void cfs_lock_file(FileMap* map, int md, char const* file_path)
676737
/*
677738
* Release file lock
678739
*/
679-
voidcfs_unlock_file(FileMap*map)
740+
voidcfs_unlock_file(FileMap*map,charconst*file_path)
680741
{
681-
pg_atomic_fetch_sub_u32(&map->lock,1);
742+
pg_atomic_uint32*lock=cfs_get_lock(file_path);
743+
pg_atomic_fetch_sub_u32(lock,1);
682744
}
683745

684746
/*
@@ -727,7 +789,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
727789
char*file_path= (char*)palloc(suf+1);
728790
char*map_bck_path= (char*)palloc(suf+10);
729791
char*file_bck_path= (char*)palloc(suf+5);
730-
uint32count;
731792
intrc;
732793

733794
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc,1);
@@ -752,7 +813,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
752813
exit(1);
753814

754815
ResetLatch(MyLatch);
755-
CHECK_FOR_INTERRUPTS();
756816

757817
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc,1);
758818
}
@@ -784,8 +844,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
784844
strcat(strcpy(file_bck_path,file_path),".bck");
785845

786846
/* mostly same as for cfs_lock_file */
787-
count=pg_atomic_fetch_add_u32(&map->lock,1);
788-
if (count >=CFS_GC_LOCK)
847+
if (pg_atomic_read_u32(&map->gc_active))/* Check if GC was not normally completed at previous Postgres run */
789848
{
790849
/* there could not be concurrent GC for this file here, so recover */
791850
if (!cfs_recover(map,md,file_path,map_path,file_bck_path,map_bck_path))
@@ -821,11 +880,14 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
821880
intusecs,usecs2;
822881
inti,size;
823882
uint32offs;
883+
pg_atomic_uint32*lock;
824884
off_trcPG_USED_FOR_ASSERTS_ONLY;
825885

826886
startTime=GetCurrentTimestamp();
827887
secondTime=startTime;
828888

889+
lock=cfs_get_lock(file_path);
890+
829891
fd2=open(file_bck_path,O_CREAT|O_RDWR|PG_BINARY|O_TRUNC,0600);
830892
if (fd2<0)
831893
{
@@ -855,13 +917,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
855917
}
856918

857919
/* temporary lock file for fetching map snapshot */
858-
count=pg_atomic_fetch_or_u32(&map->lock,CFS_GC_LOCK);
859-
while ((count& ~CFS_GC_LOCK)!=1)
860-
{
861-
pg_usleep(10);
862-
count=pg_atomic_read_u32(&map->lock);
863-
}
864-
pg_memory_barrier();
920+
cfs_gc_lock(lock);
921+
865922
/* Reread variables after locking file */
866923
virtSize=pg_atomic_read_u32(&map->hdr.virtSize);
867924
n_pages=virtSize /BLCKSZ;
@@ -873,7 +930,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
873930
inodes[i]=&newMap->inodes[i];
874931
}
875932
/* may unlock until second phase */
876-
pg_atomic_fetch_and_u32(&map->lock, ~CFS_GC_LOCK);
933+
cfs_gc_unlock(lock);
877934

878935
/* sort inodes by offset to improve read locality */
879936
qsort(inodes,n_pages,sizeof(inode_t*),cfs_cmp_page_offs);
@@ -950,13 +1007,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
9501007

9511008
secondTime=GetCurrentTimestamp();
9521009

953-
count=pg_atomic_fetch_or_u32(&map->lock,CFS_GC_LOCK);
954-
while ((count& ~CFS_GC_LOCK)!=1)
955-
{
956-
pg_usleep(10);
957-
count=pg_atomic_read_u32(&map->lock);
958-
}
959-
pg_memory_barrier();
1010+
cfs_gc_lock(lock);
1011+
9601012
/* Reread variables after locking file */
9611013
virtSize=pg_atomic_read_u32(&map->hdr.virtSize);
9621014
n_pages=virtSize /BLCKSZ;
@@ -1054,6 +1106,8 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
10541106
pg_atomic_write_u32(&newMap->hdr.physSize,newSize);
10551107
pg_atomic_write_u32(&newMap->hdr.virtSize,virtSize);
10561108

1109+
pg_atomic_write_u32(&newMap->gc_active, true);/* Indicate start of GC */
1110+
10571111
/* Persist copy of map file */
10581112
if (!cfs_write_file(md2,&newMap->hdr,sizeof(newMap->hdr)))
10591113
{
@@ -1116,7 +1170,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
11161170

11171171
if (res!=BLCKSZ)
11181172
{
1119-
pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK+1);/* release lock */
1173+
pg_atomic_fetch_sub_u32(lock,CFS_GC_LOCK);/* release lock */
11201174
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc,1);
11211175
elog(ERROR,"CFS: verification failed for block %u position %u size %u of relation %s: error code %d",
11221176
i, (int)CFS_INODE_OFFS(inode),size,file_bck_path, (int)res);
@@ -1146,6 +1200,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
11461200
memcpy(map->inodes,newMap->inodes,n_pages*sizeof(inode_t));
11471201
pg_atomic_write_u32(&map->hdr.usedSize,newUsed);
11481202
pg_atomic_write_u32(&map->hdr.physSize,newSize);
1203+
pg_atomic_write_u32(&map->gc_active, false);
11491204
map->generation+=1;/* force all backends to reopen the file */
11501205

11511206
/* Before removing backup files and releasing locks
@@ -1159,7 +1214,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
11591214
{
11601215
elog(WARNING,"CFS failed to sync file %s: %m",map_path);
11611216

1162-
Cleanup:
1217+
Cleanup:
11631218
if (fd >=0)close(fd);
11641219
if (fd2 >=0)close(fd2);
11651220
if (md2 >=0)close(md2);
@@ -1174,8 +1229,7 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
11741229
else
11751230
remove_backups= true;/* we don't need backups anymore */
11761231

1177-
pg_write_barrier();
1178-
pg_atomic_fetch_and_u32(&map->lock, ~CFS_GC_LOCK);/* release gc lock */
1232+
cfs_gc_unlock(lock);
11791233

11801234
/* remove map backup file */
11811235
if (remove_backups&&unlink(map_bck_path))
@@ -1210,7 +1264,6 @@ static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
12101264
MyProcPid,suf,map_path,physSize,virtSize,usedSize, (double)virtSize/physSize);
12111265

12121266
FinUnmap:
1213-
pg_atomic_fetch_sub_u32(&map->lock,1);/* release read lock */
12141267
if (cfs_munmap(map)<0)
12151268
{
12161269
elog(WARNING,"CFS failed to unmap file %s: %m",map_path);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp