Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit27b5f48

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
Currently while decoding changes, if the number of changes exceeds acertain threshold, we spill those to disk.  And this happens for each(sub)transaction.  Now, while reading all these files, we don't close themuntil we read all the files.  While reading these files, if the number ofsuch files exceeds the maximum number of file descriptors, the operationerrors out.Use PathNameOpenFile interface to open these files as that internally hasthe mechanism to release kernel FDs as needed to get us under themax_safe_fds limit.Reported-by: Amit KhandekarAuthor: Amit KhandekarReviewed-by: Amit KapilaBackpatch-through: 9.4Discussion:https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parentf512479 commit27b5f48

File tree

2 files changed

+66
-20
lines changed

2 files changed

+66
-20
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ typedef struct ReorderBufferIterTXNEntry
103103
XLogRecPtrlsn;
104104
ReorderBufferChange*change;
105105
ReorderBufferTXN*txn;
106-
intfd;
106+
Filefd;
107107
XLogSegNosegno;
108108
}ReorderBufferIterTXNEntry;
109109

@@ -181,7 +181,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
181181
* subtransactions
182182
* ---------------------------------------
183183
*/
184-
staticReorderBufferIterTXNState*ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn);
184+
staticvoidReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
185+
ReorderBufferIterTXNState*volatile*iter_state);
185186
staticReorderBufferChange*ReorderBufferIterTXNNext(ReorderBuffer*rb,ReorderBufferIterTXNState*state);
186187
staticvoidReorderBufferIterTXNFinish(ReorderBuffer*rb,
187188
ReorderBufferIterTXNState*state);
@@ -197,7 +198,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
197198
staticvoidReorderBufferSerializeChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
198199
intfd,ReorderBufferChange*change);
199200
staticSizeReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
200-
int*fd,XLogSegNo*segno);
201+
File*fd,XLogSegNo*segno);
201202
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
202203
char*change);
203204
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
@@ -953,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
953954
/*
954955
* Allocate & initialize an iterator which iterates in lsn order over a
955956
* transaction and all its subtransactions.
957+
*
958+
* Note: The iterator state is returned through iter_state parameter rather
959+
* than the function's return value. This is because the state gets cleaned up
960+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
961+
* back the state even if this function throws an exception.
956962
*/
957-
staticReorderBufferIterTXNState*
958-
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn)
963+
staticvoid
964+
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
965+
ReorderBufferIterTXNState*volatile*iter_state)
959966
{
960967
Sizenr_txns=0;
961968
ReorderBufferIterTXNState*state;
962969
dlist_itercur_txn_i;
963970
int32off;
964971

972+
*iter_state=NULL;
973+
965974
/*
966975
* Calculate the size of our heap: one element for every transaction that
967976
* contains changes. (Besides the transactions already in the reorder
@@ -1005,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10051014
ReorderBufferIterCompare,
10061015
state);
10071016

1017+
/* Now that the state fields are initialized, it is safe to return it. */
1018+
*iter_state=state;
1019+
10081020
/*
10091021
* Now insert items into the binary heap, in an unordered fashion. (We
10101022
* will run a heap assembly step at the end; this is more efficient.)
@@ -1067,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10671079

10681080
/* assemble a valid binary heap */
10691081
binaryheap_build(state->heap);
1070-
1071-
returnstate;
10721082
}
10731083

10741084
/*
@@ -1172,7 +1182,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11721182
for (off=0;off<state->nr_txns;off++)
11731183
{
11741184
if (state->entries[off].fd!=-1)
1175-
CloseTransientFile(state->entries[off].fd);
1185+
FileClose(state->entries[off].fd);
11761186
}
11771187

11781188
/* free memory we might have "leaked" in the last *Next call */
@@ -1508,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
15081518

15091519
rb->begin(rb,txn);
15101520

1511-
iterstate=ReorderBufferIterTXNInit(rb,txn);
1521+
ReorderBufferIterTXNInit(rb,txn,&iterstate);
15121522
while ((change=ReorderBufferIterTXNNext(rb,iterstate))!=NULL)
15131523
{
15141524
Relationrelation=NULL;
@@ -2465,7 +2475,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
24652475
*/
24662476
staticSize
24672477
ReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
2468-
int*fd,XLogSegNo*segno)
2478+
File*fd,XLogSegNo*segno)
24692479
{
24702480
Sizerestored=0;
24712481
XLogSegNolast_segno;
@@ -2510,7 +2520,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25102520
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
25112521
*segno);
25122522

2513-
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY,0);
2523+
*fd=PathNameOpenFile(path,O_RDONLY |PG_BINARY,0);
25142524
if (*fd<0&&errno==ENOENT)
25152525
{
25162526
*fd=-1;
@@ -2531,14 +2541,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25312541
* end of this file.
25322542
*/
25332543
ReorderBufferSerializeReserve(rb,sizeof(ReorderBufferDiskChange));
2534-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2535-
readBytes=read(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange));
2536-
pgstat_report_wait_end();
2544+
readBytes=FileRead(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange),
2545+
WAIT_EVENT_REORDER_BUFFER_READ);
25372546

25382547
/* eof */
25392548
if (readBytes==0)
25402549
{
2541-
CloseTransientFile(*fd);
2550+
FileClose(*fd);
25422551
*fd=-1;
25432552
(*segno)++;
25442553
continue;
@@ -2560,10 +2569,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25602569
sizeof(ReorderBufferDiskChange)+ondisk->size);
25612570
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
25622571

2563-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2564-
readBytes=read(*fd,rb->outbuf+sizeof(ReorderBufferDiskChange),
2565-
ondisk->size-sizeof(ReorderBufferDiskChange));
2566-
pgstat_report_wait_end();
2572+
readBytes=FileRead(*fd,
2573+
rb->outbuf+sizeof(ReorderBufferDiskChange),
2574+
ondisk->size-sizeof(ReorderBufferDiskChange),
2575+
WAIT_EVENT_REORDER_BUFFER_READ);
25672576

25682577
if (readBytes<0)
25692578
ereport(ERROR,

‎src/test/recovery/t/006_logical_decoding.pl

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use warnings;
88
use PostgresNode;
99
use TestLib;
10-
use Test::Moretests=>10;
10+
use Test::Moretests=>11;
1111
use Config;
1212

1313
# Initialize master node
@@ -133,5 +133,42 @@
133133
is($node_master->slot('otherdb_slot')->{'slot_name'},
134134
undef,'logical slot was actually dropped with DB');
135135

136+
# Test to ensure that we don't run out of file descriptors even if there
137+
# are more spill files than maxAllocatedDescs.
138+
139+
# Set max_files_per_process to a small value to make it more likely to run out
140+
# of max open file descriptors.
141+
$node_master->safe_psql('postgres',
142+
'ALTER SYSTEM SET max_files_per_process = 26;');
143+
$node_master->restart;
144+
145+
$node_master->safe_psql(
146+
'postgres',q{
147+
do $$
148+
BEGIN
149+
FOR i IN 1..10 LOOP
150+
BEGIN
151+
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
152+
EXCEPTION
153+
when division_by_zero then perform 'dummy';
154+
END;
155+
END LOOP;
156+
END $$;
157+
});
158+
159+
$result =$node_master->safe_psql('postgres',
160+
qq[
161+
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
162+
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
163+
]);
164+
165+
$expected =q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
166+
is($result,$expected,'got expected output from spilling subxacts session');
167+
168+
# Reset back max_files_per_process
169+
$node_master->safe_psql('postgres',
170+
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
171+
$node_master->restart;
172+
136173
# done with the node
137174
$node_master->stop;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp