5858#include "catalog/namespace.h"
5959#include "miscadmin.h"
6060#include "pgstat.h"
61+ #include "port/atomics.h"
6162#include "storage/bufmgr.h"
6263#include "storage/freespace.h"
6364#include "storage/lmgr.h"
@@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
8990bool is_bitmapscan ,
9091bool is_samplescan ,
9192bool temp_snap );
93+ static void heap_parallelscan_startblock_init (HeapScanDesc scan );
9294static BlockNumber heap_parallelscan_nextpage (HeapScanDesc scan );
9395static HeapTuple heap_prepare_insert (Relation relation ,HeapTuple tup ,
9496TransactionId xid ,CommandId cid ,int options );
@@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan,
510512}
511513if (scan -> rs_parallel != NULL )
512514{
515+ heap_parallelscan_startblock_init (scan );
516+
513517page = heap_parallelscan_nextpage (scan );
514518
515519/* Other processes might have already finished the scan. */
@@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan,
812816}
813817if (scan -> rs_parallel != NULL )
814818{
819+ heap_parallelscan_startblock_init (scan );
820+
815821page = heap_parallelscan_nextpage (scan );
816822
817823/* Other processes might have already finished the scan. */
@@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan,
15351541
15361542/*
15371543 * Caller is responsible for making sure that all workers have
1538- * finished the scan before calling this, so it really shouldn't be
1539- * necessary to acquire the mutex at all. We acquire it anyway, just
1540- * to be tidy.
1544+ * finished the scan before calling this.
15411545 */
15421546parallel_scan = scan -> rs_parallel ;
1543- SpinLockAcquire (& parallel_scan -> phs_mutex );
1544- parallel_scan -> phs_cblock = parallel_scan -> phs_startblock ;
1545- SpinLockRelease (& parallel_scan -> phs_mutex );
1547+ pg_atomic_write_u64 (& parallel_scan -> phs_nallocated ,0 );
15461548}
15471549}
15481550
@@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
16351637!RelationUsesLocalBuffers (relation )&&
16361638target -> phs_nblocks > NBuffers /4 ;
16371639SpinLockInit (& target -> phs_mutex );
1638- target -> phs_cblock = InvalidBlockNumber ;
16391640target -> phs_startblock = InvalidBlockNumber ;
1641+ pg_atomic_write_u64 (& target -> phs_nallocated ,0 );
16401642SerializeSnapshot (snapshot ,target -> phs_snapshot_data );
16411643}
16421644
@@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
16601662}
16611663
16621664/* ----------------
1663- *heap_parallelscan_nextpage -get the next page to scan
1665+ *heap_parallelscan_startblock_init -find and set the scan's startblock
16641666 *
1665- *Get the next page to scan. Even if there are no pages left to scan,
1666- *another backend could have grabbed a page to scan and not yet finished
1667- *looking at it, so it doesn't follow that the scan is done when the
1668- *first backend gets an InvalidBlockNumber return.
1667+ *Determine where the parallel seq scan should start. This function may
1668+ *be called many times, once by each parallel worker. We must be careful
1669+ *only to set the startblock once.
16691670 * ----------------
16701671 */
1671- static BlockNumber
1672- heap_parallelscan_nextpage (HeapScanDesc scan )
1672+ static void
1673+ heap_parallelscan_startblock_init (HeapScanDesc scan )
16731674{
1674- BlockNumber page = InvalidBlockNumber ;
16751675BlockNumber sync_startpage = InvalidBlockNumber ;
1676- BlockNumber report_page = InvalidBlockNumber ;
16771676ParallelHeapScanDesc parallel_scan ;
16781677
16791678Assert (scan -> rs_parallel );
@@ -1705,46 +1704,63 @@ heap_parallelscan_nextpage(HeapScanDesc scan)
17051704sync_startpage = ss_get_location (scan -> rs_rd ,scan -> rs_nblocks );
17061705goto retry ;
17071706}
1708- parallel_scan -> phs_cblock = parallel_scan -> phs_startblock ;
17091707}
1708+ SpinLockRelease (& parallel_scan -> phs_mutex );
1709+ }
1710+
1711+ /* ----------------
1712+ *heap_parallelscan_nextpage - get the next page to scan
1713+ *
1714+ *Get the next page to scan. Even if there are no pages left to scan,
1715+ *another backend could have grabbed a page to scan and not yet finished
1716+ *looking at it, so it doesn't follow that the scan is done when the
1717+ *first backend gets an InvalidBlockNumber return.
1718+ * ----------------
1719+ */
1720+ static BlockNumber
1721+ heap_parallelscan_nextpage (HeapScanDesc scan )
1722+ {
1723+ BlockNumber page ;
1724+ ParallelHeapScanDesc parallel_scan ;
1725+ uint64 nallocated ;
1726+
1727+ Assert (scan -> rs_parallel );
1728+ parallel_scan = scan -> rs_parallel ;
17101729
17111730/*
1712- * The current block number is the next one that needs to be scanned,
1713- * unless it's InvalidBlockNumber already, in which case there are no more
1714- * blocks to scan. After remembering the current value, we must advance
1715- * it so that the next call to this function returns the next block to be
1716- * scanned.
1731+ * phs_nallocated tracks how many pages have been allocated to workers
1732+ * already. When phs_nallocated >= rs_nblocks, all blocks have been
1733+ * allocated.
1734+ *
1735+ * Because we use an atomic fetch-and-add to fetch the current value, the
1736+ * phs_nallocated counter will exceed rs_nblocks, because workers will
1737+ * still increment the value, when they try to allocate the next block but
1738+ * all blocks have been allocated already. The counter must be 64 bits
1739+ * wide because of that, to avoid wrapping around when rs_nblocks is close
1740+ * to 2^32.
1741+ *
1742+ * The actual page to return is calculated by adding the counter to the
1743+ * starting block number, modulo nblocks.
17171744 */
1718- page = parallel_scan -> phs_cblock ;
1719- if (page != InvalidBlockNumber )
1720- {
1721- parallel_scan -> phs_cblock ++ ;
1722- if (parallel_scan -> phs_cblock >=scan -> rs_nblocks )
1723- parallel_scan -> phs_cblock = 0 ;
1724- if (parallel_scan -> phs_cblock == parallel_scan -> phs_startblock )
1725- {
1726- parallel_scan -> phs_cblock = InvalidBlockNumber ;
1727- report_page = parallel_scan -> phs_startblock ;
1728- }
1729- }
1730-
1731- /* Release the lock. */
1732- SpinLockRelease (& parallel_scan -> phs_mutex );
1745+ nallocated = pg_atomic_fetch_add_u64 (& parallel_scan -> phs_nallocated ,1 );
1746+ if (nallocated >=scan -> rs_nblocks )
1747+ page = InvalidBlockNumber ;/* all blocks have been allocated */
1748+ else
1749+ page = (nallocated + parallel_scan -> phs_startblock ) %scan -> rs_nblocks ;
17331750
17341751/*
17351752 * Report scan location. Normally, we report the current page number.
17361753 * When we reach the end of the scan, though, we report the starting page,
17371754 * not the ending page, just so the starting positions for later scans
17381755 * doesn't slew backwards. We only report the position at the end of the
1739- * scan once, though: subsequent callers will have report nothing, since
1740- * they will have page == InvalidBlockNumber.
1756+ * scan once, though: subsequent callers will report nothing.
17411757 */
17421758if (scan -> rs_syncscan )
17431759{
1744- if (report_page == InvalidBlockNumber )
1745- report_page = page ;
1746- if (report_page != InvalidBlockNumber )
1747- ss_report_location (scan -> rs_rd ,report_page );
1760+ if (page != InvalidBlockNumber )
1761+ ss_report_location (scan -> rs_rd ,page );
1762+ else if (nallocated == scan -> rs_nblocks )
1763+ ss_report_location (scan -> rs_rd ,parallel_scan -> phs_startblock );
17481764}
17491765
17501766return page ;