Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f8a6d8e

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
Currently while decoding changes, if the number of changes exceeds a certain threshold, we spill those to disk.  And this happens for each (sub)transaction.  Now, while reading all these files, we don't close them until we read all the files.  While reading these files, if the number of such files exceeds the maximum number of file descriptors, the operation errors out.

Use PathNameOpenFile interface to open these files as that internally has the mechanism to release kernel FDs as needed to get us under the max_safe_fds limit.

Reported-by: Amit Khandekar
Author: Amit Khandekar
Reviewed-by: Amit Kapila
Backpatch-through: 9.4
Discussion: https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parent 8f3e44a, commit f8a6d8e

File tree

2 files changed

+90
-25
lines changed

2 files changed

+90
-25
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 52 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
103103
CommandIdcombocid;/* just for debugging */
104104
}ReorderBufferTupleCidEnt;
105105

106+
/* Virtual file descriptor with file offset tracking */
107+
typedefstructTXNEntryFile
108+
{
109+
Filevfd;/* -1 when the file is closed */
110+
off_tcurOffset;/* offset for next write or read. Reset to 0
111+
* when vfd is opened. */
112+
}TXNEntryFile;
113+
106114
/* k-way in-order change iteration support structures */
107115
typedefstructReorderBufferIterTXNEntry
108116
{
109117
XLogRecPtrlsn;
110118
ReorderBufferChange*change;
111119
ReorderBufferTXN*txn;
112-
intfd;
120+
TXNEntryFilefile;
113121
XLogSegNosegno;
114122
}ReorderBufferIterTXNEntry;
115123

@@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
178186
* subtransactions
179187
* ---------------------------------------
180188
*/
181-
staticReorderBufferIterTXNState*ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn);
189+
staticvoidReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
190+
ReorderBufferIterTXNState*volatile*iter_state);
182191
staticReorderBufferChange*ReorderBufferIterTXNNext(ReorderBuffer*rb,ReorderBufferIterTXNState*state);
183192
staticvoidReorderBufferIterTXNFinish(ReorderBuffer*rb,
184193
ReorderBufferIterTXNState*state);
@@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194203
staticvoidReorderBufferSerializeChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
195204
intfd,ReorderBufferChange*change);
196205
staticSizeReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
197-
int*fd,XLogSegNo*segno);
206+
TXNEntryFile*file,XLogSegNo*segno);
198207
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
199208
char*change);
200209
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
@@ -945,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
945954
/*
946955
* Allocate & initialize an iterator which iterates in lsn order over a
947956
* transaction and all its subtransactions.
957+
*
958+
* Note: The iterator state is returned through iter_state parameter rather
959+
* than the function's return value. This is because the state gets cleaned up
960+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
961+
* back the state even if this function throws an exception.
948962
*/
949-
staticReorderBufferIterTXNState*
950-
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn)
963+
staticvoid
964+
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
965+
ReorderBufferIterTXNState*volatile*iter_state)
951966
{
952967
Sizenr_txns=0;
953968
ReorderBufferIterTXNState*state;
954969
dlist_itercur_txn_i;
955970
int32off;
956971

972+
*iter_state=NULL;
973+
957974
/*
958975
* Calculate the size of our heap: one element for every transaction that
959976
* contains changes. (Besides the transactions already in the reorder
@@ -988,7 +1005,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9881005

9891006
for (off=0;off<state->nr_txns;off++)
9901007
{
991-
state->entries[off].fd=-1;
1008+
state->entries[off].file.vfd=-1;
9921009
state->entries[off].segno=0;
9931010
}
9941011

@@ -997,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9971014
ReorderBufferIterCompare,
9981015
state);
9991016

1017+
/* Now that the state fields are initialized, it is safe to return it. */
1018+
*iter_state=state;
1019+
10001020
/*
10011021
* Now insert items into the binary heap, in an unordered fashion. (We
10021022
* will run a heap assembly step at the end; this is more efficient.)
@@ -1013,7 +1033,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10131033
{
10141034
/* serialize remaining changes */
10151035
ReorderBufferSerializeTXN(rb,txn);
1016-
ReorderBufferRestoreChanges(rb,txn,&state->entries[off].fd,
1036+
ReorderBufferRestoreChanges(rb,txn,&state->entries[off].file,
10171037
&state->entries[off].segno);
10181038
}
10191039

@@ -1043,7 +1063,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10431063
/* serialize remaining changes */
10441064
ReorderBufferSerializeTXN(rb,cur_txn);
10451065
ReorderBufferRestoreChanges(rb,cur_txn,
1046-
&state->entries[off].fd,
1066+
&state->entries[off].file,
10471067
&state->entries[off].segno);
10481068
}
10491069
cur_change=dlist_head_element(ReorderBufferChange,node,
@@ -1059,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10591079

10601080
/* assemble a valid binary heap */
10611081
binaryheap_build(state->heap);
1062-
1063-
returnstate;
10641082
}
10651083

10661084
/*
@@ -1124,7 +1142,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
11241142
dlist_delete(&change->node);
11251143
dlist_push_tail(&state->old_change,&change->node);
11261144

1127-
if (ReorderBufferRestoreChanges(rb,entry->txn,&entry->fd,
1145+
if (ReorderBufferRestoreChanges(rb,entry->txn,&entry->file,
11281146
&state->entries[off].segno))
11291147
{
11301148
/* successfully restored changes from disk */
@@ -1163,8 +1181,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11631181

11641182
for (off=0;off<state->nr_txns;off++)
11651183
{
1166-
if (state->entries[off].fd!=-1)
1167-
CloseTransientFile(state->entries[off].fd);
1184+
if (state->entries[off].file.vfd!=-1)
1185+
FileClose(state->entries[off].file.vfd);
11681186
}
11691187

11701188
/* free memory we might have "leaked" in the last *Next call */
@@ -1500,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
15001518

15011519
rb->begin(rb,txn);
15021520

1503-
iterstate=ReorderBufferIterTXNInit(rb,txn);
1521+
ReorderBufferIterTXNInit(rb,txn,&iterstate);
15041522
while ((change=ReorderBufferIterTXNNext(rb,iterstate))!=NULL)
15051523
{
15061524
Relationrelation=NULL;
@@ -2517,11 +2535,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
25172535
*/
25182536
staticSize
25192537
ReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
2520-
int*fd,XLogSegNo*segno)
2538+
TXNEntryFile*file,XLogSegNo*segno)
25212539
{
25222540
Sizerestored=0;
25232541
XLogSegNolast_segno;
25242542
dlist_mutable_itercleanup_iter;
2543+
File*fd=&file->vfd;
25252544

25262545
Assert(txn->first_lsn!=InvalidXLogRecPtr);
25272546
Assert(txn->final_lsn!=InvalidXLogRecPtr);
@@ -2562,7 +2581,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25622581
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
25632582
*segno);
25642583

2565-
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY);
2584+
*fd=PathNameOpenFile(path,O_RDONLY |PG_BINARY);
2585+
2586+
/* No harm in resetting the offset even in case of failure */
2587+
file->curOffset=0;
2588+
25662589
if (*fd<0&&errno==ENOENT)
25672590
{
25682591
*fd=-1;
@@ -2582,14 +2605,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25822605
* end of this file.
25832606
*/
25842607
ReorderBufferSerializeReserve(rb,sizeof(ReorderBufferDiskChange));
2585-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2586-
readBytes=read(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange));
2587-
pgstat_report_wait_end();
2608+
readBytes=FileRead(file->vfd,rb->outbuf,
2609+
sizeof(ReorderBufferDiskChange),
2610+
file->curOffset,WAIT_EVENT_REORDER_BUFFER_READ);
25882611

25892612
/* eof */
25902613
if (readBytes==0)
25912614
{
2592-
CloseTransientFile(*fd);
2615+
FileClose(*fd);
25932616
*fd=-1;
25942617
(*segno)++;
25952618
continue;
@@ -2605,16 +2628,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26052628
readBytes,
26062629
(uint32)sizeof(ReorderBufferDiskChange))));
26072630

2631+
file->curOffset+=readBytes;
2632+
26082633
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
26092634

26102635
ReorderBufferSerializeReserve(rb,
26112636
sizeof(ReorderBufferDiskChange)+ondisk->size);
26122637
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
26132638

2614-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2615-
readBytes=read(*fd,rb->outbuf+sizeof(ReorderBufferDiskChange),
2616-
ondisk->size-sizeof(ReorderBufferDiskChange));
2617-
pgstat_report_wait_end();
2639+
readBytes=FileRead(file->vfd,
2640+
rb->outbuf+sizeof(ReorderBufferDiskChange),
2641+
ondisk->size-sizeof(ReorderBufferDiskChange),
2642+
file->curOffset,
2643+
WAIT_EVENT_REORDER_BUFFER_READ);
26182644

26192645
if (readBytes<0)
26202646
ereport(ERROR,
@@ -2627,6 +2653,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26272653
readBytes,
26282654
(uint32) (ondisk->size-sizeof(ReorderBufferDiskChange)))));
26292655

2656+
file->curOffset+=readBytes;
2657+
26302658
/*
26312659
* ok, read a full change from disk, now restore it into proper
26322660
* in-memory format

‎src/test/recovery/t/006_logical_decoding.pl

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use warnings;
88
use PostgresNode;
99
use TestLib;
10-
use Test::Moretests=>10;
10+
use Test::Moretests=>11;
1111
use Config;
1212

1313
# Initialize master node
@@ -135,5 +135,42 @@
135135
is($node_master->slot('otherdb_slot')->{'slot_name'},
136136
undef,'logical slot was actually dropped with DB');
137137

138+
# Test to ensure that we don't run out of file descriptors even if there
139+
# are more spill files than maxAllocatedDescs.
140+
141+
# Set max_files_per_process to a small value to make it more likely to run out
142+
# of max open file descriptors.
143+
$node_master->safe_psql('postgres',
144+
'ALTER SYSTEM SET max_files_per_process = 26;');
145+
$node_master->restart;
146+
147+
$node_master->safe_psql(
148+
'postgres',q{
149+
do $$
150+
BEGIN
151+
FOR i IN 1..10 LOOP
152+
BEGIN
153+
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
154+
EXCEPTION
155+
when division_by_zero then perform 'dummy';
156+
END;
157+
END LOOP;
158+
END $$;
159+
});
160+
161+
$result =$node_master->safe_psql('postgres',
162+
qq[
163+
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
164+
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
165+
]);
166+
167+
$expected =q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
168+
is($result,$expected,'got expected output from spilling subxacts session');
169+
170+
# Reset back max_files_per_process
171+
$node_master->safe_psql('postgres',
172+
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
173+
$node_master->restart;
174+
138175
# done with the node
139176
$node_master->stop;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp