Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3e3a797

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
Currently while decoding changes, if the number of changes exceeds acertain threshold, we spill those to disk.  And this happens for each(sub)transaction.  Now, while reading all these files, we don't close themuntil we read all the files.  While reading these files, if the number ofsuch files exceeds the maximum number of file descriptors, the operationerrors out.Use PathNameOpenFile interface to open these files as that internally hasthe mechanism to release kernel FDs as needed to get us under themax_safe_fds limit.Reported-by: Amit KhandekarAuthor: Amit KhandekarReviewed-by: Amit KapilaBackpatch-through: 9.4Discussion:https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parent9776feb commit3e3a797

File tree

2 files changed

+66
-20
lines changed

2 files changed

+66
-20
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ typedef struct ReorderBufferIterTXNEntry
109109
XLogRecPtrlsn;
110110
ReorderBufferChange*change;
111111
ReorderBufferTXN*txn;
112-
intfd;
112+
Filefd;
113113
XLogSegNosegno;
114114
}ReorderBufferIterTXNEntry;
115115

@@ -178,7 +178,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
178178
* subtransactions
179179
* ---------------------------------------
180180
*/
181-
staticReorderBufferIterTXNState*ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn);
181+
staticvoidReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
182+
ReorderBufferIterTXNState*volatile*iter_state);
182183
staticReorderBufferChange*ReorderBufferIterTXNNext(ReorderBuffer*rb,ReorderBufferIterTXNState*state);
183184
staticvoidReorderBufferIterTXNFinish(ReorderBuffer*rb,
184185
ReorderBufferIterTXNState*state);
@@ -194,7 +195,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194195
staticvoidReorderBufferSerializeChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
195196
intfd,ReorderBufferChange*change);
196197
staticSizeReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
197-
int*fd,XLogSegNo*segno);
198+
File*fd,XLogSegNo*segno);
198199
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
199200
char*change);
200201
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
@@ -945,15 +946,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
945946
/*
946947
* Allocate & initialize an iterator which iterates in lsn order over a
947948
* transaction and all its subtransactions.
949+
*
950+
* Note: The iterator state is returned through iter_state parameter rather
951+
* than the function's return value. This is because the state gets cleaned up
952+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
953+
* back the state even if this function throws an exception.
948954
*/
949-
staticReorderBufferIterTXNState*
950-
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn)
955+
staticvoid
956+
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
957+
ReorderBufferIterTXNState*volatile*iter_state)
951958
{
952959
Sizenr_txns=0;
953960
ReorderBufferIterTXNState*state;
954961
dlist_itercur_txn_i;
955962
int32off;
956963

964+
*iter_state=NULL;
965+
957966
/*
958967
* Calculate the size of our heap: one element for every transaction that
959968
* contains changes. (Besides the transactions already in the reorder
@@ -997,6 +1006,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9971006
ReorderBufferIterCompare,
9981007
state);
9991008

1009+
/* Now that the state fields are initialized, it is safe to return it. */
1010+
*iter_state=state;
1011+
10001012
/*
10011013
* Now insert items into the binary heap, in an unordered fashion. (We
10021014
* will run a heap assembly step at the end; this is more efficient.)
@@ -1059,8 +1071,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10591071

10601072
/* assemble a valid binary heap */
10611073
binaryheap_build(state->heap);
1062-
1063-
returnstate;
10641074
}
10651075

10661076
/*
@@ -1164,7 +1174,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11641174
for (off=0;off<state->nr_txns;off++)
11651175
{
11661176
if (state->entries[off].fd!=-1)
1167-
CloseTransientFile(state->entries[off].fd);
1177+
FileClose(state->entries[off].fd);
11681178
}
11691179

11701180
/* free memory we might have "leaked" in the last *Next call */
@@ -1500,7 +1510,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
15001510

15011511
rb->begin(rb,txn);
15021512

1503-
iterstate=ReorderBufferIterTXNInit(rb,txn);
1513+
ReorderBufferIterTXNInit(rb,txn,&iterstate);
15041514
while ((change=ReorderBufferIterTXNNext(rb,iterstate))!=NULL)
15051515
{
15061516
Relationrelation=NULL;
@@ -2517,7 +2527,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
25172527
*/
25182528
staticSize
25192529
ReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
2520-
int*fd,XLogSegNo*segno)
2530+
File*fd,XLogSegNo*segno)
25212531
{
25222532
Sizerestored=0;
25232533
XLogSegNolast_segno;
@@ -2562,7 +2572,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25622572
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
25632573
*segno);
25642574

2565-
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY);
2575+
*fd=PathNameOpenFile(path,O_RDONLY |PG_BINARY);
25662576
if (*fd<0&&errno==ENOENT)
25672577
{
25682578
*fd=-1;
@@ -2582,14 +2592,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25822592
* end of this file.
25832593
*/
25842594
ReorderBufferSerializeReserve(rb,sizeof(ReorderBufferDiskChange));
2585-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2586-
readBytes=read(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange));
2587-
pgstat_report_wait_end();
2595+
readBytes=FileRead(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange),
2596+
WAIT_EVENT_REORDER_BUFFER_READ);
25882597

25892598
/* eof */
25902599
if (readBytes==0)
25912600
{
2592-
CloseTransientFile(*fd);
2601+
FileClose(*fd);
25932602
*fd=-1;
25942603
(*segno)++;
25952604
continue;
@@ -2611,10 +2620,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26112620
sizeof(ReorderBufferDiskChange)+ondisk->size);
26122621
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
26132622

2614-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2615-
readBytes=read(*fd,rb->outbuf+sizeof(ReorderBufferDiskChange),
2616-
ondisk->size-sizeof(ReorderBufferDiskChange));
2617-
pgstat_report_wait_end();
2623+
readBytes=FileRead(*fd,
2624+
rb->outbuf+sizeof(ReorderBufferDiskChange),
2625+
ondisk->size-sizeof(ReorderBufferDiskChange),
2626+
WAIT_EVENT_REORDER_BUFFER_READ);
26182627

26192628
if (readBytes<0)
26202629
ereport(ERROR,

‎src/test/recovery/t/006_logical_decoding.pl

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use warnings;
88
use PostgresNode;
99
use TestLib;
10-
use Test::Moretests=>10;
10+
use Test::Moretests=>11;
1111
use Config;
1212

1313
# Initialize master node
@@ -135,5 +135,42 @@
135135
is($node_master->slot('otherdb_slot')->{'slot_name'},
136136
undef,'logical slot was actually dropped with DB');
137137

138+
# Test to ensure that we don't run out of file descriptors even if there
139+
# are more spill files than maxAllocatedDescs.
140+
141+
# Set max_files_per_process to a small value to make it more likely to run out
142+
# of max open file descriptors.
143+
$node_master->safe_psql('postgres',
144+
'ALTER SYSTEM SET max_files_per_process = 26;');
145+
$node_master->restart;
146+
147+
$node_master->safe_psql(
148+
'postgres',q{
149+
do $$
150+
BEGIN
151+
FOR i IN 1..10 LOOP
152+
BEGIN
153+
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
154+
EXCEPTION
155+
when division_by_zero then perform 'dummy';
156+
END;
157+
END LOOP;
158+
END $$;
159+
});
160+
161+
$result =$node_master->safe_psql('postgres',
162+
qq[
163+
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
164+
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
165+
]);
166+
167+
$expected =q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
168+
is($result,$expected,'got expected output from spilling subxacts session');
169+
170+
# Reset back max_files_per_process
171+
$node_master->safe_psql('postgres',
172+
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
173+
$node_master->restart;
174+
138175
# done with the node
139176
$node_master->stop;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp