Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 3cda10f

Browse files
committed
Use atomic ops to hand out pages to scan in parallel scan.
With a lot of CPUs, the spinlock that protects the current scan location in a parallel scan can become a bottleneck. Use an atomic fetch-and-add instruction instead.

David Rowley

Discussion: https://www.postgresql.org/message-id/CAKJS1f9tgsPhqBcoPjv9_KUPZvTLCZ4jy%3DB%3DbhqgaKn7cYzm-w@mail.gmail.com
1 parent 0c504a8 · commit 3cda10f

File tree

2 files changed

+62
-45
lines changed

2 files changed

+62
-45
lines changed

‎src/backend/access/heap/heapam.c

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include"catalog/namespace.h"
5959
#include"miscadmin.h"
6060
#include"pgstat.h"
61+
#include"port/atomics.h"
6162
#include"storage/bufmgr.h"
6263
#include"storage/freespace.h"
6364
#include"storage/lmgr.h"
@@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
8990
boolis_bitmapscan,
9091
boolis_samplescan,
9192
booltemp_snap);
93+
staticvoidheap_parallelscan_startblock_init(HeapScanDescscan);
9294
staticBlockNumberheap_parallelscan_nextpage(HeapScanDescscan);
9395
staticHeapTupleheap_prepare_insert(Relationrelation,HeapTupletup,
9496
TransactionIdxid,CommandIdcid,intoptions);
@@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan,
510512
}
511513
if (scan->rs_parallel!=NULL)
512514
{
515+
heap_parallelscan_startblock_init(scan);
516+
513517
page=heap_parallelscan_nextpage(scan);
514518

515519
/* Other processes might have already finished the scan. */
@@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan,
812816
}
813817
if (scan->rs_parallel!=NULL)
814818
{
819+
heap_parallelscan_startblock_init(scan);
820+
815821
page=heap_parallelscan_nextpage(scan);
816822

817823
/* Other processes might have already finished the scan. */
@@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan,
15351541

15361542
/*
15371543
* Caller is responsible for making sure that all workers have
1538-
* finished the scan before calling this, so it really shouldn't be
1539-
* necessary to acquire the mutex at all. We acquire it anyway, just
1540-
* to be tidy.
1544+
* finished the scan before calling this.
15411545
*/
15421546
parallel_scan=scan->rs_parallel;
1543-
SpinLockAcquire(&parallel_scan->phs_mutex);
1544-
parallel_scan->phs_cblock=parallel_scan->phs_startblock;
1545-
SpinLockRelease(&parallel_scan->phs_mutex);
1547+
pg_atomic_write_u64(&parallel_scan->phs_nallocated,0);
15461548
}
15471549
}
15481550

@@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
16351637
!RelationUsesLocalBuffers(relation)&&
16361638
target->phs_nblocks>NBuffers /4;
16371639
SpinLockInit(&target->phs_mutex);
1638-
target->phs_cblock=InvalidBlockNumber;
16391640
target->phs_startblock=InvalidBlockNumber;
1641+
pg_atomic_write_u64(&target->phs_nallocated,0);
16401642
SerializeSnapshot(snapshot,target->phs_snapshot_data);
16411643
}
16421644

@@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
16601662
}
16611663

16621664
/* ----------------
1663-
*heap_parallelscan_nextpage -get the next page toscan
1665+
*heap_parallelscan_startblock_init -find and set thescan's startblock
16641666
*
1665-
*Get the next page to scan. Even if there are no pages left to scan,
1666-
*another backend could have grabbed a page to scan and not yet finished
1667-
*looking at it, so it doesn't follow that the scan is done when the
1668-
*first backend gets an InvalidBlockNumber return.
1667+
*Determine where the parallel seq scan should start. This function may
1668+
*be called many times, once by each parallel worker. We must be careful
1669+
*only to set the startblock once.
16691670
* ----------------
16701671
*/
1671-
staticBlockNumber
1672-
heap_parallelscan_nextpage(HeapScanDescscan)
1672+
staticvoid
1673+
heap_parallelscan_startblock_init(HeapScanDescscan)
16731674
{
1674-
BlockNumberpage=InvalidBlockNumber;
16751675
BlockNumbersync_startpage=InvalidBlockNumber;
1676-
BlockNumberreport_page=InvalidBlockNumber;
16771676
ParallelHeapScanDescparallel_scan;
16781677

16791678
Assert(scan->rs_parallel);
@@ -1705,46 +1704,63 @@ heap_parallelscan_nextpage(HeapScanDesc scan)
17051704
sync_startpage=ss_get_location(scan->rs_rd,scan->rs_nblocks);
17061705
gotoretry;
17071706
}
1708-
parallel_scan->phs_cblock=parallel_scan->phs_startblock;
17091707
}
1708+
SpinLockRelease(&parallel_scan->phs_mutex);
1709+
}
1710+
1711+
/* ----------------
1712+
*heap_parallelscan_nextpage - get the next page to scan
1713+
*
1714+
*Get the next page to scan. Even if there are no pages left to scan,
1715+
*another backend could have grabbed a page to scan and not yet finished
1716+
*looking at it, so it doesn't follow that the scan is done when the
1717+
*first backend gets an InvalidBlockNumber return.
1718+
* ----------------
1719+
*/
1720+
staticBlockNumber
1721+
heap_parallelscan_nextpage(HeapScanDescscan)
1722+
{
1723+
BlockNumberpage;
1724+
ParallelHeapScanDescparallel_scan;
1725+
uint64nallocated;
1726+
1727+
Assert(scan->rs_parallel);
1728+
parallel_scan=scan->rs_parallel;
17101729

17111730
/*
1712-
* The current block number is the next one that needs to be scanned,
1713-
* unless it's InvalidBlockNumber already, in which case there are no more
1714-
* blocks to scan. After remembering the current value, we must advance
1715-
* it so that the next call to this function returns the next block to be
1716-
* scanned.
1731+
* phs_nallocated tracks how many pages have been allocated to workers
1732+
* already. When phs_nallocated >= rs_nblocks, all blocks have been
1733+
* allocated.
1734+
*
1735+
* Because we use an atomic fetch-and-add to fetch the current value, the
1736+
* phs_nallocated counter will exceed rs_nblocks, because workers will
1737+
* still increment the value, when they try to allocate the next block but
1738+
* all blocks have been allocated already. The counter must be 64 bits
1739+
* wide because of that, to avoid wrapping around when rs_nblocks is close
1740+
* to 2^32.
1741+
*
1742+
* The actual page to return is calculated by adding the counter to the
1743+
* starting block number, modulo nblocks.
17171744
*/
1718-
page=parallel_scan->phs_cblock;
1719-
if (page!=InvalidBlockNumber)
1720-
{
1721-
parallel_scan->phs_cblock++;
1722-
if (parallel_scan->phs_cblock >=scan->rs_nblocks)
1723-
parallel_scan->phs_cblock=0;
1724-
if (parallel_scan->phs_cblock==parallel_scan->phs_startblock)
1725-
{
1726-
parallel_scan->phs_cblock=InvalidBlockNumber;
1727-
report_page=parallel_scan->phs_startblock;
1728-
}
1729-
}
1730-
1731-
/* Release the lock. */
1732-
SpinLockRelease(&parallel_scan->phs_mutex);
1745+
nallocated=pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated,1);
1746+
if (nallocated >=scan->rs_nblocks)
1747+
page=InvalidBlockNumber;/* all blocks have been allocated */
1748+
else
1749+
page= (nallocated+parallel_scan->phs_startblock) %scan->rs_nblocks;
17331750

17341751
/*
17351752
* Report scan location. Normally, we report the current page number.
17361753
* When we reach the end of the scan, though, we report the starting page,
17371754
* not the ending page, just so the starting positions for later scans
17381755
* doesn't slew backwards. We only report the position at the end of the
1739-
* scan once, though: subsequent callers will have report nothing, since
1740-
* they will have page == InvalidBlockNumber.
1756+
* scan once, though: subsequent callers will report nothing.
17411757
*/
17421758
if (scan->rs_syncscan)
17431759
{
1744-
if (report_page==InvalidBlockNumber)
1745-
report_page=page;
1746-
if (report_page!=InvalidBlockNumber)
1747-
ss_report_location(scan->rs_rd,report_page);
1760+
if (page!=InvalidBlockNumber)
1761+
ss_report_location(scan->rs_rd,page);
1762+
elseif (nallocated==scan->rs_nblocks)
1763+
ss_report_location(scan->rs_rd,parallel_scan->phs_startblock);
17481764
}
17491765

17501766
returnpage;

‎src/include/access/relscan.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ typedef struct ParallelHeapScanDescData
3535
Oidphs_relid;/* OID of relation to scan */
3636
boolphs_syncscan;/* report location to syncscan logic? */
3737
BlockNumberphs_nblocks;/* # blocks in relation at start of scan */
38-
slock_tphs_mutex;/* mutual exclusion forblock number fields */
38+
slock_tphs_mutex;/* mutual exclusion forsetting startblock */
3939
BlockNumberphs_startblock;/* starting block number */
40-
BlockNumberphs_cblock;/* current block number */
40+
pg_atomic_uint64phs_nallocated;/* number of blocks allocated to
41+
* workers so far. */
4142
charphs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
4243
}ParallelHeapScanDescData;
4344

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp