Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf003d9f

Browse files
committed
Add circular WAL decoding buffer.
Teach xlogreader.c to decode its output into a circular buffer, tosupport optimizations based on looking ahead. * XLogReadRecord() works as before, consuming records one by one, and allowing them to be examined via the traditional XLogRecGetXXX() macros. * An alternative new interface XLogNextRecord() is added that returns pointers to DecodedXLogRecord structs that can be examined directly. * XLogReadAhead() provides a second cursor that lets you see further ahead, as long as data is available and there is enough space in the decoding buffer. This returns DecodedXLogRecord pointers to the caller, but also adds them to a queue of records that will later be consumed by XLogNextRecord()/XLogReadRecord().The buffer's size is controlled with wal_decode_buffer_size. The buffercould potentially be placed into shared memory, for future projects.Large records that don't fit in the circular buffer are called"oversized" and allocated separately with palloc().Discussion:https://postgr.es/m/CA+hUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq=AovOddfHpA@mail.gmail.com
1 parent323cbe7 commitf003d9f

File tree

8 files changed

+734
-200
lines changed

8 files changed

+734
-200
lines changed

‎src/backend/access/transam/generic_xlog.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record)
482482
uint8block_id;
483483

484484
/* Protect limited size of buffers[] array */
485-
Assert(record->max_block_id<MAX_GENERIC_XLOG_PAGES);
485+
Assert(XLogRecMaxBlockId(record)<MAX_GENERIC_XLOG_PAGES);
486486

487487
/* Iterate over blocks */
488-
for (block_id=0;block_id <=record->max_block_id;block_id++)
488+
for (block_id=0;block_id <=XLogRecMaxBlockId(record);block_id++)
489489
{
490490
XLogRedoActionaction;
491491

@@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record)
525525
}
526526

527527
/* Changes are done: unlock and release all buffers */
528-
for (block_id=0;block_id <=record->max_block_id;block_id++)
528+
for (block_id=0;block_id <=XLogRecMaxBlockId(record);block_id++)
529529
{
530530
if (BufferIsValid(buffers[block_id]))
531531
UnlockReleaseBuffer(buffers[block_id]);

‎src/backend/access/transam/xlog.c

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,7 @@ XLogInsertRecord(XLogRecData *rdata,
12091209
StringInfoDatarecordBuf;
12101210
char*errormsg=NULL;
12111211
MemoryContextoldCxt;
1212+
DecodedXLogRecord*decoded;
12121213

12131214
oldCxt=MemoryContextSwitchTo(walDebugCxt);
12141215

@@ -1224,14 +1225,19 @@ XLogInsertRecord(XLogRecData *rdata,
12241225
for (;rdata!=NULL;rdata=rdata->next)
12251226
appendBinaryStringInfo(&recordBuf,rdata->data,rdata->len);
12261227

1228+
/* How much space would it take to decode this record? */
1229+
decoded=palloc(DecodeXLogRecordRequiredSpace(recordBuf.len));
1230+
12271231
if (!debug_reader)
12281232
debug_reader=XLogReaderAllocate(wal_segment_size,NULL,NULL);
12291233

12301234
if (!debug_reader)
12311235
{
12321236
appendStringInfoString(&buf,"error decoding record: out of memory");
12331237
}
1234-
elseif (!DecodeXLogRecord(debug_reader, (XLogRecord*)recordBuf.data,
1238+
elseif (!DecodeXLogRecord(debug_reader,decoded,
1239+
(XLogRecord*)recordBuf.data,
1240+
EndPos,
12351241
&errormsg))
12361242
{
12371243
appendStringInfo(&buf,"error decoding record: %s",
@@ -1240,10 +1246,17 @@ XLogInsertRecord(XLogRecData *rdata,
12401246
else
12411247
{
12421248
appendStringInfoString(&buf," - ");
1249+
/*
1250+
* Temporarily make this decoded record the current record for
1251+
* XLogRecGetXXX() macros.
1252+
*/
1253+
debug_reader->record=decoded;
12431254
xlog_outdesc(&buf,debug_reader);
1255+
debug_reader->record=NULL;
12441256
}
12451257
elog(LOG,"%s",buf.data);
12461258

1259+
pfree(decoded);
12471260
pfree(buf.data);
12481261
pfree(recordBuf.data);
12491262
MemoryContextSwitchTo(oldCxt);
@@ -1417,7 +1430,7 @@ checkXLogConsistency(XLogReaderState *record)
14171430

14181431
Assert((XLogRecGetInfo(record)&XLR_CHECK_CONSISTENCY)!=0);
14191432

1420-
for (block_id=0;block_id <=record->max_block_id;block_id++)
1433+
for (block_id=0;block_id <=XLogRecMaxBlockId(record);block_id++)
14211434
{
14221435
Bufferbuf;
14231436
Pagepage;
@@ -4383,6 +4396,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
43834396

43844397
ReadRecPtr=xlogreader->ReadRecPtr;
43854398
EndRecPtr=xlogreader->EndRecPtr;
4399+
43864400
if (record==NULL)
43874401
{
43884402
if (readFile >=0)
@@ -10300,7 +10314,7 @@ xlog_redo(XLogReaderState *record)
1030010314
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
1030110315
* code just to distinguish them for statistics purposes.
1030210316
*/
10303-
for (uint8block_id=0;block_id <=record->max_block_id;block_id++)
10317+
for (uint8block_id=0;block_id <=XLogRecMaxBlockId(record);block_id++)
1030410318
{
1030510319
Bufferbuffer;
1030610320

@@ -10435,7 +10449,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record)
1043510449
intblock_id;
1043610450

1043710451
/* decode block references */
10438-
for (block_id=0;block_id <=record->max_block_id;block_id++)
10452+
for (block_id=0;block_id <=XLogRecMaxBlockId(record);block_id++)
1043910453
{
1044010454
RelFileNodernode;
1044110455
ForkNumberforknum;
@@ -12104,7 +12118,7 @@ XLogPageRead(XLogReaderState *state,
1210412118
XLogRecPtrtargetPagePtr=state->readPagePtr;
1210512119
intreqLen=state->reqLen;
1210612120
intreadLen=0;
12107-
XLogRecPtrtargetRecPtr=state->ReadRecPtr;
12121+
XLogRecPtrtargetRecPtr=state->DecodeRecPtr;
1210812122
uint32targetPageOff;
1210912123
XLogSegNotargetSegNoPG_USED_FOR_ASSERTS_ONLY;
1211012124
intr;
@@ -12122,6 +12136,9 @@ XLogPageRead(XLogReaderState *state,
1212212136
/*
1212312137
* Request a restartpoint if we've replayed too much xlog since the
1212412138
* last one.
12139+
*
12140+
* XXX Why is this here? Move it to recovery loop, since it's based
12141+
* on replay position, not read position?
1212512142
*/
1212612143
if (bgwriterLaunched)
1212712144
{
@@ -12613,6 +12630,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
1261312630
* be updated on each cycle. When we are behind,
1261412631
* XLogReceiptTime will not advance, so the grace time
1261512632
* allotted to conflicting queries will decrease.
12633+
*
1261612634
*/
1261712635
if (RecPtr<flushedUpto)
1261812636
havedata= true;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp