Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e5c2af

Browse files
committed
Refrain from duplicating data in reorderbuffers
If a walsender exits leaving data in reorderbuffers, the next walsenderthat tries to decode the same transaction would append its decoded datain the same spill files without truncating it first, which effectivelyduplicate the data. Avoid that by removing any leftover reorderbufferspill files when a walsender starts.Backpatch to 9.4; this bug has been there from the very beginning oflogical decoding.Author: Craig Ringer, revised by meReviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada
1 parent7aba4f2 commit8e5c2af

File tree

1 file changed

+82
-55
lines changed

1 file changed

+82
-55
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 82 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
202202
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
203203
char*change);
204204
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
205+
staticvoidReorderBufferCleanupSerializedTXNs(constchar*slotname);
206+
staticvoidReorderBufferSerializedPath(char*path,ReplicationSlot*slot,
207+
TransactionIdxid,XLogSegNosegno);
205208

206209
staticvoidReorderBufferFreeSnap(ReorderBuffer*rb,Snapshotsnap);
207210
staticSnapshotReorderBufferCopySnap(ReorderBuffer*rb,Snapshotorig_snap,
@@ -220,7 +223,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
220223

221224

222225
/*
223-
* Allocate a new ReorderBuffer
226+
* Allocate a new ReorderBuffer and clean out any old serialized state from
227+
* prior ReorderBuffer instances for the same slot.
224228
*/
225229
ReorderBuffer*
226230
ReorderBufferAllocate(void)
@@ -229,6 +233,8 @@ ReorderBufferAllocate(void)
229233
HASHCTLhash_ctl;
230234
MemoryContextnew_ctx;
231235

236+
Assert(MyReplicationSlot!=NULL);
237+
232238
/* allocate memory in own context, to have better accountability */
233239
new_ctx=AllocSetContextCreate(CurrentMemoryContext,
234240
"ReorderBuffer",
@@ -265,6 +271,13 @@ ReorderBufferAllocate(void)
265271
dlist_init(&buffer->cached_changes);
266272
slist_init(&buffer->cached_tuplebufs);
267273

274+
/*
275+
* Ensure there's no stale data from prior uses of this slot, in case some
276+
* prior exit avoided calling ReorderBufferFree. Failure to do this can
277+
* produce duplicated txns, and it's very cheap if there's nothing there.
278+
*/
279+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
280+
268281
returnbuffer;
269282
}
270283

@@ -281,6 +294,9 @@ ReorderBufferFree(ReorderBuffer *rb)
281294
* memory context.
282295
*/
283296
MemoryContextDelete(context);
297+
298+
/* Free disk space used by unconsumed reorder buffers */
299+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
284300
}
285301

286302
/*
@@ -2117,7 +2133,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21172133
intfd=-1;
21182134
XLogSegNocurOpenSegNo=0;
21192135
Sizespilled=0;
2120-
charpath[MAXPGPATH];
21212136

21222137
elog(DEBUG2,"spill %u changes in XID %u to disk",
21232138
(uint32)txn->nentries_mem,txn->xid);
@@ -2144,21 +2159,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21442159
*/
21452160
if (fd==-1|| !XLByteInSeg(change->lsn,curOpenSegNo))
21462161
{
2147-
XLogRecPtrrecptr;
2162+
charpath[MAXPGPATH];
21482163

21492164
if (fd!=-1)
21502165
CloseTransientFile(fd);
21512166

21522167
XLByteToSeg(change->lsn,curOpenSegNo);
2153-
XLogSegNoOffsetToRecPtr(curOpenSegNo,0,recptr);
21542168

21552169
/*
21562170
* No need to care about TLIs here, only used during a single run,
21572171
* so each LSN only maps to a specific WAL record.
21582172
*/
2159-
sprintf(path,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2160-
NameStr(MyReplicationSlot->data.name),txn->xid,
2161-
(uint32) (recptr >>32), (uint32)recptr);
2173+
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
2174+
curOpenSegNo);
21622175

21632176
/* open segment, create it if necessary */
21642177
fd=OpenTransientFile(path,
@@ -2168,8 +2181,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21682181
if (fd<0)
21692182
ereport(ERROR,
21702183
(errcode_for_file_access(),
2171-
errmsg("could not open file \"%s\": %m",
2172-
path)));
2184+
errmsg("could not open file \"%s\": %m",path)));
21732185
}
21742186

21752187
ReorderBufferSerializeChange(rb,txn,fd,change);
@@ -2385,25 +2397,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23852397

23862398
if (*fd==-1)
23872399
{
2388-
XLogRecPtrrecptr;
23892400
charpath[MAXPGPATH];
23902401

23912402
/* first time in */
23922403
if (*segno==0)
2393-
{
23942404
XLByteToSeg(txn->first_lsn,*segno);
2395-
}
23962405

23972406
Assert(*segno!=0||dlist_is_empty(&txn->changes));
2398-
XLogSegNoOffsetToRecPtr(*segno,0,recptr);
23992407

24002408
/*
24012409
* No need to care about TLIs here, only used during a single run,
24022410
* so each LSN only maps to a specific WAL record.
24032411
*/
2404-
sprintf(path,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2405-
NameStr(MyReplicationSlot->data.name),txn->xid,
2406-
(uint32) (recptr >>32), (uint32)recptr);
2412+
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
2413+
*segno);
24072414

24082415
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY,0);
24092416
if (*fd<0&&errno==ENOENT)
@@ -2635,20 +2642,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
26352642
for (cur=first;cur <=last;cur++)
26362643
{
26372644
charpath[MAXPGPATH];
2638-
XLogRecPtrrecptr;
2639-
2640-
XLogSegNoOffsetToRecPtr(cur,0,recptr);
26412645

2642-
sprintf(path,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2643-
NameStr(MyReplicationSlot->data.name),txn->xid,
2644-
(uint32) (recptr >>32), (uint32)recptr);
2646+
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,cur);
26452647
if (unlink(path)!=0&&errno!=ENOENT)
26462648
ereport(ERROR,
26472649
(errcode_for_file_access(),
26482650
errmsg("could not remove file \"%s\": %m",path)));
26492651
}
26502652
}
26512653

2654+
/*
2655+
* Remove any leftover serialized reorder buffers from a slot directory after a
2656+
* prior crash or decoding session exit.
2657+
*/
2658+
staticvoid
2659+
ReorderBufferCleanupSerializedTXNs(constchar*slotname)
2660+
{
2661+
DIR*spill_dir;
2662+
structdirent*spill_de;
2663+
structstatstatbuf;
2664+
charpath[MAXPGPATH*2+12];
2665+
2666+
sprintf(path,"pg_replslot/%s",slotname);
2667+
2668+
/* we're only handling directories here, skip if it's not ours */
2669+
if (lstat(path,&statbuf)==0&& !S_ISDIR(statbuf.st_mode))
2670+
return;
2671+
2672+
spill_dir=AllocateDir(path);
2673+
while ((spill_de=ReadDirExtended(spill_dir,path,INFO))!=NULL)
2674+
{
2675+
/* only look at names that can be ours */
2676+
if (strncmp(spill_de->d_name,"xid",3)==0)
2677+
{
2678+
snprintf(path,sizeof(path),
2679+
"pg_replslot/%s/%s",slotname,
2680+
spill_de->d_name);
2681+
2682+
if (unlink(path)!=0)
2683+
ereport(ERROR,
2684+
(errcode_for_file_access(),
2685+
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2686+
path,slotname)));
2687+
}
2688+
}
2689+
FreeDir(spill_dir);
2690+
}
2691+
2692+
/*
2693+
* Given a replication slot, transaction ID and segment number, fill in the
2694+
* corresponding spill file into 'path', which is a caller-owned buffer of size
2695+
* at least MAXPGPATH.
2696+
*/
2697+
staticvoid
2698+
ReorderBufferSerializedPath(char*path,ReplicationSlot*slot,TransactionIdxid,
2699+
XLogSegNosegno)
2700+
{
2701+
XLogRecPtrrecptr;
2702+
2703+
XLogSegNoOffsetToRecPtr(segno,0,recptr);
2704+
2705+
snprintf(path,MAXPGPATH,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2706+
NameStr(MyReplicationSlot->data.name),
2707+
xid,
2708+
(uint32) (recptr >>32), (uint32)recptr);
2709+
}
2710+
26522711
/*
26532712
* Delete all data spilled to disk after we've restarted/crashed. It will be
26542713
* recreated when the respective slots are reused.
@@ -2659,15 +2718,9 @@ StartupReorderBuffer(void)
26592718
DIR*logical_dir;
26602719
structdirent*logical_de;
26612720

2662-
DIR*spill_dir;
2663-
structdirent*spill_de;
2664-
26652721
logical_dir=AllocateDir("pg_replslot");
26662722
while ((logical_de=ReadDir(logical_dir,"pg_replslot"))!=NULL)
26672723
{
2668-
structstatstatbuf;
2669-
charpath[MAXPGPATH*2+12];
2670-
26712724
if (strcmp(logical_de->d_name,".")==0||
26722725
strcmp(logical_de->d_name,"..")==0)
26732726
continue;
@@ -2680,33 +2733,7 @@ StartupReorderBuffer(void)
26802733
* ok, has to be a surviving logical slot, iterate and delete
26812734
* everything starting with xid-*
26822735
*/
2683-
sprintf(path,"pg_replslot/%s",logical_de->d_name);
2684-
2685-
/* we're only creating directories here, skip if it's not our's */
2686-
if (lstat(path,&statbuf)==0&& !S_ISDIR(statbuf.st_mode))
2687-
continue;
2688-
2689-
spill_dir=AllocateDir(path);
2690-
while ((spill_de=ReadDir(spill_dir,path))!=NULL)
2691-
{
2692-
if (strcmp(spill_de->d_name,".")==0||
2693-
strcmp(spill_de->d_name,"..")==0)
2694-
continue;
2695-
2696-
/* only look at names that can be ours */
2697-
if (strncmp(spill_de->d_name,"xid",3)==0)
2698-
{
2699-
sprintf(path,"pg_replslot/%s/%s",logical_de->d_name,
2700-
spill_de->d_name);
2701-
2702-
if (unlink(path)!=0)
2703-
ereport(PANIC,
2704-
(errcode_for_file_access(),
2705-
errmsg("could not remove file \"%s\": %m",
2706-
path)));
2707-
}
2708-
}
2709-
FreeDir(spill_dir);
2736+
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
27102737
}
27112738
FreeDir(logical_dir);
27122739
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp