@@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
103103CommandId combocid ;/* just for debugging */
104104}ReorderBufferTupleCidEnt ;
105105
106+ /* A virtual file descriptor (File) plus the position that is passed explicitly to FileRead() */
107+ typedef struct TXNEntryFile
108+ {
109+ File vfd ;/* -1 when the file is closed */
110+ off_t curOffset ;/* offset for next write or read. Reset to 0 when
111+ * vfd is opened; advanced by the bytes read. */
112+ }TXNEntryFile ;
113+
106114/* k-way in-order change iteration support structures */
107115typedef struct ReorderBufferIterTXNEntry
108116{
109117XLogRecPtr lsn ;
110118ReorderBufferChange * change ;
111119ReorderBufferTXN * txn ;
112- int fd ;
120+ TXNEntryFile file ;/* handed to ReorderBufferRestoreChanges(); vfd is -1 while no file is open */
113121XLogSegNo segno ;
114122}ReorderBufferIterTXNEntry ;
115123
@@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
178186 * subtransactions
179187 * ---------------------------------------
180188 */
181- static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn );
189+ static void ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
190+ ReorderBufferIterTXNState * volatile * iter_state );
182191static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer * rb ,ReorderBufferIterTXNState * state );
183192static void ReorderBufferIterTXNFinish (ReorderBuffer * rb ,
184193ReorderBufferIterTXNState * state );
@@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194203static void ReorderBufferSerializeChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
195204int fd ,ReorderBufferChange * change );
196205static Size ReorderBufferRestoreChanges (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
197- int * fd ,XLogSegNo * segno );
206+ TXNEntryFile * file ,XLogSegNo * segno );
198207static void ReorderBufferRestoreChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
199208char * change );
200209static void ReorderBufferRestoreCleanup (ReorderBuffer * rb ,ReorderBufferTXN * txn );
@@ -945,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
945954/*
946955 * Allocate & initialize an iterator which iterates in lsn order over a
947956 * transaction and all its subtransactions.
957+ *
958+ * Note: The iterator state is returned through the iter_state parameter rather
959+ * than the function's return value. This is because the state gets cleaned up
960+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
961+ * back the state even if this function throws an exception.
948962 */
949- static ReorderBufferIterTXNState *
950- ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn )
963+ static void
964+ ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
965+ ReorderBufferIterTXNState * volatile * iter_state )
951966{
952967Size nr_txns = 0 ;
953968ReorderBufferIterTXNState * state ;
954969dlist_iter cur_txn_i ;
955970int32 off ;
956971
972+ * iter_state = NULL ;
973+
957974/*
958975 * Calculate the size of our heap: one element for every transaction that
959976 * contains changes. (Besides the transactions already in the reorder
@@ -988,7 +1005,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9881005
9891006for (off = 0 ;off < state -> nr_txns ;off ++ )
9901007{
991- state -> entries [off ].fd = -1 ;
1008+ state -> entries [off ].file . vfd = -1 ;
9921009state -> entries [off ].segno = 0 ;
9931010}
9941011
@@ -997,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
9971014ReorderBufferIterCompare ,
9981015state );
9991016
1017+ /* Now that the state fields are initialized, it is safe to return it. */
1018+ * iter_state = state ;
1019+
10001020/*
10011021 * Now insert items into the binary heap, in an unordered fashion. (We
10021022 * will run a heap assembly step at the end; this is more efficient.)
@@ -1013,7 +1033,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10131033{
10141034/* serialize remaining changes */
10151035ReorderBufferSerializeTXN (rb ,txn );
1016- ReorderBufferRestoreChanges (rb ,txn ,& state -> entries [off ].fd ,
1036+ ReorderBufferRestoreChanges (rb ,txn ,& state -> entries [off ].file ,
10171037& state -> entries [off ].segno );
10181038}
10191039
@@ -1043,7 +1063,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10431063/* serialize remaining changes */
10441064ReorderBufferSerializeTXN (rb ,cur_txn );
10451065ReorderBufferRestoreChanges (rb ,cur_txn ,
1046- & state -> entries [off ].fd ,
1066+ & state -> entries [off ].file ,
10471067& state -> entries [off ].segno );
10481068}
10491069cur_change = dlist_head_element (ReorderBufferChange ,node ,
@@ -1059,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
10591079
10601080/* assemble a valid binary heap */
10611081binaryheap_build (state -> heap );
1062-
1063- return state ;
10641082}
10651083
10661084/*
@@ -1124,7 +1142,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
11241142dlist_delete (& change -> node );
11251143dlist_push_tail (& state -> old_change ,& change -> node );
11261144
1127- if (ReorderBufferRestoreChanges (rb ,entry -> txn ,& entry -> fd ,
1145+ if (ReorderBufferRestoreChanges (rb ,entry -> txn ,& entry -> file ,
11281146& state -> entries [off ].segno ))
11291147{
11301148/* successfully restored changes from disk */
@@ -1163,8 +1181,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
11631181
11641182for (off = 0 ;off < state -> nr_txns ;off ++ )
11651183{
1166- if (state -> entries [off ].fd != -1 )
1167- CloseTransientFile (state -> entries [off ].fd );
1184+ if (state -> entries [off ].file . vfd != -1 )
1185+ FileClose (state -> entries [off ].file . vfd );
11681186}
11691187
11701188/* free memory we might have "leaked" in the last *Next call */
@@ -1500,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
15001518
15011519rb -> begin (rb ,txn );
15021520
1503- iterstate = ReorderBufferIterTXNInit (rb ,txn );
1521+ ReorderBufferIterTXNInit (rb ,txn , & iterstate );
15041522while ((change = ReorderBufferIterTXNNext (rb ,iterstate ))!= NULL )
15051523{
15061524Relation relation = NULL ;
@@ -2517,11 +2535,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
25172535 */
25182536static Size
25192537ReorderBufferRestoreChanges (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
2520- int * fd ,XLogSegNo * segno )
2538+ TXNEntryFile * file ,XLogSegNo * segno )
25212539{
25222540Size restored = 0 ;
25232541XLogSegNo last_segno ;
25242542dlist_mutable_iter cleanup_iter ;
2543+ File * fd = & file -> vfd ;
25252544
25262545Assert (txn -> first_lsn != InvalidXLogRecPtr );
25272546Assert (txn -> final_lsn != InvalidXLogRecPtr );
@@ -2562,7 +2581,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25622581ReorderBufferSerializedPath (path ,MyReplicationSlot ,txn -> xid ,
25632582* segno );
25642583
2565- * fd = OpenTransientFile (path ,O_RDONLY |PG_BINARY );
2584+ * fd = PathNameOpenFile (path ,O_RDONLY |PG_BINARY );
2585+
2586+ /* No harm in resetting the offset even in case of failure */
2587+ file -> curOffset = 0 ;
2588+
25662589if (* fd < 0 && errno == ENOENT )
25672590{
25682591* fd = -1 ;
@@ -2582,14 +2605,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
25822605 * end of this file.
25832606 */
25842607ReorderBufferSerializeReserve (rb ,sizeof (ReorderBufferDiskChange ));
2585- pgstat_report_wait_start ( WAIT_EVENT_REORDER_BUFFER_READ );
2586- readBytes = read ( * fd , rb -> outbuf , sizeof (ReorderBufferDiskChange ));
2587- pgstat_report_wait_end ( );
2608+ readBytes = FileRead ( file -> vfd , rb -> outbuf ,
2609+ sizeof (ReorderBufferDiskChange ),
2610+ file -> curOffset , WAIT_EVENT_REORDER_BUFFER_READ );
25882611
25892612/* eof */
25902613if (readBytes == 0 )
25912614{
2592- CloseTransientFile (* fd );
2615+ FileClose (* fd );
25932616* fd = -1 ;
25942617(* segno )++ ;
25952618continue ;
@@ -2605,16 +2628,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26052628readBytes ,
26062629(uint32 )sizeof (ReorderBufferDiskChange ))));
26072630
2631+ file -> curOffset += readBytes ;
2632+
26082633ondisk = (ReorderBufferDiskChange * )rb -> outbuf ;
26092634
26102635ReorderBufferSerializeReserve (rb ,
26112636sizeof (ReorderBufferDiskChange )+ ondisk -> size );
26122637ondisk = (ReorderBufferDiskChange * )rb -> outbuf ;
26132638
2614- pgstat_report_wait_start (WAIT_EVENT_REORDER_BUFFER_READ );
2615- readBytes = read (* fd ,rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2616- ondisk -> size - sizeof (ReorderBufferDiskChange ));
2617- pgstat_report_wait_end ();
2639+ readBytes = FileRead (file -> vfd ,
2640+ rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2641+ ondisk -> size - sizeof (ReorderBufferDiskChange ),
2642+ file -> curOffset ,
2643+ WAIT_EVENT_REORDER_BUFFER_READ );
26182644
26192645if (readBytes < 0 )
26202646ereport (ERROR ,
@@ -2627,6 +2653,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
26272653readBytes ,
26282654(uint32 ) (ondisk -> size - sizeof (ReorderBufferDiskChange )))));
26292655
2656+ file -> curOffset += readBytes ;
2657+
26302658/*
26312659 * ok, read a full change from disk, now restore it into proper
26322660 * in-memory format