Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitadb4661

Browse files
committed
Fix recovery_prefetch with low maintenance_io_concurrency.
We should process completed IOs *before* trying to start more, so thatit is always possible to decode one more record when the decoded recordqueue is empty, even if maintenance_io_concurrency is set so low that asingle earlier WAL record might have saturated the IO queue.That bug was hidden because the effect of maintenance_io_concurrency wasarbitrarily clamped to be at least 2. Fix the ordering, and also removethat clamp. We need a special case for 0, which is now treated the sameas recovery_prefetch=off, but otherwise the number is used directly.This allows for testing with 1, which would have made the problemobvious in simple test scenarios.Also add an explicit error message for missing contrecords. It was abit strange that we didn't report an error already, and became a latentbug with prefetching, since the internal state that tracks abortedcontrecords would not survive retrying, as revealed by026_overwrite_contrecord.pl with this adjustment. Reporting an errorprevents that.Back-patch to 15.Reported-by: Justin Pryzby <pryzby@telsasoft.com>Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>Discussion:https://postgr.es/m/20220831140128.GS31833%40telsasoft.com
1 parent12d40d4 commitadb4661

File tree

3 files changed

+56
-23
lines changed

3 files changed

+56
-23
lines changed

‎src/backend/access/transam/xlogprefetcher.c

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@
7272
intrecovery_prefetch=RECOVERY_PREFETCH_TRY;
7373

7474
#ifdefUSE_PREFETCH
75-
#defineRecoveryPrefetchEnabled() (recovery_prefetch != RECOVERY_PREFETCH_OFF)
75+
#defineRecoveryPrefetchEnabled() \
76+
(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
77+
maintenance_io_concurrency > 0)
7678
#else
7779
#defineRecoveryPrefetchEnabled() false
7880
#endif
@@ -985,6 +987,7 @@ XLogRecord *
985987
XLogPrefetcherReadRecord(XLogPrefetcher*prefetcher,char**errmsg)
986988
{
987989
DecodedXLogRecord*record;
990+
XLogRecPtrreplayed_up_to;
988991

989992
/*
990993
* See if it's time to reset the prefetching machinery, because a relevant
@@ -1000,7 +1003,8 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10001003

10011004
if (RecoveryPrefetchEnabled())
10021005
{
1003-
max_inflight=Max(maintenance_io_concurrency,2);
1006+
Assert(maintenance_io_concurrency>0);
1007+
max_inflight=maintenance_io_concurrency;
10041008
max_distance=max_inflight*XLOGPREFETCHER_DISTANCE_MULTIPLIER;
10051009
}
10061010
else
@@ -1018,14 +1022,34 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10181022
}
10191023

10201024
/*
1021-
* Release last returned record, if there is one. We need to do this so
1022-
*that we can check for empty decode queue accurately.
1025+
* Release last returned record, if there is one, as it's now been
1026+
*replayed.
10231027
*/
1024-
XLogReleasePreviousRecord(prefetcher->reader);
1028+
replayed_up_to=XLogReleasePreviousRecord(prefetcher->reader);
10251029

1026-
/* If there's nothing queued yet, then start prefetching. */
1030+
/*
1031+
* Can we drop any filters yet? If we were waiting for a relation to be
1032+
* created or extended, it is now OK to access blocks in the covered
1033+
* range.
1034+
*/
1035+
XLogPrefetcherCompleteFilters(prefetcher,replayed_up_to);
1036+
1037+
/*
1038+
* All IO initiated by earlier WAL is now completed. This might trigger
1039+
* further prefetching.
1040+
*/
1041+
lrq_complete_lsn(prefetcher->streaming_read,replayed_up_to);
1042+
1043+
/*
1044+
* If there's nothing queued yet, then start prefetching to cause at least
1045+
* one record to be queued.
1046+
*/
10271047
if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1048+
{
1049+
Assert(lrq_inflight(prefetcher->streaming_read)==0);
1050+
Assert(lrq_completed(prefetcher->streaming_read)==0);
10281051
lrq_prefetch(prefetcher->streaming_read);
1052+
}
10291053

10301054
/* Read the next record. */
10311055
record=XLogNextRecord(prefetcher->reader,errmsg);
@@ -1039,12 +1063,13 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10391063
Assert(record==prefetcher->reader->record);
10401064

10411065
/*
1042-
*Can we drop any prefetch filters yet, given the record we're about to
1043-
*return? This assumes that any records with earlier LSNs have been
1044-
*replayed, so if we were waiting for a relation to be created or
1045-
*extended, it is now OKtoaccess blocks in the covered range.
1066+
*If maintenance_io_concurrency is set very low, we might have started
1067+
*prefetching some but not all of the blocks referenced in the record
1068+
*we're about to return. Forget about the rest of the blocks in this
1069+
*record by dropping the prefetcher's referencetoit.
10461070
*/
1047-
XLogPrefetcherCompleteFilters(prefetcher,record->lsn);
1071+
if (record==prefetcher->record)
1072+
prefetcher->record=NULL;
10481073

10491074
/*
10501075
* See if it's time to compute some statistics, because enough WAL has
@@ -1053,13 +1078,6 @@ XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
10531078
if (unlikely(record->lsn >=prefetcher->next_stats_shm_lsn))
10541079
XLogPrefetcherComputeStats(prefetcher);
10551080

1056-
/*
1057-
* The caller is about to replay this record, so we can now report that
1058-
* all IO initiated because of early WAL must be finished. This may
1059-
* trigger more readahead.
1060-
*/
1061-
lrq_complete_lsn(prefetcher->streaming_read,record->lsn);
1062-
10631081
Assert(record==prefetcher->reader->record);
10641082

10651083
return&record->header;

‎src/backend/access/transam/xlogreader.c

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,24 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
275275
}
276276

