@@ -202,6 +202,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
202202static void ReorderBufferRestoreChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
203203char * change );
204204static void ReorderBufferRestoreCleanup (ReorderBuffer * rb ,ReorderBufferTXN * txn );
205+ static void ReorderBufferCleanupSerializedTXNs (const char * slotname );
206+ static void ReorderBufferSerializedPath (char * path ,ReplicationSlot * slot ,
207+ TransactionId xid ,XLogSegNo segno );
205208
206209static void ReorderBufferFreeSnap (ReorderBuffer * rb ,Snapshot snap );
207210static Snapshot ReorderBufferCopySnap (ReorderBuffer * rb ,Snapshot orig_snap ,
@@ -220,7 +223,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
220223
221224
222225/*
223- * Allocate a new ReorderBuffer
226+ * Allocate a new ReorderBuffer and clean out any old serialized state from
227+ * prior ReorderBuffer instances for the same slot.
224228 */
225229ReorderBuffer *
226230ReorderBufferAllocate (void )
@@ -229,6 +233,8 @@ ReorderBufferAllocate(void)
229233HASHCTL hash_ctl ;
230234MemoryContext new_ctx ;
231235
236+ Assert (MyReplicationSlot != NULL );
237+
232238/* allocate memory in own context, to have better accountability */
233239new_ctx = AllocSetContextCreate (CurrentMemoryContext ,
234240"ReorderBuffer" ,
@@ -265,6 +271,13 @@ ReorderBufferAllocate(void)
265271dlist_init (& buffer -> cached_changes );
266272slist_init (& buffer -> cached_tuplebufs );
267273
274+ /*
275+ * Ensure there's no stale data from prior uses of this slot, in case some
276+ * prior exit avoided calling ReorderBufferFree. Failure to do this can
277+ * produce duplicated txns, and it's very cheap if there's nothing there.
278+ */
279+ ReorderBufferCleanupSerializedTXNs (NameStr (MyReplicationSlot -> data .name ));
280+
268281return buffer ;
269282}
270283
@@ -281,6 +294,9 @@ ReorderBufferFree(ReorderBuffer *rb)
281294 * memory context.
282295 */
283296MemoryContextDelete (context );
297+
298+ /* Free disk space used by unconsumed reorder buffers */
299+ ReorderBufferCleanupSerializedTXNs (NameStr (MyReplicationSlot -> data .name ));
284300}
285301
286302/*
@@ -2117,7 +2133,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21172133int fd = -1 ;
21182134XLogSegNo curOpenSegNo = 0 ;
21192135Size spilled = 0 ;
2120- char path [MAXPGPATH ];
21212136
21222137elog (DEBUG2 ,"spill %u changes in XID %u to disk" ,
21232138 (uint32 )txn -> nentries_mem ,txn -> xid );
@@ -2144,21 +2159,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21442159 */
21452160if (fd == -1 || !XLByteInSeg (change -> lsn ,curOpenSegNo ))
21462161{
2147- XLogRecPtr recptr ;
2162+ char path [ MAXPGPATH ] ;
21482163
21492164if (fd != -1 )
21502165CloseTransientFile (fd );
21512166
21522167XLByteToSeg (change -> lsn ,curOpenSegNo );
2153- XLogSegNoOffsetToRecPtr (curOpenSegNo ,0 ,recptr );
21542168
21552169/*
21562170 * No need to care about TLIs here, only used during a single run,
21572171 * so each LSN only maps to a specific WAL record.
21582172 */
2159- sprintf (path ,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2160- NameStr (MyReplicationSlot -> data .name ),txn -> xid ,
2161- (uint32 ) (recptr >>32 ), (uint32 )recptr );
2173+ ReorderBufferSerializedPath (path ,MyReplicationSlot ,txn -> xid ,
2174+ curOpenSegNo );
21622175
21632176/* open segment, create it if necessary */
21642177fd = OpenTransientFile (path ,
@@ -2168,8 +2181,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21682181if (fd < 0 )
21692182ereport (ERROR ,
21702183(errcode_for_file_access (),
2171- errmsg ("could not open file \"%s\": %m" ,
2172- path )));
2184+ errmsg ("could not open file \"%s\": %m" ,path )));
21732185}
21742186
21752187ReorderBufferSerializeChange (rb ,txn ,fd ,change );
@@ -2385,25 +2397,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23852397
23862398if (* fd == -1 )
23872399{
2388- XLogRecPtr recptr ;
23892400char path [MAXPGPATH ];
23902401
23912402/* first time in */
23922403if (* segno == 0 )
2393- {
23942404XLByteToSeg (txn -> first_lsn ,* segno );
2395- }
23962405
23972406Assert (* segno != 0 || dlist_is_empty (& txn -> changes ));
2398- XLogSegNoOffsetToRecPtr (* segno ,0 ,recptr );
23992407
24002408/*
24012409 * No need to care about TLIs here, only used during a single run,
24022410 * so each LSN only maps to a specific WAL record.
24032411 */
2404- sprintf (path ,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2405- NameStr (MyReplicationSlot -> data .name ),txn -> xid ,
2406- (uint32 ) (recptr >>32 ), (uint32 )recptr );
2412+ ReorderBufferSerializedPath (path ,MyReplicationSlot ,txn -> xid ,
2413+ * segno );
24072414
24082415* fd = OpenTransientFile (path ,O_RDONLY |PG_BINARY ,0 );
24092416if (* fd < 0 && errno == ENOENT )
@@ -2635,20 +2642,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
26352642for (cur = first ;cur <=last ;cur ++ )
26362643{
26372644char path [MAXPGPATH ];
2638- XLogRecPtr recptr ;
2639-
2640- XLogSegNoOffsetToRecPtr (cur ,0 ,recptr );
26412645
2642- sprintf (path ,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2643- NameStr (MyReplicationSlot -> data .name ),txn -> xid ,
2644- (uint32 ) (recptr >>32 ), (uint32 )recptr );
2646+ ReorderBufferSerializedPath (path ,MyReplicationSlot ,txn -> xid ,cur );
26452647if (unlink (path )!= 0 && errno != ENOENT )
26462648ereport (ERROR ,
26472649(errcode_for_file_access (),
26482650errmsg ("could not remove file \"%s\": %m" ,path )));
26492651}
26502652}
26512653
2654+ /*
2655+ * Remove any leftover serialized reorder buffers from a slot directory after a
2656+ * prior crash or decoding session exit.
2657+ */
2658+ static void
2659+ ReorderBufferCleanupSerializedTXNs (const char * slotname )
2660+ {
2661+ DIR * spill_dir ;
2662+ struct dirent * spill_de ;
2663+ struct stat statbuf ;
2664+ char path [MAXPGPATH * 2 + 12 ];
2665+
2666+ sprintf (path ,"pg_replslot/%s" ,slotname );
2667+
2668+ /* we're only handling directories here, skip if it's not ours */
2669+ if (lstat (path ,& statbuf )== 0 && !S_ISDIR (statbuf .st_mode ))
2670+ return ;
2671+
2672+ spill_dir = AllocateDir (path );
2673+ while ((spill_de = ReadDirExtended (spill_dir ,path ,INFO ))!= NULL )
2674+ {
2675+ /* only look at names that can be ours */
2676+ if (strncmp (spill_de -> d_name ,"xid" ,3 )== 0 )
2677+ {
2678+ snprintf (path ,sizeof (path ),
2679+ "pg_replslot/%s/%s" ,slotname ,
2680+ spill_de -> d_name );
2681+
2682+ if (unlink (path )!= 0 )
2683+ ereport (ERROR ,
2684+ (errcode_for_file_access (),
2685+ errmsg ("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m" ,
2686+ path ,slotname )));
2687+ }
2688+ }
2689+ FreeDir (spill_dir );
2690+ }
2691+
2692+ /*
2693+ * Given a replication slot, transaction ID and segment number, fill in the
2694+ * corresponding spill file into 'path', which is a caller-owned buffer of size
2695+ * at least MAXPGPATH.
2696+ */
2697+ static void
2698+ ReorderBufferSerializedPath (char * path ,ReplicationSlot * slot ,TransactionId xid ,
2699+ XLogSegNo segno )
2700+ {
2701+ XLogRecPtr recptr ;
2702+
2703+ XLogSegNoOffsetToRecPtr (segno ,0 ,recptr );
2704+
2705+ snprintf (path ,MAXPGPATH ,"pg_replslot/%s/xid-%u-lsn-%X-%X.snap" ,
2706+ NameStr (MyReplicationSlot -> data .name ),
2707+ xid ,
2708+ (uint32 ) (recptr >>32 ), (uint32 )recptr );
2709+ }
2710+
26522711/*
26532712 * Delete all data spilled to disk after we've restarted/crashed. It will be
26542713 * recreated when the respective slots are reused.
@@ -2659,15 +2718,9 @@ StartupReorderBuffer(void)
26592718DIR * logical_dir ;
26602719struct dirent * logical_de ;
26612720
2662- DIR * spill_dir ;
2663- struct dirent * spill_de ;
2664-
26652721logical_dir = AllocateDir ("pg_replslot" );
26662722while ((logical_de = ReadDir (logical_dir ,"pg_replslot" ))!= NULL )
26672723{
2668- struct stat statbuf ;
2669- char path [MAXPGPATH * 2 + 12 ];
2670-
26712724if (strcmp (logical_de -> d_name ,"." )== 0 ||
26722725strcmp (logical_de -> d_name ,".." )== 0 )
26732726continue ;
@@ -2680,33 +2733,7 @@ StartupReorderBuffer(void)
26802733 * ok, has to be a surviving logical slot, iterate and delete
26812734 * everything starting with xid-*
26822735 */
2683- sprintf (path ,"pg_replslot/%s" ,logical_de -> d_name );
2684-
2685- /* we're only creating directories here, skip if it's not our's */
2686- if (lstat (path ,& statbuf )== 0 && !S_ISDIR (statbuf .st_mode ))
2687- continue ;
2688-
2689- spill_dir = AllocateDir (path );
2690- while ((spill_de = ReadDir (spill_dir ,path ))!= NULL )
2691- {
2692- if (strcmp (spill_de -> d_name ,"." )== 0 ||
2693- strcmp (spill_de -> d_name ,".." )== 0 )
2694- continue ;
2695-
2696- /* only look at names that can be ours */
2697- if (strncmp (spill_de -> d_name ,"xid" ,3 )== 0 )
2698- {
2699- sprintf (path ,"pg_replslot/%s/%s" ,logical_de -> d_name ,
2700- spill_de -> d_name );
2701-
2702- if (unlink (path )!= 0 )
2703- ereport (PANIC ,
2704- (errcode_for_file_access (),
2705- errmsg ("could not remove file \"%s\": %m" ,
2706- path )));
2707- }
2708- }
2709- FreeDir (spill_dir );
2736+ ReorderBufferCleanupSerializedTXNs (logical_de -> d_name );
27102737}
27112738FreeDir (logical_dir );
27122739}