Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a8d6ebf

Browse files
committed
Merge branch 'PGPROEE9_6_CFS_385' into PGPROEE9_6
2 parents 89cd7eb + 598030c, commit a8d6ebf

File tree

6 files changed

+172
-134
lines changed

6 files changed

+172
-134
lines changed

‎src/backend/storage/file/cfs.c

Lines changed: 130 additions & 118 deletions
Original file line number | Diff line number | Diff line change
@@ -389,23 +389,31 @@ void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size)
389389
*Section 3: Compression implementation.
390390
* ----------------------------------------------------------------
391391
*/
392-
voidcfs_initialize()
392+
intcfs_shmem_size()
393393
{
394-
cfs_state= (CfsState*)ShmemAlloc(sizeof(CfsState));
395-
memset(&cfs_state->gc_stat,0,sizeofcfs_state->gc_stat);
396-
pg_atomic_init_flag(&cfs_state->gc_started);
397-
pg_atomic_init_u32(&cfs_state->n_active_gc,0);
398-
cfs_state->n_workers=0;
399-
cfs_state->gc_enabled= true;
400-
cfs_state->max_iterations=0;
401-
402-
if (cfs_encryption)
403-
cfs_crypto_init();
404-
405-
elog(LOG,"Start CFS version %s compression algorithm %s encryption %s",
406-
CFS_VERSION,cfs_algorithm(),cfs_encryption ?"enabled" :"disabled");
394+
returnsizeof(CfsState);
407395
}
408396

397+
voidcfs_initialize()
398+
{
399+
boolfound;
400+
cfs_state= (CfsState*)ShmemInitStruct("CFS Control",sizeof(CfsState),&found);
401+
if (!found)
402+
{
403+
memset(&cfs_state->gc_stat,0,sizeofcfs_state->gc_stat);
404+
pg_atomic_init_flag(&cfs_state->gc_started);
405+
pg_atomic_init_u32(&cfs_state->n_active_gc,0);
406+
cfs_state->n_workers=0;
407+
cfs_state->gc_enabled=cfs_gc_enabled;
408+
cfs_state->max_iterations=0;
409+
410+
if (cfs_encryption)
411+
cfs_crypto_init();
412+
413+
elog(LOG,"Start CFS version %s compression algorithm %s encryption %s GC %s",
414+
CFS_VERSION,cfs_algorithm(),cfs_encryption ?"enabled" :"disabled",cfs_gc_enabled ?"enabled" :"disabled");
415+
}
416+
}
409417
intcfs_msync(FileMap*map)
410418
{
411419
#ifdefWIN32
@@ -540,86 +548,72 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
540548
voidcfs_lock_file(FileMap*map,charconst*file_path)
541549
{
542550
longdelay=CFS_LOCK_MIN_TIMEOUT;
543-
intn_attempts=0;
544551

545552
while (true)
546553
{
547-
uint64count=pg_atomic_fetch_add_u32(&map->lock,1);
548-
boolrevokeLock= false;
554+
uint32count=pg_atomic_fetch_add_u32(&map->lock,1);
549555

550556
if (count<CFS_GC_LOCK)
551-
break;
552-
553-
if (InRecovery)
554557
{
555-
revokeLock= true;
556-
}
557-
else
558-
{
559-
if (pg_atomic_unlocked_test_flag(&cfs_state->gc_started))
560-
{
561-
if (++n_attempts>MAX_LOCK_ATTEMPTS)
562-
{
563-
/* So there is GC lock, but no active GC process during MAX_LOCK_ATTEMPTS.
564-
* Most likely it means that GC is crashed (may be together with other postgres processes or even OS)
565-
* without releasing lock. And for some reasons recovery was not performed and this page left locked.
566-
* We should revoke the the lock to allow access to this segment.
567-
*/
568-
revokeLock= true;
569-
elog(WARNING,"CFS revokes lock on file %s\n",file_path);
570-
}
571-
}
572-
else
573-
{
574-
n_attempts=0;/* Reset counter of attempts because GC is in progress */
575-
}
558+
/* No GC is active for this segment */
559+
break;
576560
}
577-
if (revokeLock
578-
/* use gc_started flag to prevent race condition with other backends and GC */
579-
&&pg_atomic_test_set_flag(&cfs_state->gc_started))
580-
{
581-
/* Ugggh... looks like last GC was interrupted.
582-
* Try to recover the file.
583-
*/
584-
char*map_bck_path=psprintf("%s.cfm.bck",file_path);
585-
char*file_bck_path=psprintf("%s.bck",file_path);
586561

587-
elog(WARNING,"CFS indicates that GC of %s was interrupted: try to perform recovery",file_path);
562+
if (pg_atomic_read_u32(&cfs_state->n_active_gc)==0)
563+
{
564+
/* There is no active GC, so lock is set by crashed GC */
588565

589-
if (access(file_bck_path,R_OK)!=0)
590-
{
591-
/* There is no backup file: new map should be constructed */
592-
intmd2=open(map_bck_path,O_RDWR|PG_BINARY,0);
593-
if (md2 >=0)
594-
{
595-
/* Recover map. */
596-
if (!cfs_read_file(md2,map,sizeof(FileMap)))
597-
elog(WARNING,"CFS failed to read file %s: %m",map_bck_path);
566+
LWLockAcquire(CfsGcLock,LW_EXCLUSIVE);/* Prevent race condition with GC */
598567

599-
close(md2);
600-
}
601-
}
602-
else
568+
/* Recheck under CfsGcLock that map->lock was not released */
569+
if (pg_atomic_read_u32(&map->lock) >=CFS_GC_LOCK)
603570
{
604-
/* Presence of backup file means that we still have
605-
* unchanged data and map files. Just remove backup files and
606-
* revoke GC lock.
571+
/* Uhhh... looks like last GC was interrupted.
572+
* Try to recover the file.
607573
*/
608-
unlink(file_bck_path);
609-
unlink(map_bck_path);
574+
char*map_bck_path=psprintf("%s.cfm.bck",file_path);
575+
char*file_bck_path=psprintf("%s.bck",file_path);
576+
577+
elog(WARNING,"CFS indicates that GC of %s was interrupted: trying to perform recovery",file_path);
578+
579+
if (access(file_bck_path,R_OK)!=0)
580+
{
581+
/* There is no backup file: new map should be constructed */
582+
intmd2=open(map_bck_path,O_RDWR|PG_BINARY,0);
583+
if (md2 >=0)
584+
{
585+
/* Recover map. */
586+
if (!cfs_read_file(md2,map,sizeof(FileMap)))
587+
elog(WARNING,"CFS failed to read file %s: %m",map_bck_path);
588+
589+
close(md2);
590+
}
591+
}
592+
else
593+
{
594+
/* Presence of backup file means that we still have
595+
* unchanged data and map files. Just remove backup files and
596+
* revoke GC lock.
597+
*/
598+
unlink(file_bck_path);
599+
unlink(map_bck_path);
600+
}
601+
602+
count=pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK);/* revoke GC lock */
603+
Assert((int)count>0);
604+
pfree(file_bck_path);
605+
pfree(map_bck_path);
610606
}
611-
612-
pg_atomic_clear_flag(&cfs_state->gc_started);
613-
count=pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK);/* revoke GC lock */
614-
Assert((int)count>0);
615-
pfree(file_bck_path);
616-
pfree(map_bck_path);
607+
LWLockRelease(CfsGcLock);
617608
break;
618-
}
609+
}
610+
/* Wait until GC of segment is completed */
619611
pg_atomic_fetch_sub_u32(&map->lock,1);
620612
pg_usleep(delay);
621613
if (delay<CFS_LOCK_MAX_TIMEOUT)
614+
{
622615
delay *=2;
616+
}
623617
}
624618

625619
if (IsUnderPostmaster&&cfs_gc_workers!=0
@@ -649,11 +643,11 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
649643
/*
650644
* Perform garbage collection (if required) on the file
651645
* @param map_path - path to the map file (*.cfm).
652-
* @paramnoerror -surpress error message (when this function is calledbycfs_gc_relation until there are available segments)
646+
* @param background - GC is performed in background by BGW: suppress error message and set CfsGcLock
653647
*/
654-
staticboolcfs_gc_file(char*map_path,boolnoerror)
648+
staticboolcfs_gc_file(char*map_path,boolbackground)
655649
{
656-
intmd=open(map_path,O_RDWR|PG_BINARY,0);
650+
intmd;
657651
FileMap*map;
658652
uint32physSize;
659653
uint32usedSize;
@@ -663,29 +657,33 @@ static bool cfs_gc_file(char* map_path, bool noerror)
663657
intfd2=-1;
664658
intmd2=-1;
665659
boolsucceed= false;
660+
intrc;
661+
666662

667663
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc,1);
668664

669-
while (!cfs_state->gc_enabled)
665+
if (background)
670666
{
671-
intrc;
672-
673-
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc,1);
674-
675-
rc=WaitLatch(MyLatch,
676-
WL_TIMEOUT |WL_POSTMASTER_DEATH,
677-
CFS_DISABLE_TIMEOUT/* ms */);
678-
if (cfs_gc_stop|| (rc&WL_POSTMASTER_DEATH))
679-
exit(1);
667+
while (!cfs_state->gc_enabled)
668+
{
669+
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc,1);
670+
671+
rc=WaitLatch(MyLatch,
672+
WL_TIMEOUT |WL_POSTMASTER_DEATH,
673+
CFS_DISABLE_TIMEOUT/* ms */);
674+
if (cfs_gc_stop|| (rc&WL_POSTMASTER_DEATH))
675+
exit(1);
676+
677+
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc,1);
678+
}
680679

681-
pg_atomic_fetch_add_u32(&cfs_state->n_active_gc,1);
680+
LWLockAcquire(CfsGcLock,LW_SHARED);/* avoid race condition with cfs_file_lock */
682681
}
683682

683+
md=open(map_path,O_RDWR|PG_BINARY,0);
684684
if (md<0)
685685
{
686-
if (!noerror) {
687-
elog(WARNING,"CFS failed to open map file %s: %m",map_path);
688-
}
686+
elog(DEBUG1,"CFS failed to open map file %s: %m",map_path);
689687
gotoFinishGC;
690688
}
691689

@@ -766,6 +764,11 @@ static bool cfs_gc_file(char* map_path, bool noerror)
766764
remove_backups= false;
767765
gotoReplaceMap;
768766
}
767+
else
768+
{
769+
/* No backups - nothing has to be recovered. Just release GC lock */
770+
break;
771+
}
769772
}
770773
else
771774
{
@@ -908,7 +911,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
908911
{
909912
inode_tinode=newMap->inodes[i];
910913
intsize=CFS_INODE_SIZE(inode);
911-
if (size!=0)
914+
if (size!=0&&size<BLCKSZ)
912915
{
913916
charblock[BLCKSZ];
914917
chardecomressedBlock[BLCKSZ];
@@ -926,7 +929,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
926929
pg_atomic_fetch_sub_u32(&map->lock,CFS_GC_LOCK);/* release lock */
927930
/* TODO Is it worth to PANIC or ERROR will be enough? */
928931
elog(PANIC,"Verification failed for block %d of relation %s: error code %d",
929-
i,file_path, (int)res);
932+
i,file_bck_path, (int)res);
930933
}
931934
}
932935
}
@@ -1032,7 +1035,12 @@ static bool cfs_gc_file(char* map_path, bool noerror)
10321035
}
10331036

10341037
FinishGC:
1038+
if (background)
1039+
{
1040+
LWLockRelease(CfsGcLock);
1041+
}
10351042
pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc,1);
1043+
10361044
returnsucceed;
10371045
}
10381046

@@ -1066,7 +1074,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10661074
strcmp(file_path+len-4,".cfm")==0)
10671075
{
10681076
if (entry->d_ino %cfs_state->n_workers==worker_id
1069-
&& !cfs_gc_file(file_path,false))
1077+
&& !cfs_gc_file(file_path,true))
10701078
{
10711079
success= false;
10721080
break;
@@ -1395,31 +1403,35 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
13951403

13961404
Datumcfs_gc_relation(PG_FUNCTION_ARGS)
13971405
{
1398-
cfs_gc_processed_segments=0;
1399-
1400-
if (cfs_gc_workers==0&&pg_atomic_test_set_flag(&cfs_state->gc_started))
1406+
Oidoid=PG_GETARG_OID(0);
1407+
Relationrel=try_relation_open(oid,AccessShareLock);
1408+
intprocessed_segments=0;
1409+
1410+
if (rel!=NULL)
14011411
{
1402-
Oidoid=PG_GETARG_OID(0);
1403-
Relationrel=try_relation_open(oid,AccessShareLock);
1404-
1405-
if (rel!=NULL)
1406-
{
1407-
char*path=relpathbackend(rel->rd_node,rel->rd_backend,MAIN_FORKNUM);
1408-
char*map_path= (char*)palloc(strlen(path)+16);
1409-
inti=0;
1410-
sprintf(map_path,"%s.cfm",path);
1412+
char*path;
1413+
char*map_path;
1414+
inti=0;
1415+
1416+
LWLockAcquire(CfsGcLock,LW_EXCLUSIVE);/* Prevent interaction with background GC */
1417+
1418+
processed_segments=cfs_gc_processed_segments;
1419+
1420+
path=relpathbackend(rel->rd_node,rel->rd_backend,MAIN_FORKNUM);
1421+
map_path= (char*)palloc(strlen(path)+16);
1422+
sprintf(map_path,"%s.cfm",path);
14111423

1412-
while (true)
1413-
{
1414-
if (!cfs_gc_file(map_path, true))
1415-
break;
1416-
sprintf(map_path,"%s.%u.cfm",path,++i);
1417-
}
1418-
pfree(path);
1419-
pfree(map_path);
1420-
relation_close(rel,AccessShareLock);
1424+
while (cfs_gc_file(map_path, false))
1425+
{
1426+
sprintf(map_path,"%s.%u.cfm",path,++i);
14211427
}
1422-
pg_atomic_clear_flag(&cfs_state->gc_started);
1428+
pfree(path);
1429+
pfree(map_path);
1430+
relation_close(rel,AccessShareLock);
1431+
1432+
processed_segments-=cfs_gc_processed_segments;
1433+
1434+
LWLockRelease(CfsGcLock);
14231435
}
14241436
PG_RETURN_INT32(cfs_gc_processed_segments);
14251437
}

‎src/backend/storage/ipc/ipci.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include"storage/procsignal.h"
4545
#include"storage/sinvaladt.h"
4646
#include"storage/spin.h"
47+
#include"storage/cfs.h"
4748
#include"utils/snapmgr.h"
4849

4950

@@ -142,6 +143,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
142143
size=add_size(size,BTreeShmemSize());
143144
size=add_size(size,SyncScanShmemSize());
144145
size=add_size(size,AsyncShmemSize());
146+
size=add_size(size,cfs_shmem_size());
145147
#ifdefEXEC_BACKEND
146148
size=add_size(size,ShmemBackendArraySize());
147149
#endif
@@ -254,7 +256,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
254256
BTreeShmemInit();
255257
SyncScanShmemInit();
256258
AsyncShmemInit();
257-
259+
cfs_initialize();
260+
258261
/*
259262
* Init array of Latches in SHMEM for WAITLSN
260263
*/

‎src/backend/storage/lmgr/lwlocknames.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ CommitTsLock39
4747
ReplicationOriginLock40
4848
MultiXactTruncationLock41
4949
OldSnapshotTimeMapLock42
50+
CfsGcLock 43

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp