Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit06c0486

Browse files
committed
Fix race condition in CFS GC
1 parent17b656b commit06c0486

File tree

6 files changed

+132
-35
lines changed

6 files changed

+132
-35
lines changed

‎doc/src/sgml/cfs.sgml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,13 @@
311311
It is calculated as sum of physical sizes of the files minus sum of used size of the files divided by sum of physical sizes of the files.
312312
</para>
313313

314+
<para>
315+
Particular relation can be defragmented using <varname>cfs_gc_relation(relation)</varname> function.
316+
This function can be used only if there are no active GC workers (<varname>cfs_gc_workers</varname> equals to zero).
317+
It returns number of defragmented segments of relation. If there is on relation with such OID or it is not compressed
318+
or some other GC process is active, 0 is returned.
319+
</para>
320+
314321
<para>
315322
There are several functions allowing to monitors garbage collection activity:
316323
<varname>cfs_gc_activity_scanned_files</varname> returns number of files scanned by GC,

‎src/backend/storage/file/cfs.c

Lines changed: 99 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ int cfs_gc_period;
5757
intcfs_gc_delay;
5858
intcfs_level;
5959
boolcfs_encryption;
60+
boolcfs_gc_verify_file;
6061

6162
staticboolcfs_read_file(intfd,void*data,uint32size);
6263
staticboolcfs_write_file(intfd,voidconst*data,uint32size);
@@ -65,7 +66,7 @@ static void cfs_start_background_gc(void);
6566
CfsState*cfs_state;
6667

6768
staticboolcfs_stop;
68-
69+
staticintcfs_processed_segments;
6970

7071
#ifCFS_COMPRESSOR==SNAPPY_COMPRESSOR
7172

@@ -335,6 +336,9 @@ int cfs_munmap(FileMap* map)
335336
#endif
336337
}
337338

339+
/*
340+
* Protects file from GC
341+
*/
338342
voidcfs_lock_file(FileMap*map,charconst*file_path)
339343
{
340344
longdelay=CFS_LOCK_MIN_TIMEOUT;
@@ -356,7 +360,7 @@ void cfs_lock_file(FileMap* map, char const* file_path)
356360
if (md2 >=0) {
357361
/* Recover map */
358362
if (!cfs_read_file(md2,map,sizeof(FileMap))) {
359-
elog(LOG,"Failed to read file %s: %m",map_bck_path);
363+
elog(LOG,"CFS failed to read file %s: %m",map_bck_path);
360364
}
361365
close(md2);
362366
}
@@ -384,7 +388,7 @@ void cfs_lock_file(FileMap* map, char const* file_path)
384388
}
385389

386390
/*
387-
*Protects filefrom GC
391+
*Release filelock
388392
*/
389393
voidcfs_unlock_file(FileMap*map)
390394
{
@@ -491,12 +495,12 @@ static bool cfs_gc_file(char* map_path)
491495
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc,1);
492496
}
493497
if (md<0) {
494-
elog(LOG,"Failed to open map file %s: %m",map_path);
498+
elog(LOG,"CFS failed to open map file %s: %m",map_path);
495499
gotoFinishGC;
496500
}
497501
map=cfs_mmap(md);
498502
if (map==MAP_FAILED) {
499-
elog(LOG,"Failed to map file %s: %m",map_path);
503+
elog(LOG,"CFS failed to map file %s: %m",map_path);
500504
close(md);
501505
gotoFinishGC;
502506
}
@@ -517,7 +521,7 @@ static bool cfs_gc_file(char* map_path)
517521
uint32newSize=0;
518522
inode_t**inodes= (inode_t**)palloc(RELSEG_SIZE*sizeof(inode_t*));
519523
boolremove_backups= true;
520-
intn_pages=virtSize /BLCKSZ;
524+
intn_pages;
521525
TimestampTzstartTime,endTime;
522526
longsecs;
523527
intusecs;
@@ -549,7 +553,7 @@ static bool cfs_gc_file(char* map_path)
549553
if (md2 >=0) {
550554
/* Recover map */
551555
if (!cfs_read_file(md2,newMap,sizeof(FileMap))) {
552-
elog(LOG,"Failed to read file %s: %m",map_bck_path);
556+
elog(LOG,"CFS failed to read file %s: %m",map_bck_path);
553557
gotoCleanup;
554558
}
555559
close(md2);
@@ -572,6 +576,12 @@ static bool cfs_gc_file(char* map_path)
572576
delay *=2;
573577
}
574578
}
579+
/* Reread variables after lockign file */
580+
usedSize=pg_atomic_read_u32(&map->usedSize);
581+
physSize=pg_atomic_read_u32(&map->physSize);
582+
virtSize=pg_atomic_read_u32(&map->virtSize);
583+
n_pages=virtSize /BLCKSZ;
584+
575585
md2=open(map_bck_path,O_CREAT|O_RDWR|PG_BINARY|O_TRUNC,0600);
576586
if (md2<0) {
577587
gotoCleanup;
@@ -583,7 +593,7 @@ static bool cfs_gc_file(char* map_path)
583593
/* sort inodes by offset to improve read locality */
584594
qsort(inodes,n_pages,sizeof(inode_t*),cfs_cmp_page_offs);
585595

586-
fd=open(file_path,O_RDWR|PG_BINARY,0);
596+
fd=open(file_path,O_RDONLY|PG_BINARY,0);
587597
if (fd<0) {
588598
gotoCleanup;
589599
}
@@ -593,6 +603,7 @@ static bool cfs_gc_file(char* map_path)
593603
gotoCleanup;
594604
}
595605
cfs_state->gc_stat.processedFiles+=1;
606+
cfs_processed_segments+=1;
596607

597608
for (i=0;i<n_pages;i++) {
598609
intsize=CFS_INODE_SIZE(*inodes[i]);
@@ -605,12 +616,12 @@ static bool cfs_gc_file(char* map_path)
605616
Assert(rc==offs);
606617

607618
if (!cfs_read_file(fd,block,size)) {
608-
elog(LOG,"Failedto read file %s: %m",file_path);
619+
elog(LOG,"CFS GC failedto readblock %d offile %s at position %d size %d: %m",i,file_path,offs,size);
609620
gotoCleanup;
610621
}
611622

612623
if (!cfs_write_file(fd2,block,size)) {
613-
elog(LOG,"Failed to write file %s: %m",file_bck_path);
624+
elog(LOG,"CFS failed to write file %s: %m",file_bck_path);
614625
gotoCleanup;
615626
}
616627
cfs_state->gc_stat.processedBytes+=size;
@@ -621,58 +632,84 @@ static bool cfs_gc_file(char* map_path)
621632
*inodes[i]=CFS_INODE(size,offs);
622633
}
623634
}
624-
pg_atomic_write_u32(&map->usedSize,newSize);
625-
626635
if (close(fd)<0) {
627-
elog(LOG,"Failed to close file %s: %m",file_path);
636+
elog(LOG,"CFS failed to close file %s: %m",file_path);
628637
gotoCleanup;
629638
}
630639
fd=-1;
631640

632641
/* Persist copy of data file */
633642
if (pg_fsync(fd2)<0) {
634-
elog(LOG,"Failed to sync file %s: %m",file_bck_path);
643+
elog(LOG,"CFS failed to sync file %s: %m",file_bck_path);
635644
gotoCleanup;
636645
}
637646
if (close(fd2)<0) {
638-
elog(LOG,"Failed to close file %s: %m",file_bck_path);
647+
elog(LOG," CFS failed to close file %s: %m",file_bck_path);
639648
gotoCleanup;
640649
}
641650
fd2=-1;
642651

643652
/* Persist copy of map file */
644653
if (!cfs_write_file(md2,&newMap,sizeof(newMap))) {
645-
elog(LOG,"Failed to write file %s: %m",map_bck_path);
654+
elog(LOG,"CFS failed to write file %s: %m",map_bck_path);
646655
gotoCleanup;
647656
}
648657
if (pg_fsync(md2)<0) {
649-
elog(LOG,"Failed to sync file %s: %m",map_bck_path);
658+
elog(LOG,"CFS failed to sync file %s: %m",map_bck_path);
650659
gotoCleanup;
651660
}
652661
if (close(md2)<0) {
653-
elog(LOG,"Failed to close file %s: %m",map_bck_path);
662+
elog(LOG,"CFS failed to close file %s: %m",map_bck_path);
654663
gotoCleanup;
655664
}
656665
md2=-1;
657666

658667
/* Persist map with CFS_GC_LOCK set: in case of crash we will know that map may be changed by GC */
659668
if (cfs_msync(map)<0) {
660-
elog(LOG,"Failed to sync map %s: %m",map_path);
669+
elog(LOG,"CFS failed to sync map %s: %m",map_path);
661670
gotoCleanup;
662671
}
663672
if (pg_fsync(md)<0) {
664-
elog(LOG,"Failed to sync file %s: %m",map_path);
673+
elog(LOG,"CFS failed to sync file %s: %m",map_path);
665674
gotoCleanup;
666675
}
667676

677+
678+
if (cfs_gc_verify_file) {
679+
fd=open(file_bck_path,O_RDONLY|PG_BINARY,0);
680+
Assert(fd >=0);
681+
682+
for (i=0;i<n_pages;i++) {
683+
inode_tinode=newMap->inodes[i];
684+
intsize=CFS_INODE_SIZE(inode);
685+
if (size!=0) {
686+
charblock[BLCKSZ];
687+
chardecomressedBlock[BLCKSZ];
688+
off_tresPG_USED_FOR_ASSERTS_ONLY;
689+
boolrcPG_USED_FOR_ASSERTS_ONLY;
690+
res=lseek(fd,CFS_INODE_OFFS(inode),SEEK_SET);
691+
Assert(res== (off_t)CFS_INODE_OFFS(inode));
692+
rc=cfs_read_file(fd,block,size);
693+
Assert(rc);
694+
cfs_decrypt(block, (off_t)i*BLCKSZ,size);
695+
res=cfs_decompress(decomressedBlock,BLCKSZ,block,size);
696+
if (res!=BLCKSZ) {
697+
pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK);/* release lock */
698+
elog(PANIC,"Verification failed for block %d of relation %s: error code %d",i,file_path, (int)res);
699+
}
700+
}
701+
}
702+
close(fd);
703+
}
704+
668705
/*
669706
* Now all information necessary for recovery is stored.
670707
* We are ready to replace existed file with defragmented one.
671708
* Use rename and rely on file system to provide atomicity of this operation.
672709
*/
673710
remove_backups= false;
674711
if (rename(file_bck_path,file_path)<0) {
675-
elog(LOG,"Failed to rename file %s: %m",file_path);
712+
elog(LOG,"CFS failed to rename file %s: %m",file_path);
676713
gotoCleanup;
677714
}
678715
ReplaceMap:
@@ -685,11 +722,11 @@ static bool cfs_gc_file(char* map_path)
685722

686723
/* Before removing backup files and releasing locks we need to flush updated map file */
687724
if (cfs_msync(map)<0) {
688-
elog(LOG,"Failed to sync map %s: %m",map_path);
725+
elog(LOG,"CFS failed to sync map %s: %m",map_path);
689726
gotoCleanup;
690727
}
691728
if (pg_fsync(md)<0) {
692-
elog(LOG,"Failed to sync file %s: %m",map_path);
729+
elog(LOG,"CFS failed to sync file %s: %m",map_path);
693730
Cleanup:
694731
if (fd >=0)close(fd);
695732
if (fd2 >=0)close(fd2);
@@ -703,11 +740,12 @@ static bool cfs_gc_file(char* map_path)
703740
}else {
704741
remove_backups= true;/* now backups are not need any more */
705742
}
743+
706744
pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK);/* release lock */
707745

708746
/* remove map backup file */
709747
if (remove_backups&&unlink(map_bck_path)) {
710-
elog(LOG,"Failed to unlink file %s: %m",map_bck_path);
748+
elog(LOG,"CFS failed to unlink file %s: %m",map_bck_path);
711749
succeed= false;
712750
}
713751

@@ -739,11 +777,11 @@ static bool cfs_gc_file(char* map_path)
739777
}
740778

741779
if (cfs_munmap(map)<0) {
742-
elog(LOG,"Failed to unmap file %s: %m",map_path);
780+
elog(LOG,"CFS failed to unmap file %s: %m",map_path);
743781
succeed= false;
744782
}
745783
if (close(md)<0) {
746-
elog(LOG,"Failed to close file %s: %m",map_path);
784+
elog(LOG,"CFS failed to close file %s: %m",map_path);
747785
succeed= false;
748786
}
749787
FinishGC:
@@ -875,6 +913,7 @@ PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_bytes);
875913
PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_pages);
876914
PG_FUNCTION_INFO_V1(cfs_gc_activity_processed_files);
877915
PG_FUNCTION_INFO_V1(cfs_gc_activity_scanned_files);
916+
PG_FUNCTION_INFO_V1(cfs_gc_relation);
878917

879918
Datumcfs_start_gc(PG_FUNCTION_ARGS)
880919
{
@@ -1002,10 +1041,10 @@ Datum cfs_compression_ratio(PG_FUNCTION_ARGS)
10021041
physSize+=pg_atomic_read_u32(&map->physSize);
10031042

10041043
if (cfs_munmap(map)<0) {
1005-
elog(LOG,"Failed to unmap file %s: %m",map_path);
1044+
elog(LOG,"CFS failed to unmap file %s: %m",map_path);
10061045
}
10071046
if (close(md)<0) {
1008-
elog(LOG,"Failed to close file %s: %m",map_path);
1047+
elog(LOG,"CFS failed to close file %s: %m",map_path);
10091048
}
10101049
i+=1;
10111050
}
@@ -1051,10 +1090,10 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
10511090
physSize+=pg_atomic_read_u32(&map->physSize);
10521091

10531092
if (cfs_munmap(map)<0) {
1054-
elog(LOG,"Failed to unmap file %s: %m",map_path);
1093+
elog(LOG,"CFS failed to unmap file %s: %m",map_path);
10551094
}
10561095
if (close(md)<0) {
1057-
elog(LOG,"Failed to close file %s: %m",map_path);
1096+
elog(LOG,"CFS failed to close file %s: %m",map_path);
10581097
}
10591098
i+=1;
10601099
}
@@ -1065,6 +1104,36 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
10651104
PG_RETURN_FLOAT8((double)(physSize-usedSize)/physSize);
10661105
}
10671106

1107+
Datumcfs_gc_relation(PG_FUNCTION_ARGS)
1108+
{
1109+
cfs_processed_segments=0;
1110+
1111+
if (cfs_gc_workers==0&&pg_atomic_test_set_flag(&cfs_state->gc_started))
1112+
{
1113+
Oidoid=PG_GETARG_OID(0);
1114+
Relationrel=try_relation_open(oid,AccessShareLock);
1115+
1116+
if (rel!=NULL) {
1117+
char*path=relpathbackend(rel->rd_node,rel->rd_backend,MAIN_FORKNUM);
1118+
char*map_path= (char*)palloc(strlen(path)+16);
1119+
inti=0;
1120+
sprintf(map_path,"%s.cfm",path);
1121+
1122+
while (true) {
1123+
if (!cfs_gc_file(map_path)) {
1124+
break;
1125+
}
1126+
sprintf(map_path,"%s.%u.cfm",path,++i);
1127+
}
1128+
pfree(path);
1129+
pfree(map_path);
1130+
relation_close(rel,AccessShareLock);
1131+
}
1132+
pg_atomic_clear_flag(&cfs_state->gc_started);
1133+
}
1134+
PG_RETURN_INT32(cfs_processed_segments);
1135+
}
1136+
10681137
Datumcfs_gc_activity_processed_bytes(PG_FUNCTION_ARGS)
10691138
{
10701139
PG_RETURN_INT64(cfs_state->gc_stat.processedBytes);

‎src/backend/storage/file/fd.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1816,6 +1816,7 @@ FileWrite(File file, char *buffer, int amount)
18161816
intreturnCode;
18171817
charcompressedBuffer[CFS_MAX_COMPRESSED_SIZE(BLCKSZ)];
18181818
inode_tinode=0;
1819+
/*inode_t prev_inode;*/
18191820
off_tseekPos;
18201821

18211822
Assert(FileIsValid(file));
@@ -1867,6 +1868,7 @@ FileWrite(File file, char *buffer, int amount)
18671868
return-1;
18681869
}
18691870
inode=map->inodes[VfdCache[file].seekPos /BLCKSZ];
1871+
/*prev_inode = inode;*/
18701872
if (compressedSize>0&&compressedSize<CFS_MIN_COMPRESSED_SIZE(BLCKSZ)) {
18711873
Assert((VfdCache[file].seekPos& (BLCKSZ-1))==0);
18721874
/* Do not check that new image of compressed page fits into
@@ -1907,8 +1909,14 @@ FileWrite(File file, char *buffer, int amount)
19071909
{
19081910
if (VfdCache[file].fileFlags&PG_COMPRESSION) {
19091911
if (returnCode==amount)
1910-
{
1912+
{
1913+
/* Verify that there is no race condition
1914+
bool rc = pg_atomic_compare_exchange_u64((pg_atomic_uint64*)&VfdCache[file].map->inodes[VfdCache[file].seekPos / BLCKSZ],
1915+
&prev_inode, inode);
1916+
Assert(rc);
1917+
*/
19111918
VfdCache[file].map->inodes[VfdCache[file].seekPos /BLCKSZ]=inode;
1919+
/**/
19121920
VfdCache[file].seekPos+=BLCKSZ;
19131921
cfs_extend(VfdCache[file].map,VfdCache[file].seekPos);
19141922
returnCode=BLCKSZ;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp