Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitdcac5e7

Browse files
author
Amit Kapila
committed
Refactor sharedfileset.c to separate out fileset implementation.
Move fileset related implementation out of sharedfileset.c to allow itsusage by backends that don't want to share filesets among differentprocesses. After this split, fileset infrastructure is used by bothsharedfileset.c and worker.c for the named temporary files that surviveacross transactions.Author: Dilip Kumar, based on suggestion by Andres FreundReviewed-by: Hou Zhijie, Masahiko Sawada, Amit KapilaDiscussion:https://postgr.es/m/E1mCC6U-0004Ik-Fs@gemulon.postgresql.org
1 parentd3fa876 commitdcac5e7

File tree

14 files changed

+368
-336
lines changed

14 files changed

+368
-336
lines changed

‎src/backend/replication/logical/launcher.c‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,9 @@ logicalrep_worker_onexit(int code, Datum arg)
648648

649649
logicalrep_worker_detach();
650650

651+
/* Cleanup filesets used for streaming transactions. */
652+
logicalrep_worker_cleanupfileset();
653+
651654
ApplyLauncherWakeup();
652655
}
653656

‎src/backend/replication/logical/worker.c‎

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@
3939
* BufFile infrastructure supports temporary files that exceed the OS file size
4040
* limit, (b) provides a way for automatic clean up on the error and (c) provides
4141
* a way to survive these files across local transactions and allow to open and
42-
* close at stream start and close. We decided to useSharedFileSet
42+
* close at stream start and close. We decided to useFileSet
4343
* infrastructure as without that it deletes the files on the closure of the
4444
* file and if we decide to keep stream files open across the start/stop stream
4545
* then it will consume a lot of memory (more than 8K for each BufFile and
4646
* there could be multiple such BufFiles as the subscriber could receive
4747
* multiple start/stop streams for different transactions before getting the
48-
* commit). Moreover, if we don't useSharedFileSet then we also need to invent
48+
* commit). Moreover, if we don't useFileSet then we also need to invent
4949
* a new way to pass filenames to BufFile APIs so that we are allowed to open
5050
* the file we desired across multiple stream-open calls for the same
5151
* transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
246246
typedefstructStreamXidHash
247247
{
248248
TransactionIdxid;/* xid is the hash key and must be first */
249-
SharedFileSet*stream_fileset;/* shared file set for stream data */
250-
SharedFileSet*subxact_fileset;/* shared file set for subxact info */
249+
FileSet*stream_fileset;/* file set for stream data */
250+
FileSet*subxact_fileset;/* file set for subxact info */
251251
}StreamXidHash;
252252

253253
staticMemoryContextApplyMessageContext=NULL;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
270270
staticTransactionIdstream_xid=InvalidTransactionId;
271271

272272
/*
273-
* Hash table for storing the streaming xid information along withshared file
274-
*setfor streaming and subxact files.
273+
* Hash table for storing the streaming xid information along withfilesets
274+
* for streaming and subxact files.
275275
*/
276276
staticHTAB*xidhash=NULL;
277277

@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
12971297

12981298
/* open the changes file */
12991299
changes_filename(path,MyLogicalRepWorker->subid,xid);
1300-
fd=BufFileOpenShared(ent->stream_fileset,path,O_RDWR);
1300+
fd=BufFileOpenFileSet(ent->stream_fileset,path,O_RDWR);
13011301

13021302
/* OK, truncate the file at the right offset */
1303-
BufFileTruncateShared(fd,subxact_data.subxacts[subidx].fileno,
1304-
subxact_data.subxacts[subidx].offset);
1303+
BufFileTruncateFileSet(fd,subxact_data.subxacts[subidx].fileno,
1304+
subxact_data.subxacts[subidx].offset);
13051305
BufFileClose(fd);
13061306

13071307
/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
13551355
errmsg_internal("transaction %u not found in stream XID hash table",
13561356
xid)));
13571357

1358-
fd=BufFileOpenShared(ent->stream_fileset,path,O_RDONLY);
1358+
fd=BufFileOpenFileSet(ent->stream_fileset,path,O_RDONLY);
13591359

13601360
buffer=palloc(BLCKSZ);
13611361
initStringInfo(&s2);
@@ -2541,6 +2541,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
25412541
}
25422542
}
25432543

2544+
/*
2545+
* Cleanup filesets.
2546+
*/
2547+
void
2548+
logicalrep_worker_cleanupfileset(void)
2549+
{
2550+
HASH_SEQ_STATUSstatus;
2551+
StreamXidHash*hentry;
2552+
2553+
/* Remove all the pending stream and subxact filesets. */
2554+
if (xidhash)
2555+
{
2556+
hash_seq_init(&status,xidhash);
2557+
while ((hentry= (StreamXidHash*)hash_seq_search(&status))!=NULL)
2558+
{
2559+
FileSetDeleteAll(hentry->stream_fileset);
2560+
2561+
/* Delete the subxact fileset iff it is created. */
2562+
if (hentry->subxact_fileset)
2563+
FileSetDeleteAll(hentry->subxact_fileset);
2564+
}
2565+
}
2566+
}
2567+
25442568
/*
25452569
* Apply main loop.
25462570
*/
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
30243048
if (ent->subxact_fileset)
30253049
{
30263050
cleanup_subxact_info();
3027-
SharedFileSetDeleteAll(ent->subxact_fileset);
3051+
FileSetDeleteAll(ent->subxact_fileset);
30283052
pfree(ent->subxact_fileset);
30293053
ent->subxact_fileset=NULL;
30303054
}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
30423066
MemoryContextoldctx;
30433067

30443068
/*
3045-
* We need to maintainsharedfileset across multiple stream
3046-
*start/stopcalls. So, need to allocate it in a persistent context.
3069+
* We need to maintain fileset across multiple stream start/stop
3070+
* calls. So, need to allocate it in a persistent context.
30473071
*/
30483072
oldctx=MemoryContextSwitchTo(ApplyContext);
3049-
ent->subxact_fileset=palloc(sizeof(SharedFileSet));
3050-
SharedFileSetInit(ent->subxact_fileset,NULL);
3073+
ent->subxact_fileset=palloc(sizeof(FileSet));
3074+
FileSetInit(ent->subxact_fileset);
30513075
MemoryContextSwitchTo(oldctx);
30523076

3053-
fd=BufFileCreateShared(ent->subxact_fileset,path);
3077+
fd=BufFileCreateFileSet(ent->subxact_fileset,path);
30543078
}
30553079
else
3056-
fd=BufFileOpenShared(ent->subxact_fileset,path,O_RDWR);
3080+
fd=BufFileOpenFileSet(ent->subxact_fileset,path,O_RDWR);
30573081

30583082
len=sizeof(SubXactInfo)*subxact_data.nsubxacts;
30593083

@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
31073131

31083132
subxact_filename(path,subid,xid);
31093133

3110-
fd=BufFileOpenShared(ent->subxact_fileset,path,O_RDONLY);
3134+
fd=BufFileOpenFileSet(ent->subxact_fileset,path,O_RDONLY);
31113135

31123136
/* read number of subxact items */
31133137
if (BufFileRead(fd,&subxact_data.nsubxacts,
@@ -3264,15 +3288,15 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32643288

32653289
/* Delete the change file and release the stream fileset memory */
32663290
changes_filename(path,subid,xid);
3267-
SharedFileSetDeleteAll(ent->stream_fileset);
3291+
FileSetDeleteAll(ent->stream_fileset);
32683292
pfree(ent->stream_fileset);
32693293
ent->stream_fileset=NULL;
32703294

32713295
/* Delete the subxact file and release the memory, if it exist */
32723296
if (ent->subxact_fileset)
32733297
{
32743298
subxact_filename(path,subid,xid);
3275-
SharedFileSetDeleteAll(ent->subxact_fileset);
3299+
FileSetDeleteAll(ent->subxact_fileset);
32763300
pfree(ent->subxact_fileset);
32773301
ent->subxact_fileset=NULL;
32783302
}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
32883312
*
32893313
* Open a file for streamed changes from a toplevel transaction identified
32903314
* by stream_xid (global variable). If it's the first chunk of streamed
3291-
* changes for this transaction, initialize thesharedfileset and create the
3292-
*buffile,otherwise open the previously created file.
3315+
* changes for this transaction, initialize the fileset and create the buffile,
3316+
* otherwise open the previously created file.
32933317
*
32943318
* This can only be called at the beginning of a "streaming" block, i.e.
32953319
* between stream_start/stream_stop messages from the upstream.
@@ -3330,24 +3354,24 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33303354
if (first_segment)
33313355
{
33323356
MemoryContextsavectx;
3333-
SharedFileSet*fileset;
3357+
FileSet*fileset;
33343358

33353359
if (found)
33363360
ereport(ERROR,
33373361
(errcode(ERRCODE_PROTOCOL_VIOLATION),
33383362
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
33393363

33403364
/*
3341-
* We need to maintainsharedfileset across multiple stream
3342-
*start/stopcalls. So, need to allocate it in a persistent context.
3365+
* We need to maintain fileset across multiple stream start/stop
3366+
* calls. So, need to allocate it in a persistent context.
33433367
*/
33443368
savectx=MemoryContextSwitchTo(ApplyContext);
3345-
fileset=palloc(sizeof(SharedFileSet));
3369+
fileset=palloc(sizeof(FileSet));
33463370

3347-
SharedFileSetInit(fileset,NULL);
3371+
FileSetInit(fileset);
33483372
MemoryContextSwitchTo(savectx);
33493373

3350-
stream_fd=BufFileCreateShared(fileset,path);
3374+
stream_fd=BufFileCreateFileSet(fileset,path);
33513375

33523376
/* Remember the fileset for the next stream of the same transaction */
33533377
ent->xid=xid;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
33653389
* Open the file and seek to the end of the file because we always
33663390
* append the changes file.
33673391
*/
3368-
stream_fd=BufFileOpenShared(ent->stream_fileset,path,O_RDWR);
3392+
stream_fd=BufFileOpenFileSet(ent->stream_fileset,path,O_RDWR);
33693393
BufFileSeek(stream_fd,0,0,SEEK_END);
33703394
}
33713395

‎src/backend/storage/file/Makefile‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ OBJS = \
1616
buffile.o\
1717
copydir.o\
1818
fd.o\
19+
fileset.o\
1920
reinit.o\
2021
sharedfileset.o
2122

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp