Commit 0ca3b16

Add parallelism support for TID Range Scans
In v14, bb437f9 added support for scanning ranges of TIDs using a dedicated executor node for the purpose. Here, we allow these scans to be parallelized. The range of blocks to scan is divvied up similarly to how a Parallel Seq Scan does it: 'chunks' of blocks are allocated to each worker, and the size of those chunks is slowly reduced down to 1 block per worker by the time we're nearing the end of the scan. Doing that means workers finish at roughly the same time.

Allowing TID Range Scans to be parallelized removes the dilemma from the planner as to whether a Parallel Seq Scan will cost less than a non-parallel TID Range Scan due to the CPU concurrency of the Seq Scan (disk costs are not divided by the number of workers). It was possible for the planner to choose the Parallel Seq Scan, which would result in reading more blocks during execution than the TID Range Scan would have. Allowing Parallel TID Range Scans removes the trade-off the planner makes when choosing between reduced CPU costs due to parallelism and the additional I/O a Parallel Seq Scan incurs by scanning blocks outside of the required TID range. There are also, of course, the traditional performance benefits of parallelism to be gained, which likely don't need to be explained here.

Author: Cary Huang <cary.huang@highgo.ca>
Author: David Rowley <dgrowleyml@gmail.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Steven Niu <niushiji@gmail.com>
Discussion: https://postgr.es/m/18f2c002a24.11bc2ab825151706.3749144144619388582@highgo.ca
1 parent 42473b3 commit 0ca3b16
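The chunk-based block allocation described in the commit message can be illustrated with a small standalone simulation. This is a sketch only, not the committed code: the SIM_* constants merely stand in for PARALLEL_SEQSCAN_NCHUNKS, PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS and PARALLEL_SEQSCAN_MAX_CHUNK_SIZE (whose real values are not shown in this excerpt), and the real logic, including the shared atomic counter used by the workers, lives in table_block_parallelscan_nextpage() in the tableam.c diff below.

/*
 * Simulation of chunk-based block allocation: a shared counter hands out
 * runs ("chunks") of consecutive blocks, and the chunk size is halved as
 * the scan nears its end so that all workers finish at about the same time.
 */
#include <stdio.h>
#include <stdint.h>

#define SIM_NCHUNKS         2048    /* stand-in for PARALLEL_SEQSCAN_NCHUNKS */
#define SIM_RAMPDOWN_CHUNKS 64      /* stand-in for PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS */
#define SIM_MAX_CHUNK_SIZE  8192    /* stand-in for PARALLEL_SEQSCAN_MAX_CHUNK_SIZE */

int
main(void)
{
    uint64_t    scan_nblocks = 1000000; /* number of blocks the scan covers */
    uint64_t    nallocated = 0;         /* models the shared phs_nallocated counter */
    uint64_t    chunk_size = 1;

    /* initial chunk size: next power of 2 of scan_nblocks / SIM_NCHUNKS, capped */
    while (chunk_size < scan_nblocks / SIM_NCHUNKS)
        chunk_size <<= 1;
    if (chunk_size > SIM_MAX_CHUNK_SIZE)
        chunk_size = SIM_MAX_CHUNK_SIZE;

    while (nallocated < scan_nblocks)
    {
        uint64_t    chunk_end;

        /* halve the chunk size once the scan is close to the end */
        if (chunk_size > 1 &&
            nallocated > scan_nblocks - chunk_size * SIM_RAMPDOWN_CHUNKS)
            chunk_size >>= 1;

        chunk_end = nallocated + chunk_size;
        if (chunk_end > scan_nblocks)
            chunk_end = scan_nblocks;

        printf("next worker gets blocks [%llu, %llu)\n",
               (unsigned long long) nallocated,
               (unsigned long long) chunk_end);
        nallocated = chunk_end;
    }
    return 0;
}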

File tree

15 files changed: +446 -58 lines changed


doc/src/sgml/parallel.sgml

Lines changed: 9 additions & 0 deletions
@@ -299,6 +299,15 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         within each worker process.
       </para>
     </listitem>
+    <listitem>
+      <para>
+        In a <emphasis>parallel tid range scan</emphasis>, the range of blocks
+        will be subdivided into smaller ranges which are shared among the
+        cooperating processes.  Each worker process will complete the scanning
+        of its given range of blocks before requesting an additional range of
+        blocks.
+      </para>
+    </listitem>
   </itemizedlist>
 
   Other scan types, such as scans of non-btree indexes, may support

src/backend/access/heap/heapam.c

Lines changed: 3 additions & 1 deletion
@@ -258,7 +258,9 @@ heap_scan_stream_read_next_parallel(ReadStream *stream,
         /* parallel scan */
         table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
                                                  scan->rs_parallelworkerdata,
-                                                 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+                                                 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel,
+                                                 scan->rs_startblock,
+                                                 scan->rs_numblocks);
 
         /* may return InvalidBlockNumber if there are no more blocks */
         scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,

src/backend/access/table/tableam.c

Lines changed: 109 additions & 39 deletions
@@ -188,6 +188,37 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
                                            pscan, flags);
 }
 
+TableScanDesc
+table_beginscan_parallel_tidrange(Relation relation,
+                                  ParallelTableScanDesc pscan)
+{
+    Snapshot    snapshot;
+    uint32      flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
+    TableScanDesc sscan;
+
+    Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
+
+    /* disable syncscan in parallel tid range scan. */
+    pscan->phs_syncscan = false;
+
+    if (!pscan->phs_snapshot_any)
+    {
+        /* Snapshot was serialized -- restore it */
+        snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
+        RegisterSnapshot(snapshot);
+        flags |= SO_TEMP_SNAPSHOT;
+    }
+    else
+    {
+        /* SnapshotAny passed by caller (not serialized) */
+        snapshot = SnapshotAny;
+    }
+
+    sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+                                             pscan, flags);
+    return sscan;
+}
+
 
 /* ----------------------------------------------------------------------------
  * Index scan related functions.
@@ -398,6 +429,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
         bpscan->phs_nblocks > NBuffers / 4;
     SpinLockInit(&bpscan->phs_mutex);
     bpscan->phs_startblock = InvalidBlockNumber;
+    bpscan->phs_numblock = InvalidBlockNumber;
     pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
 
     return sizeof(ParallelBlockTableScanDescData);
@@ -416,57 +448,59 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  *
  * Determine where the parallel seq scan should start.  This function may be
  * called many times, once by each parallel worker.  We must be careful only
- * to set the startblock once.
+ * to set the phs_startblock and phs_numblock fields once.
+ *
+ * Callers may optionally specify a non-InvalidBlockNumber value for
+ * 'startblock' to force the scan to start at the given page.  Likewise,
+ * 'numblocks' can be specified as a non-InvalidBlockNumber to limit the
+ * number of blocks to scan to that many blocks.
  */
 void
 table_block_parallelscan_startblock_init(Relation rel,
                                          ParallelBlockTableScanWorker pbscanwork,
-                                         ParallelBlockTableScanDesc pbscan)
+                                         ParallelBlockTableScanDesc pbscan,
+                                         BlockNumber startblock,
+                                         BlockNumber numblocks)
 {
     BlockNumber sync_startpage = InvalidBlockNumber;
+    BlockNumber scan_nblocks;
 
     /* Reset the state we use for controlling allocation size. */
     memset(pbscanwork, 0, sizeof(*pbscanwork));
 
     StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
                      "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
 
-    /*
-     * We determine the chunk size based on the size of the relation. First we
-     * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then
-     * take the next highest power of 2 number of the chunk size. This means
-     * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
-     * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
-     */
-    pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
-                                                       PARALLEL_SEQSCAN_NCHUNKS, 1));
-
-    /*
-     * Ensure we don't go over the maximum chunk size with larger tables. This
-     * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
-     * tables. Too large a chunk size has been shown to be detrimental to
-     * synchronous scan performance.
-     */
-    pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
-                                      PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
-
 retry:
     /* Grab the spinlock. */
     SpinLockAcquire(&pbscan->phs_mutex);
 
     /*
-     * If the scan's startblock has not yet been initialized, we must do so
-     * now.  If this is not a synchronized scan, we just start at block 0, but
-     * if it is a synchronized scan, we must get the starting position from
-     * the synchronized scan machinery.  We can't hold the spinlock while
-     * doing that, though, so release the spinlock, get the information we
-     * need, and retry.  If nobody else has initialized the scan in the
-     * meantime, we'll fill in the value we fetched on the second time
-     * through.
+     * When the caller specified a limit on the number of blocks to scan, set
+     * that in the ParallelBlockTableScanDesc, if it's not been done by
+     * another worker already.
+     */
+    if (numblocks != InvalidBlockNumber &&
+        pbscan->phs_numblock == InvalidBlockNumber)
+    {
+        pbscan->phs_numblock = numblocks;
+    }
+
+    /*
+     * If the scan's phs_startblock has not yet been initialized, we must do
+     * so now.  If a startblock was specified, start there, otherwise if this
+     * is not a synchronized scan, we just start at block 0, but if it is a
+     * synchronized scan, we must get the starting position from the
+     * synchronized scan machinery.  We can't hold the spinlock while doing
+     * that, though, so release the spinlock, get the information we need, and
+     * retry.  If nobody else has initialized the scan in the meantime, we'll
+     * fill in the value we fetched on the second time through.
      */
     if (pbscan->phs_startblock == InvalidBlockNumber)
     {
-        if (!pbscan->base.phs_syncscan)
+        if (startblock != InvalidBlockNumber)
+            pbscan->phs_startblock = startblock;
+        else if (!pbscan->base.phs_syncscan)
             pbscan->phs_startblock = 0;
         else if (sync_startpage != InvalidBlockNumber)
             pbscan->phs_startblock = sync_startpage;
@@ -478,6 +512,34 @@ table_block_parallelscan_startblock_init(Relation rel,
         }
     }
     SpinLockRelease(&pbscan->phs_mutex);
+
+    /*
+     * Figure out how many blocks we're going to scan; either all of them, or
+     * just phs_numblock's worth, if a limit has been imposed.
+     */
+    if (pbscan->phs_numblock == InvalidBlockNumber)
+        scan_nblocks = pbscan->phs_nblocks;
+    else
+        scan_nblocks = pbscan->phs_numblock;
+
+    /*
+     * We determine the chunk size based on scan_nblocks.  First we split
+     * scan_nblocks into PARALLEL_SEQSCAN_NCHUNKS chunks then we calculate the
+     * next highest power of 2 number of the result.  This means we split the
+     * blocks we're scanning into somewhere between PARALLEL_SEQSCAN_NCHUNKS
+     * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
+     */
+    pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(scan_nblocks /
+                                                       PARALLEL_SEQSCAN_NCHUNKS, 1));
+
+    /*
+     * Ensure we don't go over the maximum chunk size with larger tables. This
+     * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
+     * tables. Too large a chunk size has been shown to be detrimental to
+     * sequential scan performance.
+     */
+    pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
+                                      PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 }
 
 /*
@@ -493,6 +555,7 @@ table_block_parallelscan_nextpage(Relation rel,
                                   ParallelBlockTableScanWorker pbscanwork,
                                   ParallelBlockTableScanDesc pbscan)
 {
+    BlockNumber scan_nblocks;
     BlockNumber page;
     uint64      nallocated;
 
@@ -513,7 +576,7 @@ table_block_parallelscan_nextpage(Relation rel,
      *
      * Here we name these ranges of blocks "chunks".  The initial size of
      * these chunks is determined in table_block_parallelscan_startblock_init
-     * based on the size of the relation.  Towards the end of the scan, we
+     * based on the number of blocks to scan.  Towards the end of the scan, we
      * start making reductions in the size of the chunks in order to attempt
      * to divide the remaining work over all the workers as evenly as
      * possible.
@@ -530,17 +593,23 @@ table_block_parallelscan_nextpage(Relation rel,
      * phs_nallocated counter will exceed rs_nblocks, because workers will
      * still increment the value, when they try to allocate the next block but
      * all blocks have been allocated already.  The counter must be 64 bits
-     * wide because of that, to avoid wrapping around when rs_nblocks is close
-     * to 2^32.
+     * wide because of that, to avoid wrapping around when scan_nblocks is
+     * close to 2^32.
      *
      * The actual block to return is calculated by adding the counter to the
-     * starting block number, modulo nblocks.
+     * starting block number, modulo phs_nblocks.
      */
 
+    /* First, figure out how many blocks we're planning on scanning */
+    if (pbscan->phs_numblock == InvalidBlockNumber)
+        scan_nblocks = pbscan->phs_nblocks;
+    else
+        scan_nblocks = pbscan->phs_numblock;
+
     /*
-     * First check if we have any remaining blocks in a previous chunk for
-     * this worker.  We must consume all of the blocks from that before we
-     * allocate a new chunk to the worker.
+     * Now check if we have any remaining blocks in a previous chunk for this
+     * worker.  We must consume all of the blocks from that before we allocate
+     * a new chunk to the worker.
      */
     if (pbscanwork->phsw_chunk_remaining > 0)
     {
@@ -562,7 +631,7 @@ table_block_parallelscan_nextpage(Relation rel,
      * chunk size set to 1.
      */
     if (pbscanwork->phsw_chunk_size > 1 &&
-        pbscanwork->phsw_nallocated > pbscan->phs_nblocks -
+        pbscanwork->phsw_nallocated > scan_nblocks -
         (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
         pbscanwork->phsw_chunk_size >>= 1;
 
@@ -577,7 +646,8 @@ table_block_parallelscan_nextpage(Relation rel,
         pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
     }
 
-    if (nallocated >= pbscan->phs_nblocks)
+    /* Check if we've run out of blocks to scan */
+    if (nallocated >= scan_nblocks)
         page = InvalidBlockNumber;  /* all blocks have been allocated */
     else
         page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
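To make the new 'startblock' / 'numblocks' parameters concrete, here is a minimal caller-side sketch. It is not code from this commit: the function and variable names (start_parallel_block_range_scan, lowest_tid, highest_tid) are hypothetical, and the executor's actual translation of a TID range qual into a block range happens in nodeTidrangescan.c, which is not part of this excerpt. The only real caller shown here is heap_scan_stream_read_next_parallel() in the heapam.c hunk above, which passes the scan's rs_startblock and rs_numblocks.

/*
 * Hypothetical sketch (not from this commit): limit a parallel block scan to
 * the blocks covered by a TID range such as
 *   WHERE ctid >= '(1000,1)' AND ctid <= '(1999,1)'
 * by translating the bounding TIDs into the startblock/numblocks arguments
 * accepted by table_block_parallelscan_startblock_init() above.
 */
#include "postgres.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "storage/itemptr.h"

static void
start_parallel_block_range_scan(Relation rel,
                                ParallelBlockTableScanWorker pbscanwork,
                                ParallelBlockTableScanDesc pbscan,
                                ItemPointer lowest_tid,
                                ItemPointer highest_tid)
{
    BlockNumber first_block = ItemPointerGetBlockNumber(lowest_tid);
    BlockNumber last_block = ItemPointerGetBlockNumber(highest_tid);

    /* the scan starts at first_block and covers at most this many blocks */
    BlockNumber numblocks = last_block - first_block + 1;

    table_block_parallelscan_startblock_init(rel, pbscanwork, pbscan,
                                             first_block, numblocks);
}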

src/backend/executor/execParallel.c

Lines changed: 21 additions & 0 deletions
@@ -40,6 +40,7 @@
 #include "executor/nodeSeqscan.h"
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
+#include "executor/nodeTidrangescan.h"
 #include "executor/tqueue.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
@@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
             ExecForeignScanEstimate((ForeignScanState *) planstate,
                                     e->pcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
+                                         e->pcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendEstimate((AppendState *) planstate,
@@ -493,6 +499,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
             ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                          d->pcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
+                                              d->pcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendInitializeDSM((AppendState *) planstate,
@@ -994,6 +1005,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
             ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
                                            pcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
+                                                pcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
@@ -1362,6 +1378,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
             ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                             pwcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
+                                                 pwcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
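The four ExecTidRangeScan* hooks dispatched above are implemented in nodeTidrangescan.c, which is not included in this excerpt. As a rough sketch of what the DSM-initialization step involves, the following mirrors the long-standing Seq Scan pattern: allocate a ParallelTableScanDesc in the DSM segment, initialize it (serializing the snapshot), publish it under the plan node's ID, and begin the scan with the table_beginscan_parallel_tidrange() function added in tableam.c above. The pscan_len field is an assumption borrowed from SeqScanState, and the committed function may differ in detail.

/*
 * Sketch only, modeled on the Seq Scan equivalent rather than taken from this
 * commit.  pscan_len is assumed to have been set by the Estimate hook via
 * table_parallelscan_estimate().
 */
#include "postgres.h"
#include "access/parallel.h"
#include "access/tableam.h"
#include "nodes/execnodes.h"
#include "storage/shm_toc.h"

static void
TidRangeScanInitializeDSMSketch(TidRangeScanState *node, ParallelContext *pcxt)
{
    EState     *estate = node->ss.ps.state;
    ParallelTableScanDesc pscan;

    /* carve out space for the shared scan descriptor in the DSM segment */
    pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);

    /* set up the shared scan state, serializing the snapshot into it */
    table_parallelscan_initialize(node->ss.ss_currentRelation,
                                  pscan,
                                  estate->es_snapshot);

    /* publish it under this plan node's ID so workers can look it up */
    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);

    /* begin the parallel TID range scan using the shared state */
    node->ss.ss_currentScanDesc =
        table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan);
}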
