Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1ad47e8

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
Currently while decoding changes, if the number of changes exceeds acertain threshold, we spill those to disk.  And this happens for each(sub)transaction.  Now, while reading all these files, we don't close themuntil we read all the files.  While reading these files, if the number ofsuch files exceeds the maximum number of file descriptors, the operationerrors out.Use PathNameOpenFile interface to open these files as that internally hasthe mechanism to release kernel FDs as needed to get us under themax_safe_fds limit.Reported-by: Amit KhandekarAuthor: Amit KhandekarReviewed-by: Amit KapilaBackpatch-through: 9.4Discussion:https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parentce758a3 commit1ad47e8

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ typedef struct ReorderBufferIterTXNEntry
101101
XLogRecPtrlsn;
102102
ReorderBufferChange*change;
103103
ReorderBufferTXN*txn;
104-
intfd;
104+
Filefd;
105105
XLogSegNosegno;
106106
}ReorderBufferIterTXNEntry;
107107

@@ -182,7 +182,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
182182
* subtransactions
183183
* ---------------------------------------
184184
*/
185-
staticReorderBufferIterTXNState*ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn);
185+
staticvoidReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
186+
ReorderBufferIterTXNState*volatile*iter_state);
186187
staticReorderBufferChange*
187188
ReorderBufferIterTXNNext(ReorderBuffer*rb,ReorderBufferIterTXNState*state);
188189
staticvoidReorderBufferIterTXNFinish(ReorderBuffer*rb,
@@ -199,7 +200,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
199200
staticvoidReorderBufferSerializeChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
200201
intfd,ReorderBufferChange*change);
201202
staticSizeReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
202-
int*fd,XLogSegNo*segno);
203+
File*fd,XLogSegNo*segno);
203204
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
204205
char*change);
205206
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
@@ -942,15 +943,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
942943
/*
943944
* Allocate & initialize an iterator which iterates in lsn order over a
944945
* transaction and all its subtransactions.
946+
*
947+
* Note: The iterator state is returned through iter_state parameter rather
948+
* than the function's return value. This is because the state gets cleaned up
949+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
950+
* back the state even if this function throws an exception.
945951
*/
946-
staticReorderBufferIterTXNState*
947-
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn)
952+
staticvoid
953+
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
954+
ReorderBufferIterTXNState*volatile*iter_state)
948955
{
949956
Sizenr_txns=0;
950957
ReorderBufferIterTXNState*state;
951958
dlist_itercur_txn_i;
952959
int32off;
953960

961+
*iter_state=NULL;
962+
954963
/*
955964
* Calculate the size of our heap: one element for every transaction that
956965
* contains changes. (Besides the transactions already in the reorder
@@ -994,6 +1003,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9941003
ReorderBufferIterCompare,
9951004
state);
9961005

1006+
/* Now that the state fields are initialized, it is safe to return it. */
1007+
*iter_state=state;
1008+
9971009
/*
9981010
* Now insert items into the binary heap, in an unordered fashion. (We
9991011
* will run a heap assembly step at the end; this is more efficient.)
@@ -1056,8 +1068,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10561068

10571069
/* assemble a valid binary heap */
10581070
binaryheap_build(state->heap);
1059-
1060-
returnstate;
10611071
}
10621072

10631073
/*
@@ -1161,7 +1171,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11611171
for (off=0;off<state->nr_txns;off++)
11621172
{
11631173
if (state->entries[off].fd!=-1)
1164-
CloseTransientFile(state->entries[off].fd);
1174+
FileClose(state->entries[off].fd);
11651175
}
11661176

11671177
/* free memory we might have "leaked" in the last *Next call */
@@ -1496,7 +1506,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
14961506

14971507
rb->begin(rb,txn);
14981508

1499-
iterstate=ReorderBufferIterTXNInit(rb,txn);
1509+
ReorderBufferIterTXNInit(rb,txn,&iterstate);
15001510
while ((change=ReorderBufferIterTXNNext(rb,iterstate))!=NULL)
15011511
{
15021512
Relationrelation=NULL;
@@ -2338,7 +2348,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
23382348
*/
23392349
staticSize
23402350
ReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
2341-
int*fd,XLogSegNo*segno)
2351+
File*fd,XLogSegNo*segno)
23422352
{
23432353
Sizerestored=0;
23442354
XLogSegNolast_segno;
@@ -2383,7 +2393,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23832393
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
23842394
*segno);
23852395

2386-
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY,0);
2396+
*fd=PathNameOpenFile(path,O_RDONLY |PG_BINARY,0);
23872397
if (*fd<0&&errno==ENOENT)
23882398
{
23892399
*fd=-1;
@@ -2404,12 +2414,12 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
24042414
* end of this file.
24052415
*/
24062416
ReorderBufferSerializeReserve(rb,sizeof(ReorderBufferDiskChange));
2407-
readBytes=read(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange));
2417+
readBytes=FileRead(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange));
24082418

24092419
/* eof */
24102420
if (readBytes==0)
24112421
{
2412-
CloseTransientFile(*fd);
2422+
FileClose(*fd);
24132423
*fd=-1;
24142424
(*segno)++;
24152425
continue;
@@ -2431,8 +2441,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
24312441
sizeof(ReorderBufferDiskChange)+ondisk->size);
24322442
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
24332443

2434-
readBytes=read(*fd,rb->outbuf+sizeof(ReorderBufferDiskChange),
2435-
ondisk->size-sizeof(ReorderBufferDiskChange));
2444+
readBytes=FileRead(*fd,rb->outbuf+sizeof(ReorderBufferDiskChange),
2445+
ondisk->size-sizeof(ReorderBufferDiskChange));
24362446

24372447
if (readBytes<0)
24382448
ereport(ERROR,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp