Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcee1dd1

Browse files
committed
Refrain from duplicating data in reorderbuffers
If a walsender exits leaving data in reorderbuffers, the next walsenderthat tries to decode the same transaction would append its decoded datain the same spill files without truncating it first, which effectivelyduplicate the data. Avoid that by removing any leftover reorderbufferspill files when a walsender starts.Backpatch to 9.4; this bug has been there from the very beginning oflogical decoding.Author: Craig Ringer, revised by meReviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada
1 parente20dd6a commitcee1dd1

File tree

1 file changed

+82
-55
lines changed

1 file changed

+82
-55
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 82 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
199199
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
200200
char*change);
201201
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
202+
staticvoidReorderBufferCleanupSerializedTXNs(constchar*slotname);
203+
staticvoidReorderBufferSerializedPath(char*path,ReplicationSlot*slot,
204+
TransactionIdxid,XLogSegNosegno);
202205

203206
staticvoidReorderBufferFreeSnap(ReorderBuffer*rb,Snapshotsnap);
204207
staticSnapshotReorderBufferCopySnap(ReorderBuffer*rb,Snapshotorig_snap,
@@ -217,7 +220,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
217220

218221

219222
/*
220-
* Allocate a new ReorderBuffer
223+
* Allocate a new ReorderBuffer and clean out any old serialized state from
224+
* prior ReorderBuffer instances for the same slot.
221225
*/
222226
ReorderBuffer*
223227
ReorderBufferAllocate(void)
@@ -226,6 +230,8 @@ ReorderBufferAllocate(void)
226230
HASHCTLhash_ctl;
227231
MemoryContextnew_ctx;
228232

233+
Assert(MyReplicationSlot!=NULL);
234+
229235
/* allocate memory in own context, to have better accountability */
230236
new_ctx=AllocSetContextCreate(CurrentMemoryContext,
231237
"ReorderBuffer",
@@ -268,6 +274,13 @@ ReorderBufferAllocate(void)
268274
dlist_init(&buffer->toplevel_by_lsn);
269275
slist_init(&buffer->cached_tuplebufs);
270276

277+
/*
278+
* Ensure there's no stale data from prior uses of this slot, in case some
279+
* prior exit avoided calling ReorderBufferFree. Failure to do this can
280+
* produce duplicated txns, and it's very cheap if there's nothing there.
281+
*/
282+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
283+
271284
returnbuffer;
272285
}
273286

@@ -284,6 +297,9 @@ ReorderBufferFree(ReorderBuffer *rb)
284297
* memory context.
285298
*/
286299
MemoryContextDelete(context);
300+
301+
/* Free disk space used by unconsumed reorder buffers */
302+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
287303
}
288304

289305
/*
@@ -2073,7 +2089,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
20732089
intfd=-1;
20742090
XLogSegNocurOpenSegNo=0;
20752091
Sizespilled=0;
2076-
charpath[MAXPGPATH];
20772092

20782093
elog(DEBUG2,"spill %u changes in XID %u to disk",
20792094
(uint32)txn->nentries_mem,txn->xid);
@@ -2100,21 +2115,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21002115
*/
21012116
if (fd==-1|| !XLByteInSeg(change->lsn,curOpenSegNo))
21022117
{
2103-
XLogRecPtrrecptr;
2118+
charpath[MAXPGPATH];
21042119

21052120
if (fd!=-1)
21062121
CloseTransientFile(fd);
21072122

21082123
XLByteToSeg(change->lsn,curOpenSegNo);
2109-
XLogSegNoOffsetToRecPtr(curOpenSegNo,0,recptr);
21102124

21112125
/*
21122126
* No need to care about TLIs here, only used during a single run,
21132127
* so each LSN only maps to a specific WAL record.
21142128
*/
2115-
sprintf(path,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2116-
NameStr(MyReplicationSlot->data.name),txn->xid,
2117-
(uint32) (recptr >>32), (uint32)recptr);
2129+
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
2130+
curOpenSegNo);
21182131

21192132
/* open segment, create it if necessary */
21202133
fd=OpenTransientFile(path,
@@ -2124,8 +2137,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21242137
if (fd<0)
21252138
ereport(ERROR,
21262139
(errcode_for_file_access(),
2127-
errmsg("could not open file \"%s\": %m",
2128-
path)));
2140+
errmsg("could not open file \"%s\": %m",path)));
21292141
}
21302142

21312143
ReorderBufferSerializeChange(rb,txn,fd,change);
@@ -2343,25 +2355,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23432355

23442356
if (*fd==-1)
23452357
{
2346-
XLogRecPtrrecptr;
23472358
charpath[MAXPGPATH];
23482359

23492360
/* first time in */
23502361
if (*segno==0)
2351-
{
23522362
XLByteToSeg(txn->first_lsn,*segno);
2353-
}
23542363

23552364
Assert(*segno!=0||dlist_is_empty(&txn->changes));
2356-
XLogSegNoOffsetToRecPtr(*segno,0,recptr);
23572365

23582366
/*
23592367
* No need to care about TLIs here, only used during a single run,
23602368
* so each LSN only maps to a specific WAL record.
23612369
*/
2362-
sprintf(path,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2363-
NameStr(MyReplicationSlot->data.name),txn->xid,
2364-
(uint32) (recptr >>32), (uint32)recptr);
2370+
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
2371+
*segno);
23652372

23662373
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY,0);
23672374
if (*fd<0&&errno==ENOENT)
@@ -2597,20 +2604,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
25972604
for (cur=first;cur <=last;cur++)
25982605
{
25992606
charpath[MAXPGPATH];
2600-
XLogRecPtrrecptr;
2601-
2602-
XLogSegNoOffsetToRecPtr(cur,0,recptr);
26032607

2604-
sprintf(path,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2605-
NameStr(MyReplicationSlot->data.name),txn->xid,
2606-
(uint32) (recptr >>32), (uint32)recptr);
2608+
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,cur);
26072609
if (unlink(path)!=0&&errno!=ENOENT)
26082610
ereport(ERROR,
26092611
(errcode_for_file_access(),
26102612
errmsg("could not remove file \"%s\": %m",path)));
26112613
}
26122614
}
26132615

2616+
/*
2617+
* Remove any leftover serialized reorder buffers from a slot directory after a
2618+
* prior crash or decoding session exit.
2619+
*/
2620+
staticvoid
2621+
ReorderBufferCleanupSerializedTXNs(constchar*slotname)
2622+
{
2623+
DIR*spill_dir;
2624+
structdirent*spill_de;
2625+
structstatstatbuf;
2626+
charpath[MAXPGPATH*2+12];
2627+
2628+
sprintf(path,"pg_replslot/%s",slotname);
2629+
2630+
/* we're only handling directories here, skip if it's not ours */
2631+
if (lstat(path,&statbuf)==0&& !S_ISDIR(statbuf.st_mode))
2632+
return;
2633+
2634+
spill_dir=AllocateDir(path);
2635+
while ((spill_de=ReadDirExtended(spill_dir,path,INFO))!=NULL)
2636+
{
2637+
/* only look at names that can be ours */
2638+
if (strncmp(spill_de->d_name,"xid",3)==0)
2639+
{
2640+
snprintf(path,sizeof(path),
2641+
"pg_replslot/%s/%s",slotname,
2642+
spill_de->d_name);
2643+
2644+
if (unlink(path)!=0)
2645+
ereport(ERROR,
2646+
(errcode_for_file_access(),
2647+
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2648+
path,slotname)));
2649+
}
2650+
}
2651+
FreeDir(spill_dir);
2652+
}
2653+
2654+
/*
2655+
* Given a replication slot, transaction ID and segment number, fill in the
2656+
* corresponding spill file into 'path', which is a caller-owned buffer of size
2657+
* at least MAXPGPATH.
2658+
*/
2659+
staticvoid
2660+
ReorderBufferSerializedPath(char*path,ReplicationSlot*slot,TransactionIdxid,
2661+
XLogSegNosegno)
2662+
{
2663+
XLogRecPtrrecptr;
2664+
2665+
XLogSegNoOffsetToRecPtr(segno,0,recptr);
2666+
2667+
snprintf(path,MAXPGPATH,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2668+
NameStr(MyReplicationSlot->data.name),
2669+
xid,
2670+
(uint32) (recptr >>32), (uint32)recptr);
2671+
}
2672+
26142673
/*
26152674
* Delete all data spilled to disk after we've restarted/crashed. It will be
26162675
* recreated when the respective slots are reused.
@@ -2621,15 +2680,9 @@ StartupReorderBuffer(void)
26212680
DIR*logical_dir;
26222681
structdirent*logical_de;
26232682

2624-
DIR*spill_dir;
2625-
structdirent*spill_de;
2626-
26272683
logical_dir=AllocateDir("pg_replslot");
26282684
while ((logical_de=ReadDir(logical_dir,"pg_replslot"))!=NULL)
26292685
{
2630-
structstatstatbuf;
2631-
charpath[MAXPGPATH*2+12];
2632-
26332686
if (strcmp(logical_de->d_name,".")==0||
26342687
strcmp(logical_de->d_name,"..")==0)
26352688
continue;
@@ -2642,33 +2695,7 @@ StartupReorderBuffer(void)
26422695
* ok, has to be a surviving logical slot, iterate and delete
26432696
* everything starting with xid-*
26442697
*/
2645-
sprintf(path,"pg_replslot/%s",logical_de->d_name);
2646-
2647-
/* we're only creating directories here, skip if it's not our's */
2648-
if (lstat(path,&statbuf)==0&& !S_ISDIR(statbuf.st_mode))
2649-
continue;
2650-
2651-
spill_dir=AllocateDir(path);
2652-
while ((spill_de=ReadDir(spill_dir,path))!=NULL)
2653-
{
2654-
if (strcmp(spill_de->d_name,".")==0||
2655-
strcmp(spill_de->d_name,"..")==0)
2656-
continue;
2657-
2658-
/* only look at names that can be ours */
2659-
if (strncmp(spill_de->d_name,"xid",3)==0)
2660-
{
2661-
sprintf(path,"pg_replslot/%s/%s",logical_de->d_name,
2662-
spill_de->d_name);
2663-
2664-
if (unlink(path)!=0)
2665-
ereport(PANIC,
2666-
(errcode_for_file_access(),
2667-
errmsg("could not remove file \"%s\": %m",
2668-
path)));
2669-
}
2670-
}
2671-
FreeDir(spill_dir);
2698+
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
26722699
}
26732700
FreeDir(logical_dir);
26742701
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp