@@ -470,8 +470,18 @@ int cfs_munmap(FileMap* map)
  */
 uint32 cfs_alloc_page(FileMap* map, uint32 oldSize, uint32 newSize)
 {
+    uint32 oldPhysSize = pg_atomic_read_u32(&map->hdr.physSize);
+    uint32 newPhysSize;
+
+    do {
+        newPhysSize = oldPhysSize + newSize;
+        if (oldPhysSize > newPhysSize)
+            elog(ERROR, "CFS: segment file exceed 4Gb limit");
+    } while (!pg_atomic_compare_exchange_u32(&map->hdr.physSize, &oldPhysSize, newPhysSize));
+
     pg_atomic_fetch_add_u32(&map->hdr.usedSize, newSize - oldSize);
-    return pg_atomic_fetch_add_u32(&map->hdr.physSize, newSize);
+
+    return oldPhysSize;
 }
 
 /*
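The new allocation path replaces the single fetch-add with a compare-and-exchange retry loop, so the 4GB overflow check and the physSize bump become one atomic step. Below is a minimal standalone sketch of the same pattern, using C11 stdatomic in place of PostgreSQL's pg_atomic API; SegmentHeader, alloc_page and the UINT32_MAX error return are illustrative stand-ins, not part of the patch.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* Toy stand-in for the map header: physSize mirrors map->hdr.physSize. */
typedef struct
{
    _Atomic uint32_t physSize;
} SegmentHeader;

/*
 * Reserve newSize bytes at the end of the segment and return the old
 * physical size (the offset of the reserved region).  The retry loop makes
 * the overflow check and the size bump one atomic step, as the patched
 * cfs_alloc_page() does with pg_atomic_compare_exchange_u32().
 */
static uint32_t alloc_page(SegmentHeader *hdr, uint32_t newSize)
{
    uint32_t oldPhysSize = atomic_load(&hdr->physSize);
    uint32_t newPhysSize;

    do
    {
        newPhysSize = oldPhysSize + newSize;
        if (oldPhysSize > newPhysSize)   /* wrapped past the 4GB limit */
            return UINT32_MAX;           /* the patch calls elog(ERROR) here */
    } while (!atomic_compare_exchange_strong(&hdr->physSize,
                                             &oldPhysSize, newPhysSize));
    return oldPhysSize;
}

int main(void)
{
    SegmentHeader hdr = { 0 };

    printf("first block at %u\n", (unsigned) alloc_page(&hdr, 8192));   /* 0 */
    printf("second block at %u\n", (unsigned) alloc_page(&hdr, 8192));  /* 8192 */
    return 0;
}

Like pg_atomic_compare_exchange_u32, atomic_compare_exchange_strong reloads the current value into oldPhysSize when it fails, so the loop simply recomputes and retries under concurrency.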
@@ -643,12 +653,18 @@ static int cfs_cmp_page_offs(void const* p1, void const* p2)
     return o1 < o2 ? -1 : o1 == o2 ? 0 : 1;
 }
 
+typedef enum {
+    CFS_BACKGROUND,
+    CFS_EXPLICIT,
+    CFS_IMPLICIT
+} GC_CALL_KIND;
+
 /*
  * Perform garbage collection (if required) on the file
  * @param map_path - path to the map file (*.cfm).
  * @param background - GC is performed in background by BGW: suppress error message and set CfsGcLock
  */
-static bool cfs_gc_file(char* map_path, bool background)
+static bool cfs_gc_file(char* map_path, GC_CALL_KIND background)
 {
     int md;
     FileMap* map;
@@ -660,27 +676,37 @@ static bool cfs_gc_file(char* map_path, bool background)
     int fd2 = -1;
     int md2 = -1;
     bool succeed = false;
+    bool performed = false;
     int rc;
 
-
     pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
-
-    while (!(background ? (cfs_state->gc_enabled & cfs_state->background_gc_enabled) : cfs_state->gc_enabled))
+    if (background == CFS_IMPLICIT)
+    {
+        if (!cfs_state->gc_enabled)
+        {
+            pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
+            return false;
+        }
+    }
+    else
     {
-        pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
+        while (!cfs_state->gc_enabled || (background == CFS_BACKGROUND && !cfs_state->background_gc_enabled))
+        {
+            pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
 
-        rc = WaitLatch(MyLatch,
-                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                       CFS_DISABLE_TIMEOUT /* ms */);
-        if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
-            exit(1);
+            rc = WaitLatch(MyLatch,
+                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                           CFS_DISABLE_TIMEOUT /* ms */);
+            if (cfs_gc_stop || (rc & WL_POSTMASTER_DEATH))
+                exit(1);
 
-        ResetLatch(MyLatch);
-        CHECK_FOR_INTERRUPTS();
+            ResetLatch(MyLatch);
+            CHECK_FOR_INTERRUPTS();
 
-        pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
+            pg_atomic_fetch_add_u32(&cfs_state->n_active_gc, 1);
+        }
     }
-    if (background)
+    if (background == CFS_BACKGROUND)
     {
         LWLockAcquire(CfsGcLock, LW_SHARED); /* avoid race condition with cfs_file_lock */
     }
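To summarize the new gating: callers now identify themselves with GC_CALL_KIND, and only implicit callers bail out immediately while GC is disabled; background and explicit callers keep waiting on the latch. A condensed sketch of that decision follows, with plain parameters standing in for the shared cfs_state fields; gc_gate and GC_DECISION are hypothetical names, not from the patch.

#include <stdbool.h>
#include <stdio.h>

typedef enum { CFS_BACKGROUND, CFS_EXPLICIT, CFS_IMPLICIT } GC_CALL_KIND;
typedef enum { GC_PROCEED, GC_SKIP, GC_WAIT } GC_DECISION;

/*
 * Condensed gate from the patched cfs_gc_file():
 *  - CFS_IMPLICIT (GC triggered as a side effect of normal I/O) never blocks,
 *    it just skips the file while GC is disabled;
 *  - CFS_BACKGROUND waits until both gc_enabled and background_gc_enabled hold;
 *  - CFS_EXPLICIT waits on gc_enabled only.
 */
static GC_DECISION gc_gate(GC_CALL_KIND kind, bool gc_enabled, bool background_gc_enabled)
{
    if (kind == CFS_IMPLICIT)
        return gc_enabled ? GC_PROCEED : GC_SKIP;

    if (!gc_enabled || (kind == CFS_BACKGROUND && !background_gc_enabled))
        return GC_WAIT;   /* cfs_gc_file() sleeps on its latch and retries */

    return GC_PROCEED;
}

int main(void)
{
    /* With GC globally disabled: an implicit call skips (1), a background call waits (2). */
    printf("%d %d\n", gc_gate(CFS_IMPLICIT, false, true),
           gc_gate(CFS_BACKGROUND, false, true));
    return 0;
}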
@@ -708,7 +734,7 @@ static bool cfs_gc_file(char* map_path, bool background)
         cfs_state->gc_stat.scannedFiles += 1;
 
         /* do we need to perform defragmentation? */
-        if ((uint64)(physSize - usedSize) * 100 > (uint64)physSize * cfs_gc_threshold)
+        if (physSize > CFS_IMPLICIT_GC_THRESHOLD || (uint64)(physSize - usedSize) * 100 > (uint64)physSize * cfs_gc_threshold)
         {
             long delay = CFS_LOCK_MIN_TIMEOUT;
             char* file_path = (char*)palloc(suf + 1);
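Defragmentation is now also forced once a segment's physical size passes CFS_IMPLICIT_GC_THRESHOLD, in addition to the old fragmentation-percentage test. A standalone sketch of the trigger predicate; the threshold value and the cfs_gc_threshold default below are assumptions for illustration only, the real definitions live elsewhere in the extension.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Assumed values for illustration; the real CFS_IMPLICIT_GC_THRESHOLD and
 * the cfs_gc_threshold GUC are defined elsewhere in the extension. */
#define CFS_IMPLICIT_GC_THRESHOLD ((uint32_t) 1024 * 1024 * 1024)
static int cfs_gc_threshold = 50;   /* percent of garbage that triggers GC */

/*
 * Mirror of the patched condition: defragment when the segment has grown
 * past the implicit threshold, or when more than cfs_gc_threshold percent
 * of the physical file is garbage (physSize - usedSize).
 */
static bool needs_defragmentation(uint32_t physSize, uint32_t usedSize)
{
    return physSize > CFS_IMPLICIT_GC_THRESHOLD ||
           (uint64_t) (physSize - usedSize) * 100 > (uint64_t) physSize * cfs_gc_threshold;
}

int main(void)
{
    printf("%d\n", needs_defragmentation(1000000, 600000));   /* 40% garbage: 0 */
    printf("%d\n", needs_defragmentation(1000000, 400000));   /* 60% garbage: 1 */
    return 0;
}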
@@ -1030,17 +1056,7 @@ static bool cfs_gc_file(char* map_path, bool background)
             pfree(inodes);
             pfree(newMap);
 
-            if (cfs_gc_delay != 0)
-            {
-                int rc = WaitLatch(MyLatch,
-                                   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                                   cfs_gc_delay /* ms */);
-                if (rc & WL_POSTMASTER_DEATH)
-                    exit(1);
-
-                ResetLatch(MyLatch);
-                CHECK_FOR_INTERRUPTS();
-            }
+            performed = true;
         }
         else if (cfs_state->max_iterations == 1)
             elog(LOG, "CFS GC worker %d: file %.*s: physical size %u, logical size %u, used %u, compression ratio %f",
@@ -1058,12 +1074,23 @@ static bool cfs_gc_file(char* map_path, bool background)
     }
 
   FinishGC:
-    if (background)
+    if (background == CFS_BACKGROUND)
     {
         LWLockRelease(CfsGcLock);
     }
     pg_atomic_fetch_sub_u32(&cfs_state->n_active_gc, 1);
 
+    if (cfs_gc_delay != 0 && performed && background == CFS_BACKGROUND)
+    {
+        int rc = WaitLatch(MyLatch,
+                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                           cfs_gc_delay /* ms */);
+        if (rc & WL_POSTMASTER_DEATH)
+            exit(1);
+
+        ResetLatch(MyLatch);
+        CHECK_FOR_INTERRUPTS();
+    }
     return succeed;
 }
 
@@ -1097,7 +1124,7 @@ static bool cfs_gc_directory(int worker_id, char const* path)
             strcmp(file_path + len - 4, ".cfm") == 0)
         {
             if (entry->d_ino % cfs_state->n_workers == worker_id
-                && !cfs_gc_file(file_path, true))
+                && !cfs_gc_file(file_path, CFS_BACKGROUND))
             {
                 success = false;
                 break;
@@ -1195,6 +1222,7 @@ bool cfs_control_gc(bool enabled)
 {
     bool was_enabled = cfs_state->gc_enabled;
     cfs_state->gc_enabled = enabled;
+    pg_memory_barrier();
     if (was_enabled && !enabled)
     {
         /* Wait until there are no active GC workers */
@@ -1238,9 +1266,7 @@ Datum cfs_start_gc(PG_FUNCTION_ARGS)
         int j;
         BackgroundWorkerHandle** handles;
 
-        cfs_gc_stop = true; /* do just one iteration */
-
-        cfs_state->max_iterations = 1;
+        cfs_state->max_iterations = 1; /* do just one iteration */
         cfs_state->n_workers = PG_GETARG_INT32(0);
         handles = (BackgroundWorkerHandle**)palloc(cfs_state->n_workers * sizeof(BackgroundWorkerHandle*));
 
@@ -1460,45 +1486,95 @@ Datum cfs_gc_relation(PG_FUNCTION_ARGS)
         char* path;
         char* map_path;
         int i = 0;
-
-        LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
+        bool stop = false;
 
         processed_segments = cfs_gc_processed_segments;
 
         path = relpathbackend(rel->rd_node, rel->rd_backend, MAIN_FORKNUM);
         map_path = (char*)palloc(strlen(path) + 16);
         sprintf(map_path, "%s.cfm", path);
 
-        while (cfs_gc_file(map_path, false))
+        do
         {
+            LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
+            stop = !cfs_gc_file(map_path, CFS_EXPLICIT);
+            LWLockRelease(CfsGcLock);
             sprintf(map_path, "%s.%u.cfm", path, ++i);
-        }
+        } while (!stop);
         pfree(path);
         pfree(map_path);
         relation_close(rel, AccessShareLock);
 
         processed_segments = cfs_gc_processed_segments - processed_segments;
-
-        LWLockRelease(CfsGcLock);
     }
     PG_RETURN_INT32(processed_segments);
 }
 
 
-void cfs_gc_segment(char const* fileName)
+void cfs_gc_segment(char const* fileName, bool optional)
 {
-    char* mapFileName = psprintf("%s.cfm", fileName);
+    char* mapFileName;
 
-    LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
+    if (optional)
+    {
+        if (!LWLockConditionalAcquire(CfsGcLock, LW_EXCLUSIVE)) /* Prevent interaction with background GC */
+            return;
+    }
+    else
+        LWLockAcquire(CfsGcLock, LW_EXCLUSIVE); /* Prevent interaction with background GC */
 
-    cfs_gc_file(mapFileName, false);
+    mapFileName = psprintf("%s.cfm", fileName);
 
+    cfs_gc_file(mapFileName, optional ? CFS_IMPLICIT : CFS_EXPLICIT);
     LWLockRelease(CfsGcLock);
 
     pfree(mapFileName);
 }
 
 
+void cfs_recover_map(FileMap* map)
+{
+    int i;
+    uint32 physSize;
+    uint32 virtSize;
+    uint32 usedSize = 0;
+
+    physSize = pg_atomic_read_u32(&map->hdr.physSize);
+    virtSize = pg_atomic_read_u32(&map->hdr.virtSize);
+
+    for (i = 0; i < RELSEG_SIZE; i++)
+    {
+        inode_t inode = map->inodes[i];
+        int size = CFS_INODE_SIZE(inode);
+        if (size != 0)
+        {
+            uint32 offs = CFS_INODE_OFFS(inode);
+            if (offs + size > physSize)
+            {
+                physSize = offs + size;
+            }
+            if ((i + 1)*BLCKSZ > virtSize)
+            {
+                virtSize = (i + 1)*BLCKSZ;
+            }
+            usedSize += size;
+        }
+        if (usedSize != pg_atomic_read_u32(&map->hdr.usedSize))
+        {
+            pg_atomic_write_u32(&map->hdr.usedSize, usedSize);
+        }
+        if (physSize != pg_atomic_read_u32(&map->hdr.physSize))
+        {
+            pg_atomic_write_u32(&map->hdr.physSize, physSize);
+        }
+        if (virtSize != pg_atomic_read_u32(&map->hdr.virtSize))
+        {
+            pg_atomic_write_u32(&map->hdr.virtSize, virtSize);
+        }
+    }
+}
+
+
 Datum cfs_gc_activity_processed_bytes(PG_FUNCTION_ARGS)
 {
     PG_RETURN_INT64(cfs_state->gc_stat.processedBytes);
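The new cfs_recover_map() rebuilds the map header counters from the inode table after a crash: physSize must reach past the furthest stored block, virtSize must cover the highest mapped block number, and usedSize is the sum of the stored block sizes. A simplified, self-contained sketch of that computation on a toy inode layout; toy_inode and the 4-block segment are illustrative stand-ins for inode_t and RELSEG_SIZE, and unlike the patch the counters are written back once at the end rather than checked inside the loop.

#include <stdint.h>
#include <stdio.h>

#define BLCKSZ      8192
#define RELSEG_SIZE 4   /* toy segment of 4 blocks, instead of the real value */

/* Simplified stand-in for the packed inode_t; the real CFS_INODE_OFFS /
 * CFS_INODE_SIZE macros extract these two fields from one word. */
typedef struct
{
    uint32_t offs;
    uint32_t size;
} toy_inode;

/*
 * Rebuild the header counters from the inode table, the way cfs_recover_map()
 * does: physSize has to reach past the furthest stored block, virtSize has to
 * cover the highest mapped block number, and usedSize is the sum of the
 * stored (compressed) block sizes.
 */
static void recover(const toy_inode inodes[RELSEG_SIZE],
                    uint32_t *physSize, uint32_t *virtSize, uint32_t *usedSize)
{
    uint32_t used = 0;
    int i;

    for (i = 0; i < RELSEG_SIZE; i++)
    {
        uint32_t size = inodes[i].size;

        if (size != 0)
        {
            if (inodes[i].offs + size > *physSize)
                *physSize = inodes[i].offs + size;
            if ((uint32_t) (i + 1) * BLCKSZ > *virtSize)
                *virtSize = (uint32_t) (i + 1) * BLCKSZ;
            used += size;
        }
    }
    *usedSize = used;
}

int main(void)
{
    /* Blocks 0 and 2 are stored; the header counters start out stale (zero). */
    toy_inode inodes[RELSEG_SIZE] = { {0, 3000}, {0, 0}, {3000, 2500}, {0, 0} };
    uint32_t physSize = 0, virtSize = 0, usedSize = 0;

    recover(inodes, &physSize, &virtSize, &usedSize);
    printf("phys=%u virt=%u used=%u\n",
           (unsigned) physSize, (unsigned) virtSize, (unsigned) usedSize);
    /* prints: phys=5500 virt=24576 used=5500 */
    return 0;
}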