277277
/*
278-
*See if we can releasethe last record that was returned by
279-
*XLogNextRecord(), if any, to free up space.
278+
*Releasethe last record that was returned by XLogNextRecord(), if any, to
279+
*free up space. Returns the LSN past the end of the record.
280280
*/
281-
void
281+
XLogRecPtr
282282
XLogReleasePreviousRecord(XLogReaderState*state)
283283
{
284284
DecodedXLogRecord*record;
285+
XLogRecPtrnext_lsn;
285286

286287
if (!state->record)
287-
return;
288+
returnInvalidXLogRecPtr;
288289

289290
/*
290291
* Remove it from the decoded record queue. It must be the oldest item
291292
* decoded, decode_queue_head.
292293
*/
293294
record=state->record;
295+
next_lsn=record->next_lsn;
294296
Assert(record==state->decode_queue_head);
295297
state->record=NULL;
296298
state->decode_queue_head=record->next;
@@ -336,6 +338,8 @@ XLogReleasePreviousRecord(XLogReaderState *state)
336338
state->decode_buffer_tail=state->decode_buffer;
337339
}
338340
}
341+
342+
returnnext_lsn;
339343
}
340344

341345
/*
@@ -907,6 +911,17 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
907911
*/
908912
state->abortedRecPtr=RecPtr;
909913
state->missingContrecPtr=targetPagePtr;
914+
915+
/*
916+
* If we got here without reporting an error, report one now so that
917+
* XLogPrefetcherReadRecord() doesn't bring us back a second time and
918+
* clobber the above state. Otherwise, the existing error takes
919+
* precedence.
920+
*/
921+
if (!state->errormsg_buf[0])
922+
report_invalid_record(state,
923+
"missing contrecord at %X/%X",
924+
LSN_FORMAT_ARGS(RecPtr));
910925
}
911926

912927
if (decoded&&decoded->oversized)

‎src/include/access/xlogreader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
363363
char**errormsg);
364364

365365
/* Release the previously returned record, if necessary. */
366-
externvoidXLogReleasePreviousRecord(XLogReaderState*state);
366+
externXLogRecPtrXLogReleasePreviousRecord(XLogReaderState*state);
367367

368368
/* Try to read ahead, if there is data and space. */
369369
externDecodedXLogRecord*XLogReadAhead(XLogReaderState*state,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp