Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commited0b87c

Browse files
committed
Support buffer forwarding in read_stream.c.
In preparation for a follow-up change to the buffer manager, teachread_stream.c to manage buffers "forwarded" from one StartReadBuffers()call to the next after a short read. This involves a small amount ofextra book-keeping, and opens the way for lower levels to split I/Ooperations without having to drop pins, as required for efficienthandling of various edge cases.Concretely, the "buffers" argument will change from an out parameter toan in/out parameter. Buffer queue elements must be initialized on firstuse and cleared after they're consumed, but forwarded buffers are leftwhere they fall ahead of the current pending read in the queue, readyfor use by the operation that continues where a short read left off.The stream also needs to count them for pin limit management and releasethem on reset/early end.Tested-by: Andres Freund <andres@anarazel.de> (earlier versions)Discussion:https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
1 parent14413d0 commited0b87c

File tree

1 file changed

+78
-11
lines changed

1 file changed

+78
-11
lines changed

‎src/backend/storage/aio/read_stream.c

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,10 @@ struct ReadStream
9595
int16ios_in_progress;
9696
int16queue_size;
9797
int16max_pinned_buffers;
98+
int16forwarded_buffers;
9899
int16pinned_buffers;
99100
int16distance;
101+
int16initialized_buffers;
100102
booladvice_enabled;
101103
booltemporary;
102104

@@ -224,8 +226,10 @@ static bool
224226
read_stream_start_pending_read(ReadStream*stream)
225227
{
226228
boolneed_wait;
229+
intrequested_nblocks;
227230
intnblocks;
228231
intflags;
232+
intforwarded;
229233
int16io_index;
230234
int16overflow;
231235
int16buffer_index;
@@ -272,11 +276,21 @@ read_stream_start_pending_read(ReadStream *stream)
272276
}
273277
}
274278

275-
/* How many more buffers is this backend allowed? */
279+
/*
280+
* How many more buffers is this backend allowed?
281+
*
282+
* Forwarded buffers are already pinned and map to the leading blocks of
283+
* the pending read (the remaining portion of an earlier short read that
284+
* we're about to continue). They are not counted in pinned_buffers, but
285+
* they are counted as pins already held by this backend according to the
286+
* buffer manager, so they must be added to the limit it grants us.
287+
*/
276288
if (stream->temporary)
277289
buffer_limit=Min(GetAdditionalLocalPinLimit(),PG_INT16_MAX);
278290
else
279291
buffer_limit=Min(GetAdditionalPinLimit(),PG_INT16_MAX);
292+
Assert(stream->forwarded_buffers <=stream->pending_read_nblocks);
293+
buffer_limit+=stream->forwarded_buffers;
280294
if (buffer_limit==0&&stream->pinned_buffers==0)
281295
buffer_limit=1;/* guarantee progress */
282296

@@ -301,10 +315,16 @@ read_stream_start_pending_read(ReadStream *stream)
301315

302316
/*
303317
* We say how many blocks we want to read, but it may be smaller on return
304-
* if the buffer manager decides to shorten the read.
318+
* if the buffer manager decides to shorten the read. Initialize buffers
319+
* to InvalidBuffer (= not a forwarded buffer) as input on first use only,
320+
* and keep the original nblocks number so we can check for forwarded
321+
* buffers as output, below.
305322
*/
306323
buffer_index=stream->next_buffer_index;
307324
io_index=stream->next_io_index;
325+
while (stream->initialized_buffers<buffer_index+nblocks)
326+
stream->buffers[stream->initialized_buffers++]=InvalidBuffer;
327+
requested_nblocks=nblocks;
308328
need_wait=StartReadBuffers(&stream->ios[io_index].op,
309329
&stream->buffers[buffer_index],
310330
stream->pending_read_blocknum,
@@ -333,16 +353,35 @@ read_stream_start_pending_read(ReadStream *stream)
333353
stream->seq_blocknum=stream->pending_read_blocknum+nblocks;
334354
}
335355

356+
/*
357+
* How many pins were acquired but forwarded to the next call? These need
358+
* to be passed to the next StartReadBuffers() call by leaving them
359+
* exactly where they are in the queue, or released if the stream ends
360+
* early. We need the number for accounting purposes, since they are not
361+
* counted in stream->pinned_buffers but we already hold them.
362+
*/
363+
forwarded=0;
364+
while (nblocks+forwarded<requested_nblocks&&
365+
stream->buffers[buffer_index+nblocks+forwarded]!=InvalidBuffer)
366+
forwarded++;
367+
stream->forwarded_buffers=forwarded;
368+
336369
/*
337370
* We gave a contiguous range of buffer space to StartReadBuffers(), but
338-
* we want it to wrap around at queue_size. Slide overflowing buffers to
339-
* the front of the array.
371+
* we want it to wrap around at queue_size. Copy overflowing buffers to
372+
* the front of the array where they'll be consumed, but also leave a copy
373+
* in the overflow zone which the I/O operation has a pointer to (it needs
374+
* a contiguous array). Both copies will be cleared when the buffers are
375+
* handed to the consumer.
340376
*/
341-
overflow= (buffer_index+nblocks)-stream->queue_size;
377+
overflow= (buffer_index+nblocks+forwarded)-stream->queue_size;
342378
if (overflow>0)
343-
memmove(&stream->buffers[0],
344-
&stream->buffers[stream->queue_size],
345-
sizeof(stream->buffers[0])*overflow);
379+
{
380+
Assert(overflow<stream->queue_size);/* can't overlap */
381+
memcpy(&stream->buffers[0],
382+
&stream->buffers[stream->queue_size],
383+
sizeof(stream->buffers[0])*overflow);
384+
}
346385

347386
/* Compute location of start of next read, without using % operator. */
348387
buffer_index+=nblocks;
@@ -719,10 +758,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
719758

720759
/* Fast path assumptions. */
721760
Assert(stream->ios_in_progress==0);
761+
Assert(stream->forwarded_buffers==0);
722762
Assert(stream->pinned_buffers==1);
723763
Assert(stream->distance==1);
724764
Assert(stream->pending_read_nblocks==0);
725765
Assert(stream->per_buffer_data_size==0);
766+
Assert(stream->initialized_buffers>stream->oldest_buffer_index);
726767

727768
/* We're going to return the buffer we pinned last time. */
728769
oldest_buffer_index=stream->oldest_buffer_index;
@@ -771,6 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
771812
stream->distance=0;
772813
stream->oldest_buffer_index=stream->next_buffer_index;
773814
stream->pinned_buffers=0;
815+
stream->buffers[oldest_buffer_index]=InvalidBuffer;
774816
}
775817

776818
stream->fast_path= false;
@@ -846,10 +888,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
846888
stream->seq_until_processed=InvalidBlockNumber;
847889
}
848890

849-
#ifdefCLOBBER_FREED_MEMORY
850-
/* Clobber old buffer for debugging purposes. */
891+
/*
892+
* We must zap this queue entry, or else it would appear as a forwarded
893+
* buffer. If it's potentially in the overflow zone (ie from a
894+
* multi-block I/O that wrapped around the queue), also zap the copy.
895+
*/
851896
stream->buffers[oldest_buffer_index]=InvalidBuffer;
852-
#endif
897+
if (oldest_buffer_index<stream->io_combine_limit-1)
898+
stream->buffers[stream->queue_size+oldest_buffer_index]=
899+
InvalidBuffer;
853900

854901
#if defined(CLOBBER_FREED_MEMORY)|| defined(USE_VALGRIND)
855902

@@ -894,6 +941,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
894941
#ifndefREAD_STREAM_DISABLE_FAST_PATH
895942
/* See if we can take the fast path for all-cached scans next time. */
896943
if (stream->ios_in_progress==0&&
944+
stream->forwarded_buffers==0&&
897945
stream->pinned_buffers==1&&
898946
stream->distance==1&&
899947
stream->pending_read_nblocks==0&&
@@ -929,6 +977,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
929977
void
930978
read_stream_reset(ReadStream*stream)
931979
{
980+
int16index;
932981
Bufferbuffer;
933982

934983
/* Stop looking ahead. */
@@ -942,6 +991,24 @@ read_stream_reset(ReadStream *stream)
942991
while ((buffer=read_stream_next_buffer(stream,NULL))!=InvalidBuffer)
943992
ReleaseBuffer(buffer);
944993

994+
/* Unpin any unused forwarded buffers. */
995+
index=stream->next_buffer_index;
996+
while (index<stream->initialized_buffers&&
997+
(buffer=stream->buffers[index])!=InvalidBuffer)
998+
{
999+
Assert(stream->forwarded_buffers>0);
1000+
stream->forwarded_buffers--;
1001+
ReleaseBuffer(buffer);
1002+
1003+
stream->buffers[index]=InvalidBuffer;
1004+
if (index<stream->io_combine_limit-1)
1005+
stream->buffers[stream->queue_size+index]=InvalidBuffer;
1006+
1007+
if (++index==stream->queue_size)
1008+
index=0;
1009+
}
1010+
1011+
Assert(stream->forwarded_buffers==0);
9451012
Assert(stream->pinned_buffers==0);
9461013
Assert(stream->ios_in_progress==0);
9471014

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp