Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd207038

Browse files
author
Amit Kapila
committed
Fix running out of file descriptors for spill files.
Currently while decoding changes, if the number of changes exceeds acertain threshold, we spill those to disk.  And this happens for each(sub)transaction.  Now, while reading all these files, we don't close themuntil we read all the files.  While reading these files, if the number ofsuch files exceeds the maximum number of file descriptors, the operationerrors out.Use PathNameOpenFile interface to open these files as that internally hasthe mechanism to release kernel FDs as needed to get us under themax_safe_fds limit.Reported-by: Amit KhandekarAuthor: Amit KhandekarReviewed-by: Amit KapilaBackpatch-through: 9.4Discussion:https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
1 parent4b25f5d commitd207038

File tree

2 files changed

+91
-25
lines changed

2 files changed

+91
-25
lines changed

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,21 @@ typedef struct ReorderBufferTupleCidEnt
131131
CommandIdcombocid;/* just for debugging */
132132
}ReorderBufferTupleCidEnt;
133133

134+
/* Virtual file descriptor with file offset tracking */
135+
typedefstructTXNEntryFile
136+
{
137+
Filevfd;/* -1 when the file is closed */
138+
off_tcurOffset;/* offset for next write or read. Reset to 0
139+
* when vfd is opened. */
140+
}TXNEntryFile;
141+
134142
/* k-way in-order change iteration support structures */
135143
typedefstructReorderBufferIterTXNEntry
136144
{
137145
XLogRecPtrlsn;
138146
ReorderBufferChange*change;
139147
ReorderBufferTXN*txn;
140-
intfd;
148+
TXNEntryFilefile;
141149
XLogSegNosegno;
142150
}ReorderBufferIterTXNEntry;
143151

@@ -207,7 +215,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
207215
* subtransactions
208216
* ---------------------------------------
209217
*/
210-
staticReorderBufferIterTXNState*ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn);
218+
staticvoidReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
219+
ReorderBufferIterTXNState*volatile*iter_state);
211220
staticReorderBufferChange*ReorderBufferIterTXNNext(ReorderBuffer*rb,ReorderBufferIterTXNState*state);
212221
staticvoidReorderBufferIterTXNFinish(ReorderBuffer*rb,
213222
ReorderBufferIterTXNState*state);
@@ -223,7 +232,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
223232
staticvoidReorderBufferSerializeChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
224233
intfd,ReorderBufferChange*change);
225234
staticSizeReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
226-
int*fd,XLogSegNo*segno);
235+
TXNEntryFile*file,XLogSegNo*segno);
227236
staticvoidReorderBufferRestoreChange(ReorderBuffer*rb,ReorderBufferTXN*txn,
228237
char*change);
229238
staticvoidReorderBufferRestoreCleanup(ReorderBuffer*rb,ReorderBufferTXN*txn);
@@ -996,15 +1005,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
9961005
/*
9971006
* Allocate & initialize an iterator which iterates in lsn order over a
9981007
* transaction and all its subtransactions.
1008+
*
1009+
* Note: The iterator state is returned through iter_state parameter rather
1010+
* than the function's return value. This is because the state gets cleaned up
1011+
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
1012+
* back the state even if this function throws an exception.
9991013
*/
1000-
staticReorderBufferIterTXNState*
1001-
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn)
1014+
staticvoid
1015+
ReorderBufferIterTXNInit(ReorderBuffer*rb,ReorderBufferTXN*txn,
1016+
ReorderBufferIterTXNState*volatile*iter_state)
10021017
{
10031018
Sizenr_txns=0;
10041019
ReorderBufferIterTXNState*state;
10051020
dlist_itercur_txn_i;
10061021
int32off;
10071022

1023+
*iter_state=NULL;
1024+
10081025
/*
10091026
* Calculate the size of our heap: one element for every transaction that
10101027
* contains changes. (Besides the transactions already in the reorder
@@ -1039,7 +1056,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10391056

10401057
for (off=0;off<state->nr_txns;off++)
10411058
{
1042-
state->entries[off].fd=-1;
1059+
state->entries[off].file.vfd=-1;
10431060
state->entries[off].segno=0;
10441061
}
10451062

@@ -1048,6 +1065,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10481065
ReorderBufferIterCompare,
10491066
state);
10501067

1068+
/* Now that the state fields are initialized, it is safe to return it. */
1069+
*iter_state=state;
1070+
10511071
/*
10521072
* Now insert items into the binary heap, in an unordered fashion. (We
10531073
* will run a heap assembly step at the end; this is more efficient.)
@@ -1064,7 +1084,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10641084
{
10651085
/* serialize remaining changes */
10661086
ReorderBufferSerializeTXN(rb,txn);
1067-
ReorderBufferRestoreChanges(rb,txn,&state->entries[off].fd,
1087+
ReorderBufferRestoreChanges(rb,txn,&state->entries[off].file,
10681088
&state->entries[off].segno);
10691089
}
10701090

@@ -1094,7 +1114,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10941114
/* serialize remaining changes */
10951115
ReorderBufferSerializeTXN(rb,cur_txn);
10961116
ReorderBufferRestoreChanges(rb,cur_txn,
1097-
&state->entries[off].fd,
1117+
&state->entries[off].file,
10981118
&state->entries[off].segno);
10991119
}
11001120
cur_change=dlist_head_element(ReorderBufferChange,node,
@@ -1110,8 +1130,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
11101130

11111131
/* assemble a valid binary heap */
11121132
binaryheap_build(state->heap);
1113-
1114-
returnstate;
11151133
}
11161134

11171135
/*
@@ -1175,7 +1193,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
11751193
dlist_delete(&change->node);
11761194
dlist_push_tail(&state->old_change,&change->node);
11771195

1178-
if (ReorderBufferRestoreChanges(rb,entry->txn,&entry->fd,
1196+
if (ReorderBufferRestoreChanges(rb,entry->txn,&entry->file,
11791197
&state->entries[off].segno))
11801198
{
11811199
/* successfully restored changes from disk */
@@ -1214,8 +1232,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
12141232

12151233
for (off=0;off<state->nr_txns;off++)
12161234
{
1217-
if (state->entries[off].fd!=-1)
1218-
CloseTransientFile(state->entries[off].fd);
1235+
if (state->entries[off].file.vfd!=-1)
1236+
FileClose(state->entries[off].file.vfd);
12191237
}
12201238

12211239
/* free memory we might have "leaked" in the last *Next call */
@@ -1558,7 +1576,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
15581576

15591577
rb->begin(rb,txn);
15601578

1561-
iterstate=ReorderBufferIterTXNInit(rb,txn);
1579+
ReorderBufferIterTXNInit(rb,txn,&iterstate);
15621580
while ((change=ReorderBufferIterTXNNext(rb,iterstate))!=NULL)
15631581
{
15641582
Relationrelation=NULL;
@@ -2765,11 +2783,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
27652783
*/
27662784
staticSize
27672785
ReorderBufferRestoreChanges(ReorderBuffer*rb,ReorderBufferTXN*txn,
2768-
int*fd,XLogSegNo*segno)
2786+
TXNEntryFile*file,XLogSegNo*segno)
27692787
{
27702788
Sizerestored=0;
27712789
XLogSegNolast_segno;
27722790
dlist_mutable_itercleanup_iter;
2791+
File*fd=&file->vfd;
27732792

27742793
Assert(txn->first_lsn!=InvalidXLogRecPtr);
27752794
Assert(txn->final_lsn!=InvalidXLogRecPtr);
@@ -2810,7 +2829,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
28102829
ReorderBufferSerializedPath(path,MyReplicationSlot,txn->xid,
28112830
*segno);
28122831

2813-
*fd=OpenTransientFile(path,O_RDONLY |PG_BINARY);
2832+
*fd=PathNameOpenFile(path,O_RDONLY |PG_BINARY);
2833+
2834+
/* No harm in resetting the offset even in case of failure */
2835+
file->curOffset=0;
2836+
28142837
if (*fd<0&&errno==ENOENT)
28152838
{
28162839
*fd=-1;
@@ -2830,14 +2853,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
28302853
* end of this file.
28312854
*/
28322855
ReorderBufferSerializeReserve(rb,sizeof(ReorderBufferDiskChange));
2833-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2834-
readBytes=read(*fd,rb->outbuf,sizeof(ReorderBufferDiskChange));
2835-
pgstat_report_wait_end();
2856+
readBytes=FileRead(file->vfd,rb->outbuf,
2857+
sizeof(ReorderBufferDiskChange),
2858+
file->curOffset,WAIT_EVENT_REORDER_BUFFER_READ);
28362859

28372860
/* eof */
28382861
if (readBytes==0)
28392862
{
2840-
CloseTransientFile(*fd);
2863+
FileClose(*fd);
28412864
*fd=-1;
28422865
(*segno)++;
28432866
continue;
@@ -2853,16 +2876,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
28532876
readBytes,
28542877
(uint32)sizeof(ReorderBufferDiskChange))));
28552878

2879+
file->curOffset+=readBytes;
2880+
28562881
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
28572882

28582883
ReorderBufferSerializeReserve(rb,
28592884
sizeof(ReorderBufferDiskChange)+ondisk->size);
28602885
ondisk= (ReorderBufferDiskChange*)rb->outbuf;
28612886

2862-
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2863-
readBytes=read(*fd,rb->outbuf+sizeof(ReorderBufferDiskChange),
2864-
ondisk->size-sizeof(ReorderBufferDiskChange));
2865-
pgstat_report_wait_end();
2887+
readBytes=FileRead(file->vfd,
2888+
rb->outbuf+sizeof(ReorderBufferDiskChange),
2889+
ondisk->size-sizeof(ReorderBufferDiskChange),
2890+
file->curOffset,
2891+
WAIT_EVENT_REORDER_BUFFER_READ);
28662892

28672893
if (readBytes<0)
28682894
ereport(ERROR,
@@ -2875,6 +2901,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
28752901
readBytes,
28762902
(uint32) (ondisk->size-sizeof(ReorderBufferDiskChange)))));
28772903

2904+
file->curOffset+=readBytes;
2905+
28782906
/*
28792907
* ok, read a full change from disk, now restore it into proper
28802908
* in-memory format

‎src/test/recovery/t/006_logical_decoding.pl

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use warnings;
88
use PostgresNode;
99
use TestLib;
10-
use Test::Moretests=>10;
10+
use Test::Moretests=>11;
1111
use Config;
1212

1313
# Initialize master node
@@ -135,5 +135,43 @@
135135
is($node_master->slot('otherdb_slot')->{'slot_name'},
136136
undef,'logical slot was actually dropped with DB');
137137

138+
# Test to ensure that we don't run out of file descriptors even if there
139+
# are more spill files than maxAllocatedDescs.
140+
141+
# Set max_files_per_process to a small value to make it more likely to run out
142+
# of max open file descriptors.
143+
$node_master->safe_psql('postgres',
144+
'ALTER SYSTEM SET max_files_per_process = 26;');
145+
$node_master->restart;
146+
147+
$node_master->safe_psql(
148+
'postgres',q{
149+
do $$
150+
BEGIN
151+
FOR i IN 1..10 LOOP
152+
BEGIN
153+
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
154+
EXCEPTION
155+
when division_by_zero then perform 'dummy';
156+
END;
157+
END LOOP;
158+
END $$;
159+
});
160+
161+
$result =$node_master->safe_psql('postgres',
162+
qq[
163+
set logical_decoding_work_mem to 64; -- generate plenty of .spill files
164+
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
165+
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
166+
]);
167+
168+
$expected =q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
169+
is($result,$expected,'got expected output from spilling subxacts session');
170+
171+
# Reset back max_files_per_process
172+
$node_master->safe_psql('postgres',
173+
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
174+
$node_master->restart;
175+
138176
# done with the node
139177
$node_master->stop;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp