@@ -389,23 +389,31 @@ void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size)
389389 *Section 3: Compression implementation.
390390 * ----------------------------------------------------------------
391391 */
392- void cfs_initialize ()
392+ int cfs_shmem_size () /* Size in bytes of the CfsState control structure (same size cfs_initialize requests) */
393393{
394- cfs_state = (CfsState * )ShmemAlloc (sizeof (CfsState ));
395- memset (& cfs_state -> gc_stat ,0 ,sizeof cfs_state -> gc_stat );
396- pg_atomic_init_flag (& cfs_state -> gc_started );
397- pg_atomic_init_u32 (& cfs_state -> n_active_gc ,0 );
398- cfs_state -> n_workers = 0 ;
399- cfs_state -> gc_enabled = true;
400- cfs_state -> max_iterations = 0 ;
401-
402- if (cfs_encryption )
403- cfs_crypto_init ();
404-
405- elog (LOG ,"Start CFS version %s compression algorithm %s encryption %s" ,
406- CFS_VERSION ,cfs_algorithm (),cfs_encryption ?"enabled" :"disabled" );
394+ return sizeof (CfsState );
407395}
408396
397+ void cfs_initialize () /* Look up or create the "CFS Control" shared structure and set its initial state on first creation */
398+ {
399+ bool found ; /* output of ShmemInitStruct; presumably true when the structure already existed - confirm against shmem API */
400+ cfs_state = (CfsState * )ShmemInitStruct ("CFS Control" ,sizeof (CfsState ),& found );
401+ if (!found ) /* fields are initialized only on this path, so existing shared state is left untouched */
402+ {
403+ memset (& cfs_state -> gc_stat ,0 ,sizeof cfs_state -> gc_stat );
404+ pg_atomic_init_flag (& cfs_state -> gc_started );
405+ pg_atomic_init_u32 (& cfs_state -> n_active_gc ,0 ); /* counter starts at zero; cfs_gc_file increments it while it runs */
406+ cfs_state -> n_workers = 0 ;
407+ cfs_state -> gc_enabled = cfs_gc_enabled ; /* initial GC state is taken from cfs_gc_enabled (presumably a configuration setting - confirm) */
408+ cfs_state -> max_iterations = 0 ;
409+
410+ if (cfs_encryption )
411+ cfs_crypto_init (); /* NOTE(review): runs only when the structure is first created - confirm no per-process crypto state is needed */
412+
413+ elog (LOG ,"Start CFS version %s compression algorithm %s encryption %s GC %s" ,
414+ CFS_VERSION ,cfs_algorithm (),cfs_encryption ?"enabled" :"disabled" ,cfs_gc_enabled ?"enabled" :"disabled" );
415+ }
416+ }
409417int cfs_msync (FileMap * map )
410418{
411419#ifdef WIN32
@@ -540,86 +548,72 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
540548void cfs_lock_file (FileMap * map ,char const * file_path )
541549{
542550long delay = CFS_LOCK_MIN_TIMEOUT ;
543- int n_attempts = 0 ;
544551
545552while (true)
546553{
547- uint64 count = pg_atomic_fetch_add_u32 (& map -> lock ,1 );
548- bool revokeLock = false;
554+ uint32 count = pg_atomic_fetch_add_u32 (& map -> lock ,1 );
549555
550556if (count < CFS_GC_LOCK )
551- break ;
552-
553- if (InRecovery )
554557{
555- revokeLock = true;
556- }
557- else
558- {
559- if (pg_atomic_unlocked_test_flag (& cfs_state -> gc_started ))
560- {
561- if (++ n_attempts > MAX_LOCK_ATTEMPTS )
562- {
563- /* So there is GC lock, but no active GC process during MAX_LOCK_ATTEMPTS.
564- * Most likely it means that GC is crashed (may be together with other postgres processes or even OS)
565- * without releasing lock. And for some reasons recovery was not performed and this page left locked.
566- * We should revoke the the lock to allow access to this segment.
567- */
568- revokeLock = true;
569- elog (WARNING ,"CFS revokes lock on file %s\n" ,file_path );
570- }
571- }
572- else
573- {
574- n_attempts = 0 ;/* Reset counter of attempts because GC is in progress */
575- }
558+ /* No GC is active for this segment */
559+ break ;
576560}
577- if (revokeLock
578- /* use gc_started flag to prevent race condition with other backends and GC */
579- && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
580- {
581- /* Ugggh... looks like last GC was interrupted.
582- * Try to recover the file.
583- */
584- char * map_bck_path = psprintf ("%s.cfm.bck" ,file_path );
585- char * file_bck_path = psprintf ("%s.bck" ,file_path );
586561
587- elog (WARNING ,"CFS indicates that GC of %s was interrupted: try to perform recovery" ,file_path );
562+ if (pg_atomic_read_u32 (& cfs_state -> n_active_gc )== 0 )
563+ {
564+ /* There is no active GC, so lock is set by crashed GC */
588565
589- if (access (file_bck_path ,R_OK )!= 0 )
590- {
591- /* There is no backup file: new map should be constructed */
592- int md2 = open (map_bck_path ,O_RDWR |PG_BINARY ,0 );
593- if (md2 >=0 )
594- {
595- /* Recover map. */
596- if (!cfs_read_file (md2 ,map ,sizeof (FileMap )))
597- elog (WARNING ,"CFS failed to read file %s: %m" ,map_bck_path );
566+ LWLockAcquire (CfsGcLock ,LW_EXCLUSIVE );/* Prevent race condition with GC */
598567
599- close (md2 );
600- }
601- }
602- else
568+ /* Recheck under CfsGcLock that map->lock was not released */
569+ if (pg_atomic_read_u32 (& map -> lock ) >=CFS_GC_LOCK )
603570{
604- /* Presence of backup file means that we still have
605- * unchanged data and map files. Just remove backup files and
606- * revoke GC lock.
571+ /* Uhhh... looks like last GC was interrupted.
572+ * Try to recover the file.
607573 */
608- unlink (file_bck_path );
609- unlink (map_bck_path );
574+ char * map_bck_path = psprintf ("%s.cfm.bck" ,file_path );
575+ char * file_bck_path = psprintf ("%s.bck" ,file_path );
576+
577+ elog (WARNING ,"CFS indicates that GC of %s was interrupted: trying to perform recovery" ,file_path );
578+
579+ if (access (file_bck_path ,R_OK )!= 0 )
580+ {
581+ /* There is no backup file: new map should be constructed */
582+ int md2 = open (map_bck_path ,O_RDWR |PG_BINARY ,0 );
583+ if (md2 >=0 )
584+ {
585+ /* Recover map. */
586+ if (!cfs_read_file (md2 ,map ,sizeof (FileMap )))
587+ elog (WARNING ,"CFS failed to read file %s: %m" ,map_bck_path );
588+
589+ close (md2 );
590+ }
591+ }
592+ else
593+ {
594+ /* Presence of backup file means that we still have
595+ * unchanged data and map files. Just remove backup files and
596+ * revoke GC lock.
597+ */
598+ unlink (file_bck_path );
599+ unlink (map_bck_path );
600+ }
601+
602+ count = pg_atomic_fetch_sub_u32 (& map -> lock ,CFS_GC_LOCK );/* revoke GC lock */
603+ Assert ((int )count > 0 );
604+ pfree (file_bck_path );
605+ pfree (map_bck_path );
610606}
611-
612- pg_atomic_clear_flag (& cfs_state -> gc_started );
613- count = pg_atomic_fetch_sub_u32 (& map -> lock ,CFS_GC_LOCK );/* revoke GC lock */
614- Assert ((int )count > 0 );
615- pfree (file_bck_path );
616- pfree (map_bck_path );
607+ LWLockRelease (CfsGcLock );
617608break ;
618- }
609+ }
610+ /* Wait until GC of segment is completed */
619611pg_atomic_fetch_sub_u32 (& map -> lock ,1 );
620612pg_usleep (delay );
621613if (delay < CFS_LOCK_MAX_TIMEOUT )
614+ {
622615delay *=2 ;
616+ }
623617}
624618
625619if (IsUnderPostmaster && cfs_gc_workers != 0
@@ -649,11 +643,11 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
649643/*
650644 * Perform garbage collection (if required) on the file
651645 * @param map_path - path to the map file (*.cfm).
652- * @paramnoerror -surpress error message (when this function is called bycfs_gc_relation until there are available segments)
646+ * @param background - GC is performed in background by BGW: suppress error message and set CfsGcLock
653647 */
654- static bool cfs_gc_file (char * map_path ,bool noerror )
648+ static bool cfs_gc_file (char * map_path ,bool background )
655649{
656- int md = open ( map_path , O_RDWR | PG_BINARY , 0 ) ;
650+ int md ;
657651FileMap * map ;
658652uint32 physSize ;
659653uint32 usedSize ;
@@ -663,29 +657,33 @@ static bool cfs_gc_file(char* map_path, bool noerror)
663657int fd2 = -1 ;
664658int md2 = -1 ;
665659bool succeed = false;
660+ int rc ;
661+
666662
667663pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc ,1 );
668664
669- while (! cfs_state -> gc_enabled )
665+ if ( background )
670666{
671- int rc ;
672-
673- pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc ,1 );
674-
675- rc = WaitLatch (MyLatch ,
676- WL_TIMEOUT |WL_POSTMASTER_DEATH ,
677- CFS_DISABLE_TIMEOUT /* ms */ );
678- if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
679- exit (1 );
667+ while (!cfs_state -> gc_enabled )
668+ {
669+ pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc ,1 );
670+
671+ rc = WaitLatch (MyLatch ,
672+ WL_TIMEOUT |WL_POSTMASTER_DEATH ,
673+ CFS_DISABLE_TIMEOUT /* ms */ );
674+ if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
675+ exit (1 );
676+
677+ pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc ,1 );
678+ }
680679
681- pg_atomic_fetch_add_u32 ( & cfs_state -> n_active_gc , 1 );
680+ LWLockAcquire ( CfsGcLock , LW_SHARED ); /* avoid race condition with cfs_lock_file */
682681}
683682
683+ md = open (map_path ,O_RDWR |PG_BINARY ,0 );
684684if (md < 0 )
685685{
686- if (!noerror ) {
687- elog (WARNING ,"CFS failed to open map file %s: %m" ,map_path );
688- }
686+ elog (DEBUG1 ,"CFS failed to open map file %s: %m" ,map_path );
689687gotoFinishGC ;
690688}
691689
@@ -766,6 +764,11 @@ static bool cfs_gc_file(char* map_path, bool noerror)
766764remove_backups = false;
767765gotoReplaceMap ;
768766}
767+ else
768+ {
769+ /* No backups - nothing has to be recovered. Just release GC lock */
770+ break ;
771+ }
769772}
770773else
771774{
@@ -908,7 +911,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
908911{
909912inode_t inode = newMap -> inodes [i ];
910913int size = CFS_INODE_SIZE (inode );
911- if (size != 0 )
914+ if (size != 0 && size < BLCKSZ )
912915{
913916char block [BLCKSZ ];
914917char decomressedBlock [BLCKSZ ];
@@ -926,7 +929,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
926929pg_atomic_fetch_sub_u32 (& map -> lock ,CFS_GC_LOCK );/* release lock */
927930/* TODO Is it worth to PANIC or ERROR will be enough? */
928931elog (PANIC ,"Verification failed for block %d of relation %s: error code %d" ,
929- i ,file_path , (int )res );
932+ i ,file_bck_path , (int )res );
930933}
931934}
932935}
@@ -1032,7 +1035,12 @@ static bool cfs_gc_file(char* map_path, bool noerror)
10321035}
10331036
10341037FinishGC :
1038+ if (background )
1039+ {
1040+ LWLockRelease (CfsGcLock );
1041+ }
10351042pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc ,1 );
1043+
10361044return succeed ;
10371045}
10381046
@@ -1066,7 +1074,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
10661074strcmp (file_path + len - 4 ,".cfm" )== 0 )
10671075{
10681076if (entry -> d_ino %cfs_state -> n_workers == worker_id
1069- && !cfs_gc_file (file_path ,false ))
1077+ && !cfs_gc_file (file_path ,true ))
10701078{
10711079success = false;
10721080break ;
@@ -1395,31 +1403,35 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
13951403
13961404Datum cfs_gc_relation (PG_FUNCTION_ARGS )
13971405{
1398- cfs_gc_processed_segments = 0 ;
1399-
1400- if (cfs_gc_workers == 0 && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
1406+ Oid oid = PG_GETARG_OID (0 );/* argument 0: OID of the relation whose segments are garbage-collected */
1407+ Relation rel = try_relation_open (oid ,AccessShareLock );
1408+ int processed_segments = 0 ;/* result: segments processed by this call; stays 0 if the relation cannot be opened */
1409+
1410+ if (rel != NULL )
14011411{
1402- Oid oid = PG_GETARG_OID (0 );
1403- Relation rel = try_relation_open (oid ,AccessShareLock );
1404-
1405- if (rel != NULL )
1406- {
1407- char * path = relpathbackend (rel -> rd_node ,rel -> rd_backend ,MAIN_FORKNUM );
1408- char * map_path = (char * )palloc (strlen (path )+ 16 );
1409- int i = 0 ;
1410- sprintf (map_path ,"%s.cfm" ,path );
1412+ char * path ;
1413+ char * map_path ;
1414+ int i = 0 ;
1415+
1416+ LWLockAcquire (CfsGcLock ,LW_EXCLUSIVE );/* Prevent interaction with background GC */
1417+
1418+ processed_segments = cfs_gc_processed_segments ;/* snapshot of the counter before this call (presumably advanced by cfs_gc_file - confirm) */
1419+
1420+ path = relpathbackend (rel -> rd_node ,rel -> rd_backend ,MAIN_FORKNUM );
1421+ map_path = (char * )palloc (strlen (path )+ 16 );/* 16 extra bytes fit the longest ".<uint32>.cfm" suffix plus the terminator */
1422+ sprintf (map_path ,"%s.cfm" ,path );
14111423
1412- while (true)
1413- {
1414- if (!cfs_gc_file (map_path , true))
1415- break ;
1416- sprintf (map_path ,"%s.%u.cfm" ,path ,++ i );
1417- }
1418- pfree (path );
1419- pfree (map_path );
1420- relation_close (rel ,AccessShareLock );
1424+ while (cfs_gc_file (map_path , false))/* walk segments in order; stops at the first one cfs_gc_file reports as failed */
1425+ {
1426+ sprintf (map_path ,"%s.%u.cfm" ,path ,++ i );
14211427}
1422- pg_atomic_clear_flag (& cfs_state -> gc_started );
1428+ pfree (path );
1429+ pfree (map_path );
1430+ relation_close (rel ,AccessShareLock );
1431+
1432+ processed_segments = cfs_gc_processed_segments - processed_segments ;/* delta = counter after - counter before (was reversed and unused) */
1433+
1434+ LWLockRelease (CfsGcLock );
14231435}
1424- PG_RETURN_INT32 (cfs_gc_processed_segments );
1436+ PG_RETURN_INT32 (processed_segments );/* return this call's count, not the cumulative global counter */
14251437}