3939 * BufFile infrastructure supports temporary files that exceed the OS file size
4040 * limit, (b) provides a way for automatic clean up on the error and (c) provides
4141 * a way to survive these files across local transactions and allow to open and
42- * close at stream start and close. We decided to useSharedFileSet
42+ * close at stream start and close. We decided to useFileSet
4343 * infrastructure as without that it deletes the files on the closure of the
4444 * file and if we decide to keep stream files open across the start/stop stream
4545 * then it will consume a lot of memory (more than 8K for each BufFile and
4646 * there could be multiple such BufFiles as the subscriber could receive
4747 * multiple start/stop streams for different transactions before getting the
48- * commit). Moreover, if we don't useSharedFileSet then we also need to invent
48+ * commit). Moreover, if we don't useFileSet then we also need to invent
4949 * a new way to pass filenames to BufFile APIs so that we are allowed to open
5050 * the file we desired across multiple stream-open calls for the same
5151 * transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
246246typedef struct StreamXidHash
247247{
248248TransactionId xid ;/* xid is the hash key and must be first */
249- SharedFileSet * stream_fileset ;/* shared file set for stream data */
250- SharedFileSet * subxact_fileset ;/* shared file set for subxact info */
249+ FileSet * stream_fileset ;/* file set for stream data */
250+ FileSet * subxact_fileset ;/* file set for subxact info */
251251}StreamXidHash ;
252252
253253static MemoryContext ApplyMessageContext = NULL ;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
270270static TransactionId stream_xid = InvalidTransactionId ;
271271
272272/*
273- * Hash table for storing the streaming xid information along withshared file
274- *set for streaming and subxact files.
273+ * Hash table for storing the streaming xid information along withfilesets
274+ * for streaming and subxact files.
275275 */
276276static HTAB * xidhash = NULL ;
277277
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
12971297
12981298/* open the changes file */
12991299changes_filename (path ,MyLogicalRepWorker -> subid ,xid );
1300- fd = BufFileOpenShared (ent -> stream_fileset ,path ,O_RDWR );
1300+ fd = BufFileOpenFileSet (ent -> stream_fileset ,path ,O_RDWR );
13011301
13021302/* OK, truncate the file at the right offset */
1303- BufFileTruncateShared (fd ,subxact_data .subxacts [subidx ].fileno ,
1304- subxact_data .subxacts [subidx ].offset );
1303+ BufFileTruncateFileSet (fd ,subxact_data .subxacts [subidx ].fileno ,
1304+ subxact_data .subxacts [subidx ].offset );
13051305BufFileClose (fd );
13061306
13071307/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
13551355errmsg_internal ("transaction %u not found in stream XID hash table" ,
13561356xid )));
13571357
1358- fd = BufFileOpenShared (ent -> stream_fileset ,path ,O_RDONLY );
1358+ fd = BufFileOpenFileSet (ent -> stream_fileset ,path ,O_RDONLY );
13591359
13601360buffer = palloc (BLCKSZ );
13611361initStringInfo (& s2 );
@@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
25412541}
25422542}
25432543
2544+ /*
2545+ * Cleanup filesets.
2546+ */
2547+ void
2548+ logicalrep_worker_cleanupfileset (void )
2549+ {
2550+ HASH_SEQ_STATUS status ;
2551+ StreamXidHash * hentry ;
2552+
2553+ /* Remove all the pending stream and subxact filesets. */
2554+ if (xidhash )
2555+ {
2556+ hash_seq_init (& status ,xidhash );
2557+ while ((hentry = (StreamXidHash * )hash_seq_search (& status ))!= NULL )
2558+ {
2559+ FileSetDeleteAll (hentry -> stream_fileset );
2560+
2561+ /* Delete the subxact fileset iff it is created. */
2562+ if (hentry -> subxact_fileset )
2563+ FileSetDeleteAll (hentry -> subxact_fileset );
2564+ }
2565+ }
2566+ }
2567+
25442568/*
25452569 * Apply main loop.
25462570 */
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
30243048if (ent -> subxact_fileset )
30253049{
30263050cleanup_subxact_info ();
3027- SharedFileSetDeleteAll (ent -> subxact_fileset );
3051+ FileSetDeleteAll (ent -> subxact_fileset );
30283052pfree (ent -> subxact_fileset );
30293053ent -> subxact_fileset = NULL ;
30303054}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
30423066MemoryContext oldctx ;
30433067
30443068/*
3045- * We need to maintainshared fileset across multiple stream
3046- *start/stop calls. So, need to allocate it in a persistent context.
3069+ * We need to maintain fileset across multiple stream start/stop
3070+ * calls. So, need to allocate it in a persistent context.
30473071 */
30483072oldctx = MemoryContextSwitchTo (ApplyContext );
3049- ent -> subxact_fileset = palloc (sizeof (SharedFileSet ));
3050- SharedFileSetInit (ent -> subxact_fileset , NULL );
3073+ ent -> subxact_fileset = palloc (sizeof (FileSet ));
3074+ FileSetInit (ent -> subxact_fileset );
30513075MemoryContextSwitchTo (oldctx );
30523076
3053- fd = BufFileCreateShared (ent -> subxact_fileset ,path );
3077+ fd = BufFileCreateFileSet (ent -> subxact_fileset ,path );
30543078}
30553079else
3056- fd = BufFileOpenShared (ent -> subxact_fileset ,path ,O_RDWR );
3080+ fd = BufFileOpenFileSet (ent -> subxact_fileset ,path ,O_RDWR );
30573081
30583082len = sizeof (SubXactInfo )* subxact_data .nsubxacts ;
30593083
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
31073131
31083132subxact_filename (path ,subid ,xid );
31093133
3110- fd = BufFileOpenShared (ent -> subxact_fileset ,path ,O_RDONLY );
3134+ fd = BufFileOpenFileSet (ent -> subxact_fileset ,path ,O_RDONLY );
31113135
31123136/* read number of subxact items */
31133137if (BufFileRead (fd ,& subxact_data .nsubxacts ,
@@ -3264,15 +3288,15 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32643288
32653289/* Delete the change file and release the stream fileset memory */
32663290changes_filename (path ,subid ,xid );
3267- SharedFileSetDeleteAll (ent -> stream_fileset );
3291+ FileSetDeleteAll (ent -> stream_fileset );
32683292pfree (ent -> stream_fileset );
32693293ent -> stream_fileset = NULL ;
32703294
32713295/* Delete the subxact file and release the memory, if it exist */
32723296if (ent -> subxact_fileset )
32733297{
32743298subxact_filename (path ,subid ,xid );
3275- SharedFileSetDeleteAll (ent -> subxact_fileset );
3299+ FileSetDeleteAll (ent -> subxact_fileset );
32763300pfree (ent -> subxact_fileset );
32773301ent -> subxact_fileset = NULL ;
32783302}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32883312 *
32893313 * Open a file for streamed changes from a toplevel transaction identified
32903314 * by stream_xid (global variable). If it's the first chunk of streamed
3291- * changes for this transaction, initialize theshared fileset and create the
3292- *buffile, otherwise open the previously created file.
3315+ * changes for this transaction, initialize the fileset and create the buffile,
3316+ * otherwise open the previously created file.
32933317 *
32943318 * This can only be called at the beginning of a "streaming" block, i.e.
32953319 * between stream_start/stream_stop messages from the upstream.
@@ -3330,24 +3354,24 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33303354if (first_segment )
33313355{
33323356MemoryContext savectx ;
3333- SharedFileSet * fileset ;
3357+ FileSet * fileset ;
33343358
33353359if (found )
33363360ereport (ERROR ,
33373361(errcode (ERRCODE_PROTOCOL_VIOLATION ),
33383362errmsg_internal ("incorrect first-segment flag for streamed replication transaction" )));
33393363
33403364/*
3341- * We need to maintainshared fileset across multiple stream
3342- *start/stop calls. So, need to allocate it in a persistent context.
3365+ * We need to maintain fileset across multiple stream start/stop
3366+ * calls. So, need to allocate it in a persistent context.
33433367 */
33443368savectx = MemoryContextSwitchTo (ApplyContext );
3345- fileset = palloc (sizeof (SharedFileSet ));
3369+ fileset = palloc (sizeof (FileSet ));
33463370
3347- SharedFileSetInit (fileset , NULL );
3371+ FileSetInit (fileset );
33483372MemoryContextSwitchTo (savectx );
33493373
3350- stream_fd = BufFileCreateShared (fileset ,path );
3374+ stream_fd = BufFileCreateFileSet (fileset ,path );
33513375
33523376/* Remember the fileset for the next stream of the same transaction */
33533377ent -> xid = xid ;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33653389 * Open the file and seek to the end of the file because we always
33663390 * append the changes file.
33673391 */
3368- stream_fd = BufFileOpenShared (ent -> stream_fileset ,path ,O_RDWR );
3392+ stream_fd = BufFileOpenFileSet (ent -> stream_fileset ,path ,O_RDWR );
33693393BufFileSeek (stream_fd ,0 ,0 ,SEEK_END );
33703394}
33713395