@@ -389,23 +389,31 @@ void cfs_decrypt(const char* fname, void* block, uint32 offs, uint32 size)
389
389
* Section 3: Compression implementation.
390
390
* ----------------------------------------------------------------
391
391
*/
392
- void cfs_initialize ()
392
+ int cfs_shmem_size ()
393
393
{
394
- cfs_state = (CfsState * )ShmemAlloc (sizeof (CfsState ));
395
- memset (& cfs_state -> gc_stat ,0 ,sizeof cfs_state -> gc_stat );
396
- pg_atomic_init_flag (& cfs_state -> gc_started );
397
- pg_atomic_init_u32 (& cfs_state -> n_active_gc ,0 );
398
- cfs_state -> n_workers = 0 ;
399
- cfs_state -> gc_enabled = true;
400
- cfs_state -> max_iterations = 0 ;
401
-
402
- if (cfs_encryption )
403
- cfs_crypto_init ();
404
-
405
- elog (LOG ,"Start CFS version %s compression algorithm %s encryption %s" ,
406
- CFS_VERSION ,cfs_algorithm (),cfs_encryption ?"enabled" :"disabled" );
394
+ return sizeof (CfsState );
407
395
}
408
396
397
+ void cfs_initialize ()
398
+ {
399
+ bool found ;
400
+ cfs_state = (CfsState * )ShmemInitStruct ("CFS Control" ,sizeof (CfsState ),& found );
401
+ if (!found )
402
+ {
403
+ memset (& cfs_state -> gc_stat ,0 ,sizeof cfs_state -> gc_stat );
404
+ pg_atomic_init_flag (& cfs_state -> gc_started );
405
+ pg_atomic_init_u32 (& cfs_state -> n_active_gc ,0 );
406
+ cfs_state -> n_workers = 0 ;
407
+ cfs_state -> gc_enabled = cfs_gc_enabled ;
408
+ cfs_state -> max_iterations = 0 ;
409
+
410
+ if (cfs_encryption )
411
+ cfs_crypto_init ();
412
+
413
+ elog (LOG ,"Start CFS version %s compression algorithm %s encryption %s GC %s" ,
414
+ CFS_VERSION ,cfs_algorithm (),cfs_encryption ?"enabled" :"disabled" ,cfs_gc_enabled ?"enabled" :"disabled" );
415
+ }
416
+ }
409
417
int cfs_msync (FileMap * map )
410
418
{
411
419
#ifdef WIN32
@@ -540,86 +548,72 @@ static bool cfs_write_file(int fd, void const* data, uint32 size)
540
548
void cfs_lock_file (FileMap * map ,char const * file_path )
541
549
{
542
550
long delay = CFS_LOCK_MIN_TIMEOUT ;
543
- int n_attempts = 0 ;
544
551
545
552
while (true)
546
553
{
547
- uint64 count = pg_atomic_fetch_add_u32 (& map -> lock ,1 );
548
- bool revokeLock = false;
554
+ uint32 count = pg_atomic_fetch_add_u32 (& map -> lock ,1 );
549
555
550
556
if (count < CFS_GC_LOCK )
551
- break ;
552
-
553
- if (InRecovery )
554
557
{
555
- revokeLock = true;
556
- }
557
- else
558
- {
559
- if (pg_atomic_unlocked_test_flag (& cfs_state -> gc_started ))
560
- {
561
- if (++ n_attempts > MAX_LOCK_ATTEMPTS )
562
- {
563
- /* So there is GC lock, but no active GC process during MAX_LOCK_ATTEMPTS.
564
- * Most likely it means that GC is crashed (may be together with other postgres processes or even OS)
565
- * without releasing lock. And for some reasons recovery was not performed and this page left locked.
566
- * We should revoke the the lock to allow access to this segment.
567
- */
568
- revokeLock = true;
569
- elog (WARNING ,"CFS revokes lock on file %s\n" ,file_path );
570
- }
571
- }
572
- else
573
- {
574
- n_attempts = 0 ;/* Reset counter of attempts because GC is in progress */
575
- }
558
+ /* No GC is active for this segment */
559
+ break ;
576
560
}
577
- if (revokeLock
578
- /* use gc_started flag to prevent race condition with other backends and GC */
579
- && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
580
- {
581
- /* Ugggh... looks like last GC was interrupted.
582
- * Try to recover the file.
583
- */
584
- char * map_bck_path = psprintf ("%s.cfm.bck" ,file_path );
585
- char * file_bck_path = psprintf ("%s.bck" ,file_path );
586
561
587
- elog (WARNING ,"CFS indicates that GC of %s was interrupted: try to perform recovery" ,file_path );
562
+ if (pg_atomic_read_u32 (& cfs_state -> n_active_gc )== 0 )
563
+ {
564
+ /* There is no active GC, so lock is set by crashed GC */
588
565
589
- if (access (file_bck_path ,R_OK )!= 0 )
590
- {
591
- /* There is no backup file: new map should be constructed */
592
- int md2 = open (map_bck_path ,O_RDWR |PG_BINARY ,0 );
593
- if (md2 >=0 )
594
- {
595
- /* Recover map. */
596
- if (!cfs_read_file (md2 ,map ,sizeof (FileMap )))
597
- elog (WARNING ,"CFS failed to read file %s: %m" ,map_bck_path );
566
+ LWLockAcquire (CfsGcLock ,LW_EXCLUSIVE );/* Prevent race condition with GC */
598
567
599
- close (md2 );
600
- }
601
- }
602
- else
568
+ /* Recheck under CfsGcLock that map->lock was not released */
569
+ if (pg_atomic_read_u32 (& map -> lock ) >=CFS_GC_LOCK )
603
570
{
604
- /* Presence of backup file means that we still have
605
- * unchanged data and map files. Just remove backup files and
606
- * revoke GC lock.
571
+ /* Uhhh... looks like last GC was interrupted.
572
+ * Try to recover the file.
607
573
*/
608
- unlink (file_bck_path );
609
- unlink (map_bck_path );
574
+ char * map_bck_path = psprintf ("%s.cfm.bck" ,file_path );
575
+ char * file_bck_path = psprintf ("%s.bck" ,file_path );
576
+
577
+ elog (WARNING ,"CFS indicates that GC of %s was interrupted: trying to perform recovery" ,file_path );
578
+
579
+ if (access (file_bck_path ,R_OK )!= 0 )
580
+ {
581
+ /* There is no backup file: new map should be constructed */
582
+ int md2 = open (map_bck_path ,O_RDWR |PG_BINARY ,0 );
583
+ if (md2 >=0 )
584
+ {
585
+ /* Recover map. */
586
+ if (!cfs_read_file (md2 ,map ,sizeof (FileMap )))
587
+ elog (WARNING ,"CFS failed to read file %s: %m" ,map_bck_path );
588
+
589
+ close (md2 );
590
+ }
591
+ }
592
+ else
593
+ {
594
+ /* Presence of backup file means that we still have
595
+ * unchanged data and map files. Just remove backup files and
596
+ * revoke GC lock.
597
+ */
598
+ unlink (file_bck_path );
599
+ unlink (map_bck_path );
600
+ }
601
+
602
+ count = pg_atomic_fetch_sub_u32 (& map -> lock ,CFS_GC_LOCK );/* revoke GC lock */
603
+ Assert ((int )count > 0 );
604
+ pfree (file_bck_path );
605
+ pfree (map_bck_path );
610
606
}
611
-
612
- pg_atomic_clear_flag (& cfs_state -> gc_started );
613
- count = pg_atomic_fetch_sub_u32 (& map -> lock ,CFS_GC_LOCK );/* revoke GC lock */
614
- Assert ((int )count > 0 );
615
- pfree (file_bck_path );
616
- pfree (map_bck_path );
607
+ LWLockRelease (CfsGcLock );
617
608
break ;
618
- }
609
+ }
610
+ /* Wait until GC of segment is completed */
619
611
pg_atomic_fetch_sub_u32 (& map -> lock ,1 );
620
612
pg_usleep (delay );
621
613
if (delay < CFS_LOCK_MAX_TIMEOUT )
614
+ {
622
615
delay *=2 ;
616
+ }
623
617
}
624
618
625
619
if (IsUnderPostmaster && cfs_gc_workers != 0
@@ -649,11 +643,11 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
649
643
/*
650
644
* Perform garbage collection (if required) on the file
651
645
* @param map_path - path to the map file (*.cfm).
652
- * @paramnoerror -surpress error message (when this function is called bycfs_gc_relation until there are available segments)
646
+ * @param background - GC is performed in background by BGW: suppress error message and acquire CfsGcLock
653
647
*/
654
- static bool cfs_gc_file (char * map_path ,bool noerror )
648
+ static bool cfs_gc_file (char * map_path ,bool background )
655
649
{
656
- int md = open ( map_path , O_RDWR | PG_BINARY , 0 ) ;
650
+ int md ;
657
651
FileMap * map ;
658
652
uint32 physSize ;
659
653
uint32 usedSize ;
@@ -663,29 +657,33 @@ static bool cfs_gc_file(char* map_path, bool noerror)
663
657
int fd2 = -1 ;
664
658
int md2 = -1 ;
665
659
bool succeed = false;
660
+ int rc ;
661
+
666
662
667
663
pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc ,1 );
668
664
669
- while (! cfs_state -> gc_enabled )
665
+ if ( background )
670
666
{
671
- int rc ;
672
-
673
- pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc ,1 );
674
-
675
- rc = WaitLatch (MyLatch ,
676
- WL_TIMEOUT |WL_POSTMASTER_DEATH ,
677
- CFS_DISABLE_TIMEOUT /* ms */ );
678
- if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
679
- exit (1 );
667
+ while (!cfs_state -> gc_enabled )
668
+ {
669
+ pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc ,1 );
670
+
671
+ rc = WaitLatch (MyLatch ,
672
+ WL_TIMEOUT |WL_POSTMASTER_DEATH ,
673
+ CFS_DISABLE_TIMEOUT /* ms */ );
674
+ if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH ))
675
+ exit (1 );
676
+
677
+ pg_atomic_fetch_add_u32 (& cfs_state -> n_active_gc ,1 );
678
+ }
680
679
681
- pg_atomic_fetch_add_u32 ( & cfs_state -> n_active_gc , 1 );
680
+ LWLockAcquire ( CfsGcLock , LW_SHARED ); /* avoid race condition with cfs_lock_file */
682
681
}
683
682
683
+ md = open (map_path ,O_RDWR |PG_BINARY ,0 );
684
684
if (md < 0 )
685
685
{
686
- if (!noerror ) {
687
- elog (WARNING ,"CFS failed to open map file %s: %m" ,map_path );
688
- }
686
+ elog (DEBUG1 ,"CFS failed to open map file %s: %m" ,map_path );
689
687
gotoFinishGC ;
690
688
}
691
689
@@ -766,6 +764,11 @@ static bool cfs_gc_file(char* map_path, bool noerror)
766
764
remove_backups = false;
767
765
gotoReplaceMap ;
768
766
}
767
+ else
768
+ {
769
+ /* No backups - nothing has to be recovered. Just release GC lock */
770
+ break ;
771
+ }
769
772
}
770
773
else
771
774
{
@@ -908,7 +911,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
908
911
{
909
912
inode_t inode = newMap -> inodes [i ];
910
913
int size = CFS_INODE_SIZE (inode );
911
- if (size != 0 )
914
+ if (size != 0 && size < BLCKSZ )
912
915
{
913
916
char block [BLCKSZ ];
914
917
char decomressedBlock [BLCKSZ ];
@@ -926,7 +929,7 @@ static bool cfs_gc_file(char* map_path, bool noerror)
926
929
pg_atomic_fetch_sub_u32 (& map -> lock ,CFS_GC_LOCK );/* release lock */
927
930
/* TODO Is it worth to PANIC or ERROR will be enough? */
928
931
elog (PANIC ,"Verification failed for block %d of relation %s: error code %d" ,
929
- i ,file_path , (int )res );
932
+ i ,file_bck_path , (int )res );
930
933
}
931
934
}
932
935
}
@@ -1032,7 +1035,12 @@ static bool cfs_gc_file(char* map_path, bool noerror)
1032
1035
}
1033
1036
1034
1037
FinishGC :
1038
+ if (background )
1039
+ {
1040
+ LWLockRelease (CfsGcLock );
1041
+ }
1035
1042
pg_atomic_fetch_sub_u32 (& cfs_state -> n_active_gc ,1 );
1043
+
1036
1044
return succeed ;
1037
1045
}
1038
1046
@@ -1066,7 +1074,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
1066
1074
strcmp (file_path + len - 4 ,".cfm" )== 0 )
1067
1075
{
1068
1076
if (entry -> d_ino %cfs_state -> n_workers == worker_id
1069
- && !cfs_gc_file (file_path ,false ))
1077
+ && !cfs_gc_file (file_path ,true ))
1070
1078
{
1071
1079
success = false;
1072
1080
break ;
@@ -1395,31 +1403,35 @@ Datum cfs_fragmentation(PG_FUNCTION_ARGS)
1395
1403
1396
1404
Datum cfs_gc_relation (PG_FUNCTION_ARGS )
1397
1405
{
1398
- cfs_gc_processed_segments = 0 ;
1399
-
1400
- if (cfs_gc_workers == 0 && pg_atomic_test_set_flag (& cfs_state -> gc_started ))
1406
+ Oid oid = PG_GETARG_OID (0 );
1407
+ Relation rel = try_relation_open (oid ,AccessShareLock );
1408
+ int processed_segments = 0 ;
1409
+
1410
+ if (rel != NULL )
1401
1411
{
1402
- Oid oid = PG_GETARG_OID (0 );
1403
- Relation rel = try_relation_open (oid ,AccessShareLock );
1404
-
1405
- if (rel != NULL )
1406
- {
1407
- char * path = relpathbackend (rel -> rd_node ,rel -> rd_backend ,MAIN_FORKNUM );
1408
- char * map_path = (char * )palloc (strlen (path )+ 16 );
1409
- int i = 0 ;
1410
- sprintf (map_path ,"%s.cfm" ,path );
1412
+ char * path ;
1413
+ char * map_path ;
1414
+ int i = 0 ;
1415
+
1416
+ LWLockAcquire (CfsGcLock ,LW_EXCLUSIVE );/* Prevent interaction with background GC */
1417
+
1418
+ processed_segments = cfs_gc_processed_segments ;
1419
+
1420
+ path = relpathbackend (rel -> rd_node ,rel -> rd_backend ,MAIN_FORKNUM );
1421
+ map_path = (char * )palloc (strlen (path )+ 16 );
1422
+ sprintf (map_path ,"%s.cfm" ,path );
1411
1423
1412
- while (true)
1413
- {
1414
- if (!cfs_gc_file (map_path , true))
1415
- break ;
1416
- sprintf (map_path ,"%s.%u.cfm" ,path ,++ i );
1417
- }
1418
- pfree (path );
1419
- pfree (map_path );
1420
- relation_close (rel ,AccessShareLock );
1424
+ while (cfs_gc_file (map_path , false))
1425
+ {
1426
+ sprintf (map_path ,"%s.%u.cfm" ,path ,++ i );
1421
1427
}
1422
- pg_atomic_clear_flag (& cfs_state -> gc_started );
1428
+ pfree (path );
1429
+ pfree (map_path );
1430
+ relation_close (rel ,AccessShareLock );
1431
+
1432
+ processed_segments -= cfs_gc_processed_segments ;
1433
+
1434
+ LWLockRelease (CfsGcLock );
1423
1435
}
1424
1436
PG_RETURN_INT32 (cfs_gc_processed_segments );
1425
1437
}