Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit b7b0f3f

Browse files
committed
Use streaming I/O in sequential scans.
Instead of calling ReadBuffer() for each block, heap sequential scans and TID range scans now use the streaming API introduced in b5a9b18. Author: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Thomas Munro <thomas.munro@gmail.com> Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com
1 parent 6ed83d5 commit b7b0f3f

File tree

2 files changed

+177
-73
lines changed

2 files changed

+177
-73
lines changed

‎src/backend/access/heap/heapam.c

Lines changed: 162 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
223223
* ----------------------------------------------------------------
224224
*/
225225

226+
/*
227+
* Streaming read API callback for parallel sequential scans. Returns the next
228+
* block the caller wants from the read stream or InvalidBlockNumber when done.
229+
*/
230+
staticBlockNumber
231+
heap_scan_stream_read_next_parallel(ReadStream*stream,
232+
void*callback_private_data,
233+
void*per_buffer_data)
234+
{
235+
HeapScanDescscan= (HeapScanDesc)callback_private_data;
236+
237+
Assert(ScanDirectionIsForward(scan->rs_dir));
238+
Assert(scan->rs_base.rs_parallel);
239+
240+
if (unlikely(!scan->rs_inited))
241+
{
242+
/* parallel scan */
243+
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
244+
scan->rs_parallelworkerdata,
245+
(ParallelBlockTableScanDesc)scan->rs_base.rs_parallel);
246+
247+
/* may return InvalidBlockNumber if there are no more blocks */
248+
scan->rs_prefetch_block=table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
249+
scan->rs_parallelworkerdata,
250+
(ParallelBlockTableScanDesc)scan->rs_base.rs_parallel);
251+
scan->rs_inited= true;
252+
}
253+
else
254+
{
255+
scan->rs_prefetch_block=table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
256+
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
257+
scan->rs_base.rs_parallel);
258+
}
259+
260+
returnscan->rs_prefetch_block;
261+
}
262+
263+
/*
264+
* Streaming read API callback for serial sequential and TID range scans.
265+
* Returns the next block the caller wants from the read stream or
266+
* InvalidBlockNumber when done.
267+
*/
268+
staticBlockNumber
269+
heap_scan_stream_read_next_serial(ReadStream*stream,
270+
void*callback_private_data,
271+
void*per_buffer_data)
272+
{
273+
HeapScanDescscan= (HeapScanDesc)callback_private_data;
274+
275+
if (unlikely(!scan->rs_inited))
276+
{
277+
scan->rs_prefetch_block=heapgettup_initial_block(scan,scan->rs_dir);
278+
scan->rs_inited= true;
279+
}
280+
else
281+
scan->rs_prefetch_block=heapgettup_advance_block(scan,
282+
scan->rs_prefetch_block,
283+
scan->rs_dir);
284+
285+
returnscan->rs_prefetch_block;
286+
}
287+
226288
/* ----------------
227289
* initscan - scan code common to heap_beginscan and heap_rescan
228290
* ----------------
@@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
325387
scan->rs_cbuf=InvalidBuffer;
326388
scan->rs_cblock=InvalidBlockNumber;
327389

390+
/*
391+
* Initialize to ForwardScanDirection because it is most common and
392+
* because heap scans go forward before going backward (e.g. CURSORs).
393+
*/
394+
scan->rs_dir=ForwardScanDirection;
395+
scan->rs_prefetch_block=InvalidBlockNumber;
396+
328397
/* page-at-a-time fields are always invalid when not rs_inited */
329398

330399
/*
@@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
508577
/*
509578
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
510579
*
511-
* Read the next block of the scan relation into a buffer and pin that buffer
512-
* before saving it in the scan descriptor.
580+
* Read the next block of the scan relation from the read stream and save it
581+
* in the scan descriptor. It is already pinned.
513582
*/
514583
staticinlinevoid
515584
heap_fetch_next_buffer(HeapScanDescscan,ScanDirectiondir)
516585
{
586+
Assert(scan->rs_read_stream);
587+
517588
/* release previous scan buffer, if any */
518589
if (BufferIsValid(scan->rs_cbuf))
519590
{
@@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
528599
*/
529600
CHECK_FOR_INTERRUPTS();
530601

531-
if (unlikely(!scan->rs_inited))
602+
/*
603+
* If the scan direction is changing, reset the prefetch block to the
604+
* current block. Otherwise, we will incorrectly prefetch the blocks
605+
* between the prefetch block and the current block again before
606+
* prefetching blocks in the new, correct scan direction.
607+
*/
608+
if (unlikely(scan->rs_dir!=dir))
532609
{
533-
scan->rs_cblock=heapgettup_initial_block(scan,dir);
610+
scan->rs_prefetch_block=scan->rs_cblock;
611+
read_stream_reset(scan->rs_read_stream);
612+
}
534613

535-
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
536-
Assert(scan->rs_cblock!=InvalidBlockNumber||
537-
!BufferIsValid(scan->rs_cbuf));
614+
scan->rs_dir=dir;
538615

539-
scan->rs_inited= true;
540-
}
541-
else
542-
scan->rs_cblock=heapgettup_advance_block(scan,scan->rs_cblock,
543-
dir);
544-
545-
/* read block if valid */
546-
if (BlockNumberIsValid(scan->rs_cblock))
547-
scan->rs_cbuf=ReadBufferExtended(scan->rs_base.rs_rd,MAIN_FORKNUM,
548-
scan->rs_cblock,RBM_NORMAL,
549-
scan->rs_strategy);
616+
scan->rs_cbuf=read_stream_next_buffer(scan->rs_read_stream,NULL);
617+
if (BufferIsValid(scan->rs_cbuf))
618+
scan->rs_cblock=BufferGetBlockNumber(scan->rs_cbuf);
550619
}
551620

552621
/*
@@ -560,34 +629,18 @@ static pg_noinline BlockNumber
560629
heapgettup_initial_block(HeapScanDescscan,ScanDirectiondir)
561630
{
562631
Assert(!scan->rs_inited);
632+
Assert(scan->rs_base.rs_parallel==NULL);
563633

564634
/* When there are no pages to scan, return InvalidBlockNumber */
565635
if (scan->rs_nblocks==0||scan->rs_numblocks==0)
566636
returnInvalidBlockNumber;
567637

568638
if (ScanDirectionIsForward(dir))
569639
{
570-
/* serial scan */
571-
if (scan->rs_base.rs_parallel==NULL)
572-
returnscan->rs_startblock;
573-
else
574-
{
575-
/* parallel scan */
576-
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
577-
scan->rs_parallelworkerdata,
578-
(ParallelBlockTableScanDesc)scan->rs_base.rs_parallel);
579-
580-
/* may return InvalidBlockNumber if there are no more blocks */
581-
returntable_block_parallelscan_nextpage(scan->rs_base.rs_rd,
582-
scan->rs_parallelworkerdata,
583-
(ParallelBlockTableScanDesc)scan->rs_base.rs_parallel);
584-
}
640+
returnscan->rs_startblock;
585641
}
586642
else
587643
{
588-
/* backward parallel scan not supported */
589-
Assert(scan->rs_base.rs_parallel==NULL);
590-
591644
/*
592645
* Disable reporting to syncscan logic in a backwards scan; it's not
593646
* very likely anyone else is doing the same thing at the same time,
@@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
699752
staticinlineBlockNumber
700753
heapgettup_advance_block(HeapScanDescscan,BlockNumberblock,ScanDirectiondir)
701754
{
702-
if (ScanDirectionIsForward(dir))
755+
Assert(scan->rs_base.rs_parallel==NULL);
756+
757+
if (likely(ScanDirectionIsForward(dir)))
703758
{
704-
if (scan->rs_base.rs_parallel==NULL)
705-
{
706-
block++;
759+
block++;
707760

708-
/* wrap back to the start of the heap */
709-
if (block >=scan->rs_nblocks)
710-
block=0;
761+
/* wrap back to the start of the heap */
762+
if (block >=scan->rs_nblocks)
763+
block=0;
711764

712-
/*
713-
* Report our new scan position for synchronization purposes. We
714-
* don't do that when moving backwards, however. That would just
715-
* mess up any other forward-moving scanners.
716-
*
717-
* Note: we do this before checking for end of scan so that the
718-
* final state of the position hint is back at the start of the
719-
* rel. That's not strictly necessary, but otherwise when you run
720-
* the same query multiple times the starting position would shift
721-
* a little bit backwards on every invocation, which is confusing.
722-
* We don't guarantee any specific ordering in general, though.
723-
*/
724-
if (scan->rs_base.rs_flags&SO_ALLOW_SYNC)
725-
ss_report_location(scan->rs_base.rs_rd,block);
726-
727-
/* we're done if we're back at where we started */
728-
if (block==scan->rs_startblock)
729-
returnInvalidBlockNumber;
765+
/*
766+
* Report our new scan position for synchronization purposes. We don't
767+
* do that when moving backwards, however. That would just mess up any
768+
* other forward-moving scanners.
769+
*
770+
* Note: we do this before checking for end of scan so that the final
771+
* state of the position hint is back at the start of the rel. That's
772+
* not strictly necessary, but otherwise when you run the same query
773+
* multiple times the starting position would shift a little bit
774+
* backwards on every invocation, which is confusing. We don't
775+
* guarantee any specific ordering in general, though.
776+
*/
777+
if (scan->rs_base.rs_flags&SO_ALLOW_SYNC)
778+
ss_report_location(scan->rs_base.rs_rd,block);
730779

731-
/* check if the limit imposed by heap_setscanlimits() is met */
732-
if (scan->rs_numblocks!=InvalidBlockNumber)
733-
{
734-
if (--scan->rs_numblocks==0)
735-
returnInvalidBlockNumber;
736-
}
780+
/* we're done if we're back at where we started */
781+
if (block==scan->rs_startblock)
782+
returnInvalidBlockNumber;
737783

738-
returnblock;
739-
}
740-
else
784+
/* check if the limit imposed by heap_setscanlimits() is met */
785+
if (scan->rs_numblocks!=InvalidBlockNumber)
741786
{
742-
returntable_block_parallelscan_nextpage(scan->rs_base.rs_rd,
743-
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
744-
scan->rs_base.rs_parallel);
787+
if (--scan->rs_numblocks==0)
788+
returnInvalidBlockNumber;
745789
}
790+
791+
returnblock;
746792
}
747793
else
748794
{
@@ -879,6 +925,7 @@ heapgettup(HeapScanDesc scan,
879925

880926
scan->rs_cbuf=InvalidBuffer;
881927
scan->rs_cblock=InvalidBlockNumber;
928+
scan->rs_prefetch_block=InvalidBlockNumber;
882929
tuple->t_data=NULL;
883930
scan->rs_inited= false;
884931
}
@@ -974,6 +1021,7 @@ heapgettup_pagemode(HeapScanDesc scan,
9741021
ReleaseBuffer(scan->rs_cbuf);
9751022
scan->rs_cbuf=InvalidBuffer;
9761023
scan->rs_cblock=InvalidBlockNumber;
1024+
scan->rs_prefetch_block=InvalidBlockNumber;
9771025
tuple->t_data=NULL;
9781026
scan->rs_inited= false;
9791027
}
@@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot,
10691117

10701118
initscan(scan,key, false);
10711119

1120+
scan->rs_read_stream=NULL;
1121+
1122+
/*
1123+
* Set up a read stream for sequential scans and TID range scans. This
1124+
* should be done after initscan() because initscan() allocates the
1125+
* BufferAccessStrategy object passed to the streaming read API.
1126+
*/
1127+
if (scan->rs_base.rs_flags&SO_TYPE_SEQSCAN||
1128+
scan->rs_base.rs_flags&SO_TYPE_TIDRANGESCAN)
1129+
{
1130+
ReadStreamBlockNumberCBcb;
1131+
1132+
if (scan->rs_base.rs_parallel)
1133+
cb=heap_scan_stream_read_next_parallel;
1134+
else
1135+
cb=heap_scan_stream_read_next_serial;
1136+
1137+
scan->rs_read_stream=read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
1138+
scan->rs_strategy,
1139+
scan->rs_base.rs_rd,
1140+
MAIN_FORKNUM,
1141+
cb,
1142+
scan,
1143+
0);
1144+
}
1145+
1146+
10721147
return (TableScanDesc)scan;
10731148
}
10741149

@@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
11111186

11121187
Assert(scan->rs_empty_tuples_pending==0);
11131188

1189+
/*
1190+
* The read stream is reset on rescan. This must be done before
1191+
* initscan(), as some state referred to by read_stream_reset() is reset
1192+
* in initscan().
1193+
*/
1194+
if (scan->rs_read_stream)
1195+
read_stream_reset(scan->rs_read_stream);
1196+
11141197
/*
11151198
* reinitialize scan descriptor
11161199
*/
@@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan)
11351218

11361219
Assert(scan->rs_empty_tuples_pending==0);
11371220

1221+
/*
1222+
* Must free the read stream before freeing the BufferAccessStrategy.
1223+
*/
1224+
if (scan->rs_read_stream)
1225+
read_stream_end(scan->rs_read_stream);
1226+
11381227
/*
11391228
* decrement relation reference count and free scan descriptor storage
11401229
*/

‎src/include/access/heapam.h

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
#include"storage/bufpage.h"
2626
#include"storage/dsm.h"
2727
#include"storage/lockdefs.h"
28+
#include"storage/read_stream.h"
2829
#include"storage/shm_toc.h"
2930
#include"utils/relcache.h"
3031
#include"utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
7071

7172
HeapTupleDatars_ctup;/* current tuple in scan, if any */
7273

74+
/* For scans that stream reads */
75+
ReadStream*rs_read_stream;
76+
77+
/*
78+
* For sequential scans and TID range scans to stream reads. The read
79+
* stream is allocated at the beginning of the scan and reset on rescan or
80+
* when the scan direction changes. The scan direction is saved each time
81+
* a new page is requested. If the scan direction changes from one page to
82+
* the next, the read stream releases all previously pinned buffers and
83+
* resets the prefetch block.
84+
*/
85+
ScanDirectionrs_dir;
86+
BlockNumberrs_prefetch_block;
87+
7388
/*
7489
* For parallel scans to store page allocation data. NULL when not
7590
* performing a parallel scan.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp