Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdd38ff2

Browse files
committed
Fix recovery_prefetch with low maintenance_io_concurrency.
We should process completed IOs *before* trying to start more, so thatit is always possible to decode one more record when the decoded recordqueue is empty, even if maintenance_io_concurrency is set so low that asingle earlier WAL record might have saturated the IO queue.That bug was hidden because the effect of maintenance_io_concurrency wasarbitrarily clamped to be at least 2. Fix the ordering, and also removethat clamp. We need a special case for 0, which is now treated the sameas recovery_prefetch=off, but otherwise the number is used directly.This allows for testing with 1, which would have made the problemobvious in simple test scenarios.Also add an explicit error message for missing contrecords. It was abit strange that we didn't report an error already, and became a latentbug with prefetching, since the internal state that tracks abortedcontrecords would not survive retrying, as revealed by026_overwrite_contrecord.pl with this adjustment. Reporting an errorprevents that.Back-patch to 15.Reported-by: Justin Pryzby <pryzby@telsasoft.com>Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>Discussion:https://postgr.es/m/20220831140128.GS31833%40telsasoft.com
1 parent144cefa commitdd38ff2

File tree

3 files changed

+56
-23
lines changed

3 files changed

+56
-23
lines changed

‎src/backend/access/transam/xlogprefetcher.c

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@
7272
intrecovery_prefetch=RECOVERY_PREFETCH_TRY;
7373

7474
#ifdefUSE_PREFETCH
75-
#defineRecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
75+
#defineRecoveryPrefetchEnabled() \
76+
(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
77+
maintenance_io_concurrency > 0)
7678
#else
7779
#defineRecoveryPrefetchEnabled() false
7880
#endif
@@ -983,6 +985,7 @@ XLogRecord *
983985
XLogPrefetcherReadRecord(XLogPrefetcher*prefetcher,char**errmsg)
984986
{
985987
DecodedXLogRecord*record;
988+
XLogRecPtrreplayed_up_to;
986989

987990
/*
988991
* See if it's time to reset the prefetching machinery, because a relevant
@@ -998,7 +1001,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
9981001

9991002
if (RecoveryPrefetchEnabled())
10001003
{
1001-
max_inflight=Max(maintenance_io_concurrency,2);
1004+
Assert(maintenance_io_concurrency>0);
1005+
max_inflight=maintenance_io_concurrency;
10021006
max_distance=max_inflight*XLOGPREFETCHER_DISTANCE_MULTIPLIER;
10031007
}
10041008
else
@@ -1016,14 +1020,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10161020
}
10171021

10181022
/*
1019-
* Release last returned record, if there is one. We need to do this so
1020-
*that we can check for empty decode queue accurately.
1023+
* Release last returned record, if there is one, as it's now been
1024+
*replayed.
10211025
*/
1022-
XLogReleasePreviousRecord(prefetcher->reader);
1026+
replayed_up_to=XLogReleasePreviousRecord(prefetcher->reader);
10231027

1024-
/* If there's nothing queued yet, then start prefetching. */
1028+
/*
1029+
* Can we drop any filters yet? If we were waiting for a relation to be
1030+
* created or extended, it is now OK to access blocks in the covered
1031+
* range.
1032+
*/
1033+
XLogPrefetcherCompleteFilters(prefetcher,replayed_up_to);
1034+
1035+
/*
1036+
* All IO initiated by earlier WAL is now completed. This might trigger
1037+
* further prefetching.
1038+
*/
1039+
lrq_complete_lsn(prefetcher->streaming_read,replayed_up_to);
1040+
1041+
/*
1042+
* If there's nothing queued yet, then start prefetching to cause at least
1043+
* one record to be queued.
1044+
*/
10251045
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1046+
{
1047+
Assert(lrq_inflight(prefetcher->streaming_read)==0);
1048+
Assert(lrq_completed(prefetcher->streaming_read)==0);
10261049
lrq_prefetch(prefetcher->streaming_read);
1050+
}
10271051

10281052
/* Read the next record. */
10291053
record=XLogNextRecord(prefetcher->reader,errmsg);
@@ -1037,12 +1061,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10371061
Assert(record==prefetcher->reader->record);
10381062

10391063
/*
1040-
*Can we drop any prefetch filters yet, given the record we're about to
1041-
*return? This assumes that any records with earlier LSNs have been
1042-
*replayed, so if we were waiting for a relation to be created or
1043-
*extended, it is now OKtoaccess blocks in the covered range.
1064+
*If maintenance_io_concurrency is set very low, we might have started
1065+
*prefetching some but not all of the blocks referenced in the record
1066+
*we're about to return. Forget about the rest of the blocks in this
1067+
*record by dropping the prefetcher's referencetoit.
10441068
*/
1045-
XLogPrefetcherCompleteFilters(prefetcher,record->lsn);
1069+
if (record==prefetcher->record)
1070+
prefetcher->record=NULL;
10461071

10471072
/*
10481073
* See if it's time to compute some statistics, because enough WAL has
@@ -1051,13 +1076,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10511076
if (unlikely(record->lsn >=prefetcher->next_stats_shm_lsn))
10521077
XLogPrefetcherComputeStats(prefetcher);
10531078

1054-
/*
1055-
* The caller is about to replay this record, so we can now report that
1056-
* all IO initiated because of early WAL must be finished. This may
1057-
* trigger more readahead.
1058-
*/
1059-
lrq_complete_lsn(prefetcher->streaming_read,record->lsn);
1060-
10611079
Assert(record==prefetcher->reader->record);
10621080

10631081
return&record->header;

‎src/backend/access/transam/xlogreader.c

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
275275
}
276276

277277
/*
278-
*See if we can releasethe last record that was returned by
279-
*XLogNextRecord(), if any, to free up space.
278+
*Releasethe last record that was returned by XLogNextRecord(), if any, to
279+
*free up space. Returns the LSN past the end of the record.
280280
*/
281-
void
281+
XLogRecPtr
282282
XLogReleasePreviousRecord(XLogReaderState*state)
283283
{
284284
DecodedXLogRecord*record;
285+
XLogRecPtrnext_lsn;
285286

286287
if (!state->record)
287-
return;
288+
returnInvalidXLogRecPtr;
288289

289290
/*
290291
* Remove it from the decoded record queue. It must be the oldest item
291292
* decoded, decode_queue_head.
292293
*/
293294
record=state->record;
295+
next_lsn=record->next_lsn;
294296
Assert(record==state->decode_queue_head);
295297
state->record=NULL;
296298
state->decode_queue_head=record->next;
@@ -336,6 +338,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
336338
state->decode_buffer_tail=state->decode_buffer;
337339
}
338340
}
341+
342+
returnnext_lsn;
339343
}
340344

341345
/*
@@ -906,6 +910,17 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
906910
*/
907911
state->abortedRecPtr=RecPtr;
908912
state->missingContrecPtr=targetPagePtr;
913+
914+
/*
915+
* If we got here without reporting an error, report one now so that
916+
* XLogPrefetcherReadRecord() doesn't bring us back a second time and
917+
* clobber the above state. Otherwise, the existing error takes
918+
* precedence.
919+
*/
920+
if (!state->errormsg_buf[0])
921+
report_invalid_record(state,
922+
"missing contrecord at %X/%X",
923+
LSN_FORMAT_ARGS(RecPtr));
909924
}
910925

911926
if (decoded&&decoded->oversized)

‎src/include/access/xlogreader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
363363
char**errormsg);
364364

365365
/* Release the previously returned record, if necessary. */
366-
externvoidXLogReleasePreviousRecord(XLogReaderState*state);
366+
externXLogRecPtrXLogReleasePreviousRecord(XLogReaderState*state);
367367

368368
/* Try to read ahead, if there is data and space. */
369369
externDecodedXLogRecord*XLogReadAhead(XLogReaderState*state,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp