Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit92fc685

Browse files
committed
Respect changing pin limits in read_stream.c.
To avoid pinning too much of the buffer pool at once, read_stream.cpreviously used LimitAdditionalPins(). The coding was naive, and onlyconsidered the available buffers at stream construction time.This commit checks before each StartReadBuffers() call withGetAdditionalPinLimit(). The result might change over time due to pinsacquired outside this stream by the same backend. No extra CPU cyclesare added to the all-buffered fast-path code, but the I/O-starting pathnow considers the up-to-date remaining buffer limit.In practice it was quite difficult to exceed limits and cause any realproblems in v17, so no back-patch for now, but proposed changes willmake it easier.Per code review from Andres, in the course of testing his AIO patches.Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions)Discussion:https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com
1 parent0793ab8 commit92fc685

File tree

1 file changed

+90
-16
lines changed

1 file changed

+90
-16
lines changed

‎src/backend/storage/aio/read_stream.c

Lines changed: 90 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct ReadStream
116116
int16pinned_buffers;
117117
int16distance;
118118
booladvice_enabled;
119+
booltemporary;
119120

120121
/*
121122
* One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -213,8 +214,9 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data)
213214
}
214215

215216
/*
216-
* In order to deal with short reads in StartReadBuffers(), we sometimes need
217-
* to defer handling of a block until later.
217+
* In order to deal with buffer shortages and I/O limits after short reads, we
218+
* sometimes need to defer handling of a block we've already consumed from the
219+
* registered callback until later.
218220
*/
219221
staticinlinevoid
220222
read_stream_unget_block(ReadStream*stream,BlockNumberblocknum)
@@ -225,7 +227,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
225227
stream->buffered_blocknum=blocknum;
226228
}
227229

228-
staticvoid
230+
/*
231+
* Start as much of the current pending read as we can. If we have to split it
232+
* because of the per-backend buffer limit, or the buffer manager decides to
233+
* split it, then the pending read is adjusted to hold the remaining portion.
234+
*
235+
* We can always start a read of at least size one if we have no progress yet.
236+
* Otherwise it's possible that we can't start a read at all because of a lack
237+
* of buffers, and then false is returned. Buffer shortages also reduce the
238+
* distance to a level that prevents look-ahead until buffers are released.
239+
*/
240+
staticbool
229241
read_stream_start_pending_read(ReadStream*stream,boolsuppress_advice)
230242
{
231243
boolneed_wait;
@@ -234,12 +246,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
234246
int16io_index;
235247
int16overflow;
236248
int16buffer_index;
249+
int16buffer_limit;
237250

238251
/* This should only be called with a pending read. */
239252
Assert(stream->pending_read_nblocks>0);
240253
Assert(stream->pending_read_nblocks <=stream->io_combine_limit);
241254

242-
/* We had better not exceed thepinlimitby starting this read. */
255+
/* We had better not exceed theper-stream bufferlimitwith this read. */
243256
Assert(stream->pinned_buffers+stream->pending_read_nblocks <=
244257
stream->max_pinned_buffers);
245258

@@ -260,10 +273,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
260273
else
261274
flags=0;
262275

263-
/* We say how many blocks we want to read, but may be smaller on return. */
276+
/* How many more buffers is this backend allowed? */
277+
if (stream->temporary)
278+
buffer_limit=Min(GetAdditionalLocalPinLimit(),PG_INT16_MAX);
279+
else
280+
buffer_limit=Min(GetAdditionalPinLimit(),PG_INT16_MAX);
281+
if (buffer_limit==0&&stream->pinned_buffers==0)
282+
buffer_limit=1;/* guarantee progress */
283+
284+
/* Does the per-backend limit affect this read? */
285+
nblocks=stream->pending_read_nblocks;
286+
if (buffer_limit<nblocks)
287+
{
288+
int16new_distance;
289+
290+
/* Shrink distance: no more look-ahead until buffers are released. */
291+
new_distance=stream->pinned_buffers+buffer_limit;
292+
if (stream->distance>new_distance)
293+
stream->distance=new_distance;
294+
295+
/* Unless we have nothing to give the consumer, stop here. */
296+
if (stream->pinned_buffers>0)
297+
return false;
298+
299+
/* A short read is required to make progress. */
300+
nblocks=buffer_limit;
301+
}
302+
303+
/*
304+
* We say how many blocks we want to read, but it may be smaller on return
305+
* if the buffer manager decides to shorten the read.
306+
*/
264307
buffer_index=stream->next_buffer_index;
265308
io_index=stream->next_io_index;
266-
nblocks=stream->pending_read_nblocks;
267309
need_wait=StartReadBuffers(&stream->ios[io_index].op,
268310
&stream->buffers[buffer_index],
269311
stream->pending_read_blocknum,
@@ -313,6 +355,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
313355
/* Adjust the pending read to cover the remaining portion, if any. */
314356
stream->pending_read_blocknum+=nblocks;
315357
stream->pending_read_nblocks-=nblocks;
358+
359+
return true;
316360
}
317361

318362
staticvoid
@@ -361,14 +405,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
361405
/* We have to start the pending read before we can build another. */
362406
while (stream->pending_read_nblocks>0)
363407
{
364-
read_stream_start_pending_read(stream,suppress_advice);
365-
suppress_advice= false;
366-
if (stream->ios_in_progress==stream->max_ios)
408+
if (!read_stream_start_pending_read(stream,suppress_advice)||
409+
stream->ios_in_progress==stream->max_ios)
367410
{
368-
/*And we've hit the limit. Rewind, and stop here. */
411+
/*We've hit thebuffer or I/Olimit. Rewind and stop here. */
369412
read_stream_unget_block(stream,blocknum);
370413
return;
371414
}
415+
416+
suppress_advice= false;
372417
}
373418

374419
/* This is the start of a new pending read. */
@@ -382,15 +427,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
382427
* io_combine_limit size once more buffers have been consumed. However,
383428
* if we've already reached io_combine_limit, or we've reached the
384429
* distance limit and there isn't anything pinned yet, or the callback has
385-
* signaled end-of-stream, we start the read immediately.
430+
* signaled end-of-stream, we start the read immediately. Note that the
431+
* pending read can exceed the distance goal, if the latter was reduced
432+
* after hitting the per-backend buffer limit.
386433
*/
387434
if (stream->pending_read_nblocks>0&&
388435
(stream->pending_read_nblocks==stream->io_combine_limit||
389-
(stream->pending_read_nblocks==stream->distance&&
436+
(stream->pending_read_nblocks>=stream->distance&&
390437
stream->pinned_buffers==0)||
391438
stream->distance==0)&&
392439
stream->ios_in_progress<stream->max_ios)
393440
read_stream_start_pending_read(stream,suppress_advice);
441+
442+
/*
443+
* There should always be something pinned when we leave this function,
444+
* whether started by this call or not, unless we've hit the end of the
445+
* stream. In the worst case we can always make progress one buffer at a
446+
* time.
447+
*/
448+
Assert(stream->pinned_buffers>0||stream->distance==0);
394449
}
395450

396451
/*
@@ -420,6 +475,7 @@ read_stream_begin_impl(int flags,
420475
intmax_ios;
421476
intstrategy_pin_limit;
422477
uint32max_pinned_buffers;
478+
uint32max_possible_buffer_limit;
423479
Oidtablespace_id;
424480

425481
/*
@@ -475,12 +531,23 @@ read_stream_begin_impl(int flags,
475531
strategy_pin_limit=GetAccessStrategyPinLimit(strategy);
476532
max_pinned_buffers=Min(strategy_pin_limit,max_pinned_buffers);
477533

478-
/* Don't allow this backend to pin more than its share of buffers. */
534+
/*
535+
* Also limit our queue to the maximum number of pins we could ever be
536+
* allowed to acquire according to the buffer manager. We may not really
537+
* be able to use them all due to other pins held by this backend, but
538+
* we'll check that later in read_stream_start_pending_read().
539+
*/
479540
if (SmgrIsTemp(smgr))
480-
LimitAdditionalLocalPins(&max_pinned_buffers);
541+
max_possible_buffer_limit=GetLocalPinLimit();
481542
else
482-
LimitAdditionalPins(&max_pinned_buffers);
483-
Assert(max_pinned_buffers>0);
543+
max_possible_buffer_limit=GetPinLimit();
544+
max_pinned_buffers=Min(max_pinned_buffers,max_possible_buffer_limit);
545+
546+
/*
547+
* The limit might be zero on a system configured with too few buffers for
548+
* the number of connections. We need at least one to make progress.
549+
*/
550+
max_pinned_buffers=Max(1,max_pinned_buffers);
484551

485552
/*
486553
* We need one extra entry for buffers and per-buffer data, because users
@@ -546,6 +613,7 @@ read_stream_begin_impl(int flags,
546613
stream->callback=callback;
547614
stream->callback_private_data=callback_private_data;
548615
stream->buffered_blocknum=InvalidBlockNumber;
616+
stream->temporary=SmgrIsTemp(smgr);
549617

550618
/*
551619
* Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +742,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
674742
* arbitrary I/O entry (they're all free). We don't have to
675743
* adjust pinned_buffers because we're transferring one to caller
676744
* but pinning one more.
745+
*
746+
* In the fast path we don't need to check the pin limit. We're
747+
* always allowed at least one pin so that progress can be made,
748+
* and that's all we need here. Although two pins are momentarily
749+
* held at the same time, the model used here is that the stream
750+
* holds only one, and the other now belongs to the caller.
677751
*/
678752
if (likely(!StartReadBuffer(&stream->ios[0].op,
679753
&stream->buffers[oldest_buffer_index],

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp