Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitee7ca55

Browse files
committed
Add a C API for parallel heap scans.
Using this API, one backend can set up a ParallelHeapScanDesc towhich multiple backends can then attach. Each tuple in the relationwill be returned to exactly one of the scanning backends. Onlyforward scans are supported, and rescans must be carefullycoordinated.This is not exposed to the planner or executor yet.The original version of this code was written by me. Amit Kapilareviewed it, tested it, and improved it, including adding support forsynchronized scans, per review comments from Jeff Davis. Extensivetesting of this and related patches was performed by Haribabu Kommi.Final cleanup of this patch by me.
1 parentb0b0d84 commitee7ca55

File tree

3 files changed

+261
-11
lines changed

3 files changed

+261
-11
lines changed

‎src/backend/access/heap/heapam.c

Lines changed: 234 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
#include"storage/predicate.h"
6464
#include"storage/procarray.h"
6565
#include"storage/smgr.h"
66+
#include"storage/spin.h"
6667
#include"storage/standby.h"
6768
#include"utils/datum.h"
6869
#include"utils/inval.h"
@@ -80,12 +81,14 @@ boolsynchronize_seqscans = true;
8081
staticHeapScanDescheap_beginscan_internal(Relationrelation,
8182
Snapshotsnapshot,
8283
intnkeys,ScanKeykey,
84+
ParallelHeapScanDescparallel_scan,
8385
boolallow_strat,
8486
boolallow_sync,
8587
boolallow_pagemode,
8688
boolis_bitmapscan,
8789
boolis_samplescan,
8890
booltemp_snap);
91+
staticBlockNumberheap_parallelscan_nextpage(HeapScanDescscan);
8992
staticHeapTupleheap_prepare_insert(Relationrelation,HeapTupletup,
9093
TransactionIdxid,CommandIdcid,intoptions);
9194
staticXLogRecPtrlog_heap_update(Relationreln,Bufferoldbuf,
@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
226229
* results for a non-MVCC snapshot, the caller must hold some higher-level
227230
* lock that ensures the interesting tuple(s) won't change.)
228231
*/
229-
scan->rs_nblocks=RelationGetNumberOfBlocks(scan->rs_rd);
232+
if (scan->rs_parallel!=NULL)
233+
scan->rs_nblocks=scan->rs_parallel->phs_nblocks;
234+
else
235+
scan->rs_nblocks=RelationGetNumberOfBlocks(scan->rs_rd);
230236

231237
/*
232238
* If the table is large relative to NBuffers, use a bulk-read access
@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
237243
* behaviors, independently of the size of the table; also there is a GUC
238244
* variable that can disable synchronized scanning.)
239245
*
240-
* During a rescan, don't make a new strategy object if we don't have to.
246+
* Note that heap_parallelscan_initialize has a very similar test; if you
247+
* change this, consider changing that one, too.
241248
*/
242249
if (!RelationUsesLocalBuffers(scan->rs_rd)&&
243250
scan->rs_nblocks>NBuffers /4)
@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
250257

251258
if (allow_strat)
252259
{
260+
/* During a rescan, keep the previous strategy object. */
253261
if (scan->rs_strategy==NULL)
254262
scan->rs_strategy=GetAccessStrategy(BAS_BULKREAD);
255263
}
@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
260268
scan->rs_strategy=NULL;
261269
}
262270

263-
if (keep_startblock)
271+
if (scan->rs_parallel!=NULL)
272+
{
273+
/* For parallel scan, believe whatever ParallelHeapScanDesc says. */
274+
scan->rs_syncscan=scan->rs_parallel->phs_syncscan;
275+
}
276+
elseif (keep_startblock)
264277
{
265278
/*
266279
* When rescanning, we want to keep the previous startblock setting,
@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
496509
tuple->t_data=NULL;
497510
return;
498511
}
499-
page=scan->rs_startblock;/* first page */
512+
if (scan->rs_parallel!=NULL)
513+
{
514+
page=heap_parallelscan_nextpage(scan);
515+
516+
/* Other processes might have already finished the scan. */
517+
if (page==InvalidBlockNumber)
518+
{
519+
Assert(!BufferIsValid(scan->rs_cbuf));
520+
tuple->t_data=NULL;
521+
return;
522+
}
523+
}
524+
else
525+
page=scan->rs_startblock;/* first page */
500526
heapgetpage(scan,page);
501527
lineoff=FirstOffsetNumber;/* first offnum */
502528
scan->rs_inited= true;
@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
519545
}
520546
elseif (backward)
521547
{
548+
/* backward parallel scan not supported */
549+
Assert(scan->rs_parallel==NULL);
550+
522551
if (!scan->rs_inited)
523552
{
524553
/*
@@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan,
669698
page=scan->rs_nblocks;
670699
page--;
671700
}
701+
elseif (scan->rs_parallel!=NULL)
702+
{
703+
page=heap_parallelscan_nextpage(scan);
704+
finished= (page==InvalidBlockNumber);
705+
}
672706
else
673707
{
674708
page++;
@@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan,
773807
tuple->t_data=NULL;
774808
return;
775809
}
776-
page=scan->rs_startblock;/* first page */
810+
if (scan->rs_parallel!=NULL)
811+
{
812+
page=heap_parallelscan_nextpage(scan);
813+
814+
/* Other processes might have already finished the scan. */
815+
if (page==InvalidBlockNumber)
816+
{
817+
Assert(!BufferIsValid(scan->rs_cbuf));
818+
tuple->t_data=NULL;
819+
return;
820+
}
821+
}
822+
else
823+
page=scan->rs_startblock;/* first page */
777824
heapgetpage(scan,page);
778825
lineindex=0;
779826
scan->rs_inited= true;
@@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan,
793840
}
794841
elseif (backward)
795842
{
843+
/* backward parallel scan not supported */
844+
Assert(scan->rs_parallel==NULL);
845+
796846
if (!scan->rs_inited)
797847
{
798848
/*
@@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan,
932982
page=scan->rs_nblocks;
933983
page--;
934984
}
985+
elseif (scan->rs_parallel!=NULL)
986+
{
987+
page=heap_parallelscan_nextpage(scan);
988+
finished= (page==InvalidBlockNumber);
989+
}
935990
else
936991
{
937992
page++;
@@ -1341,7 +1396,7 @@ HeapScanDesc
13411396
heap_beginscan(Relationrelation,Snapshotsnapshot,
13421397
intnkeys,ScanKeykey)
13431398
{
1344-
returnheap_beginscan_internal(relation,snapshot,nkeys,key,
1399+
returnheap_beginscan_internal(relation,snapshot,nkeys,key,NULL,
13451400
true, true, true, false, false, false);
13461401
}
13471402

@@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
13511406
Oidrelid=RelationGetRelid(relation);
13521407
Snapshotsnapshot=RegisterSnapshot(GetCatalogSnapshot(relid));
13531408

1354-
returnheap_beginscan_internal(relation,snapshot,nkeys,key,
1409+
returnheap_beginscan_internal(relation,snapshot,nkeys,key,NULL,
13551410
true, true, true, false, false, true);
13561411
}
13571412

@@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
13601415
intnkeys,ScanKeykey,
13611416
boolallow_strat,boolallow_sync)
13621417
{
1363-
returnheap_beginscan_internal(relation,snapshot,nkeys,key,
1418+
returnheap_beginscan_internal(relation,snapshot,nkeys,key,NULL,
13641419
allow_strat,allow_sync, true,
13651420
false, false, false);
13661421
}
@@ -1369,7 +1424,7 @@ HeapScanDesc
13691424
heap_beginscan_bm(Relationrelation,Snapshotsnapshot,
13701425
intnkeys,ScanKeykey)
13711426
{
1372-
returnheap_beginscan_internal(relation,snapshot,nkeys,key,
1427+
returnheap_beginscan_internal(relation,snapshot,nkeys,key,NULL,
13731428
false, false, true, true, false, false);
13741429
}
13751430

@@ -1378,14 +1433,15 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
13781433
intnkeys,ScanKeykey,
13791434
boolallow_strat,boolallow_sync,boolallow_pagemode)
13801435
{
1381-
returnheap_beginscan_internal(relation,snapshot,nkeys,key,
1436+
returnheap_beginscan_internal(relation,snapshot,nkeys,key,NULL,
13821437
allow_strat,allow_sync,allow_pagemode,
13831438
false, true, false);
13841439
}
13851440

13861441
staticHeapScanDesc
13871442
heap_beginscan_internal(Relationrelation,Snapshotsnapshot,
13881443
intnkeys,ScanKeykey,
1444+
ParallelHeapScanDescparallel_scan,
13891445
boolallow_strat,
13901446
boolallow_sync,
13911447
boolallow_pagemode,
@@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
14181474
scan->rs_allow_strat=allow_strat;
14191475
scan->rs_allow_sync=allow_sync;
14201476
scan->rs_temp_snap=temp_snap;
1477+
scan->rs_parallel=parallel_scan;
14211478

14221479
/*
14231480
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan,
14731530
* reinitialize scan descriptor
14741531
*/
14751532
initscan(scan,key, true);
1533+
1534+
/*
1535+
* reset parallel scan, if present
1536+
*/
1537+
if (scan->rs_parallel!=NULL)
1538+
{
1539+
ParallelHeapScanDescparallel_scan;
1540+
1541+
/*
1542+
* Caller is responsible for making sure that all workers have
1543+
* finished the scan before calling this, so it really shouldn't be
1544+
* necessary to acquire the mutex at all. We acquire it anyway, just
1545+
* to be tidy.
1546+
*/
1547+
parallel_scan=scan->rs_parallel;
1548+
SpinLockAcquire(&parallel_scan->phs_mutex);
1549+
parallel_scan->phs_cblock=parallel_scan->phs_startblock;
1550+
SpinLockRelease(&parallel_scan->phs_mutex);
1551+
}
14761552
}
14771553

14781554
/* ----------------
@@ -1531,6 +1607,154 @@ heap_endscan(HeapScanDesc scan)
15311607
pfree(scan);
15321608
}
15331609

1610+
/* ----------------
1611+
*heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
1612+
*
1613+
*Sadly, this doesn't reduce to a constant, because the size required
1614+
*to serialize the snapshot can vary.
1615+
* ----------------
1616+
*/
1617+
Size
1618+
heap_parallelscan_estimate(Snapshotsnapshot)
1619+
{
1620+
returnadd_size(offsetof(ParallelHeapScanDescData,phs_snapshot_data),
1621+
EstimateSnapshotSpace(snapshot));
1622+
}
1623+
1624+
/* ----------------
1625+
*heap_parallelscan_initialize - initialize ParallelHeapScanDesc
1626+
*
1627+
*Must allow as many bytes of shared memory as returned by
1628+
*heap_parallelscan_estimate. Call this just once in the leader
1629+
*process; then, individual workers attach via heap_beginscan_parallel.
1630+
* ----------------
1631+
*/
1632+
void
1633+
heap_parallelscan_initialize(ParallelHeapScanDesctarget,Relationrelation,
1634+
Snapshotsnapshot)
1635+
{
1636+
target->phs_relid=RelationGetRelid(relation);
1637+
target->phs_nblocks=RelationGetNumberOfBlocks(relation);
1638+
/* compare phs_syncscan initialization to similar logic in initscan */
1639+
target->phs_syncscan=synchronize_seqscans&&
1640+
!RelationUsesLocalBuffers(relation)&&
1641+
target->phs_nblocks>NBuffers /4;
1642+
SpinLockInit(&target->phs_mutex);
1643+
target->phs_cblock=InvalidBlockNumber;
1644+
target->phs_startblock=InvalidBlockNumber;
1645+
SerializeSnapshot(snapshot,target->phs_snapshot_data);
1646+
}
1647+
1648+
/* ----------------
1649+
*heap_beginscan_parallel - join a parallel scan
1650+
*
1651+
*Caller must hold a suitable lock on the correct relation.
1652+
* ----------------
1653+
*/
1654+
HeapScanDesc
1655+
heap_beginscan_parallel(Relationrelation,ParallelHeapScanDescparallel_scan)
1656+
{
1657+
Snapshotsnapshot;
1658+
1659+
Assert(RelationGetRelid(relation)==parallel_scan->phs_relid);
1660+
snapshot=RestoreSnapshot(parallel_scan->phs_snapshot_data);
1661+
RegisterSnapshot(snapshot);
1662+
1663+
returnheap_beginscan_internal(relation,snapshot,0,NULL,parallel_scan,
1664+
true, true, true, false, false, true);
1665+
}
1666+
1667+
/* ----------------
1668+
*heap_parallelscan_nextpage - get the next page to scan
1669+
*
1670+
*Get the next page to scan. Even if there are no pages left to scan,
1671+
*another backend could have grabbed a page to scan and not yet finished
1672+
*looking at it, so it doesn't follow that the scan is done when the
1673+
*first backend gets an InvalidBlockNumber return.
1674+
* ----------------
1675+
*/
1676+
staticBlockNumber
1677+
heap_parallelscan_nextpage(HeapScanDescscan)
1678+
{
1679+
BlockNumberpage=InvalidBlockNumber;
1680+
BlockNumbersync_startpage=InvalidBlockNumber;
1681+
BlockNumberreport_page=InvalidBlockNumber;
1682+
ParallelHeapScanDescparallel_scan;
1683+
1684+
Assert(scan->rs_parallel);
1685+
parallel_scan=scan->rs_parallel;
1686+
1687+
retry:
1688+
/* Grab the spinlock. */
1689+
SpinLockAcquire(&parallel_scan->phs_mutex);
1690+
1691+
/*
1692+
* If the scan's startblock has not yet been initialized, we must do so
1693+
* now. If this is not a synchronized scan, we just start at block 0, but
1694+
* if it is a synchronized scan, we must get the starting position from
1695+
* the synchronized scan machinery. We can't hold the spinlock while
1696+
* doing that, though, so release the spinlock, get the information we
1697+
* need, and retry. If nobody else has initialized the scan in the
1698+
* meantime, we'll fill in the value we fetched on the second time
1699+
* through.
1700+
*/
1701+
if (parallel_scan->phs_startblock==InvalidBlockNumber)
1702+
{
1703+
if (!parallel_scan->phs_syncscan)
1704+
parallel_scan->phs_startblock=0;
1705+
elseif (sync_startpage!=InvalidBlockNumber)
1706+
parallel_scan->phs_startblock=sync_startpage;
1707+
else
1708+
{
1709+
SpinLockRelease(&parallel_scan->phs_mutex);
1710+
sync_startpage=ss_get_location(scan->rs_rd,scan->rs_nblocks);
1711+
gotoretry;
1712+
}
1713+
parallel_scan->phs_cblock=parallel_scan->phs_startblock;
1714+
}
1715+
1716+
/*
1717+
* The current block number is the next one that needs to be scanned,
1718+
* unless it's InvalidBlockNumber already, in which case there are no more
1719+
* blocks to scan. After remembering the current value, we must advance
1720+
* it so that the next call to this function returns the next block to be
1721+
* scanned.
1722+
*/
1723+
page=parallel_scan->phs_cblock;
1724+
if (page!=InvalidBlockNumber)
1725+
{
1726+
parallel_scan->phs_cblock++;
1727+
if (parallel_scan->phs_cblock >=scan->rs_nblocks)
1728+
parallel_scan->phs_cblock=0;
1729+
if (parallel_scan->phs_cblock==parallel_scan->phs_startblock)
1730+
{
1731+
parallel_scan->phs_cblock=InvalidBlockNumber;
1732+
report_page=parallel_scan->phs_startblock;
1733+
}
1734+
}
1735+
1736+
/* Release the lock. */
1737+
SpinLockRelease(&parallel_scan->phs_mutex);
1738+
1739+
/*
1740+
* Report scan location. Normally, we report the current page number.
1741+
* When we reach the end of the scan, though, we report the starting page,
1742+
* not the ending page, just so the starting positions for later scans
1743+
* doesn't slew backwards. We only report the position at the end of the
1744+
* scan once, though: subsequent callers will have report nothing, since
1745+
* they will have page == InvalidBlockNumber.
1746+
*/
1747+
if (scan->rs_syncscan)
1748+
{
1749+
if (report_page==InvalidBlockNumber)
1750+
report_page=page;
1751+
if (report_page!=InvalidBlockNumber)
1752+
ss_report_location(scan->rs_rd,report_page);
1753+
}
1754+
1755+
returnpage;
1756+
}
1757+
15341758
/* ----------------
15351759
*heap_getnext- retrieve next tuple in scan
15361760
*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp