@@ -109,7 +109,7 @@ typedef struct ReorderBufferIterTXNEntry
109
109
XLogRecPtr lsn ;
110
110
ReorderBufferChange * change ;
111
111
ReorderBufferTXN * txn ;
112
- int fd ;
112
+ File fd ;
113
113
XLogSegNo segno ;
114
114
}ReorderBufferIterTXNEntry ;
115
115
@@ -178,7 +178,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
178
178
* subtransactions
179
179
* ---------------------------------------
180
180
*/
181
- static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn );
181
+ static void ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
182
+ ReorderBufferIterTXNState * volatile * iter_state );
182
183
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer * rb ,ReorderBufferIterTXNState * state );
183
184
static void ReorderBufferIterTXNFinish (ReorderBuffer * rb ,
184
185
ReorderBufferIterTXNState * state );
@@ -194,7 +195,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194
195
static void ReorderBufferSerializeChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
195
196
int fd ,ReorderBufferChange * change );
196
197
static Size ReorderBufferRestoreChanges (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
197
- int * fd ,XLogSegNo * segno );
198
+ File * fd ,XLogSegNo * segno );
198
199
static void ReorderBufferRestoreChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
199
200
char * change );
200
201
static void ReorderBufferRestoreCleanup (ReorderBuffer * rb ,ReorderBufferTXN * txn );
@@ -945,15 +946,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
945
946
/*
946
947
* Allocate & initialize an iterator which iterates in lsn order over a
947
948
* transaction and all its subtransactions.
949
+ *
950
+ * Note: The iterator state is returned through iter_state parameter rather
951
+ * than the function's return value. This is because the state gets cleaned up
952
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
953
+ * back the state even if this function throws an exception.
948
954
*/
949
- static ReorderBufferIterTXNState *
950
- ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn )
955
+ static void
956
+ ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
957
+ ReorderBufferIterTXNState * volatile * iter_state )
951
958
{
952
959
Size nr_txns = 0 ;
953
960
ReorderBufferIterTXNState * state ;
954
961
dlist_iter cur_txn_i ;
955
962
int32 off ;
956
963
964
+ * iter_state = NULL ;
965
+
957
966
/*
958
967
* Calculate the size of our heap: one element for every transaction that
959
968
* contains changes. (Besides the transactions already in the reorder
@@ -997,6 +1006,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
997
1006
ReorderBufferIterCompare ,
998
1007
state );
999
1008
1009
+ /* Now that the state fields are initialized, it is safe to return it. */
1010
+ * iter_state = state ;
1011
+
1000
1012
/*
1001
1013
* Now insert items into the binary heap, in an unordered fashion. (We
1002
1014
* will run a heap assembly step at the end; this is more efficient.)
@@ -1059,8 +1071,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1059
1071
1060
1072
/* assemble a valid binary heap */
1061
1073
binaryheap_build (state -> heap );
1062
-
1063
- return state ;
1064
1074
}
1065
1075
1066
1076
/*
@@ -1164,7 +1174,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1164
1174
for (off = 0 ;off < state -> nr_txns ;off ++ )
1165
1175
{
1166
1176
if (state -> entries [off ].fd != -1 )
1167
- CloseTransientFile (state -> entries [off ].fd );
1177
+ FileClose (state -> entries [off ].fd );
1168
1178
}
1169
1179
1170
1180
/* free memory we might have "leaked" in the last *Next call */
@@ -1500,7 +1510,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1500
1510
1501
1511
rb -> begin (rb ,txn );
1502
1512
1503
- iterstate = ReorderBufferIterTXNInit (rb ,txn );
1513
+ ReorderBufferIterTXNInit (rb ,txn , & iterstate );
1504
1514
while ((change = ReorderBufferIterTXNNext (rb ,iterstate ))!= NULL )
1505
1515
{
1506
1516
Relation relation = NULL ;
@@ -2517,7 +2527,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2517
2527
*/
2518
2528
static Size
2519
2529
ReorderBufferRestoreChanges (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
2520
- int * fd ,XLogSegNo * segno )
2530
+ File * fd ,XLogSegNo * segno )
2521
2531
{
2522
2532
Size restored = 0 ;
2523
2533
XLogSegNo last_segno ;
@@ -2562,7 +2572,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2562
2572
ReorderBufferSerializedPath (path ,MyReplicationSlot ,txn -> xid ,
2563
2573
* segno );
2564
2574
2565
- * fd = OpenTransientFile (path ,O_RDONLY |PG_BINARY );
2575
+ * fd = PathNameOpenFile (path ,O_RDONLY |PG_BINARY );
2566
2576
if (* fd < 0 && errno == ENOENT )
2567
2577
{
2568
2578
* fd = -1 ;
@@ -2582,14 +2592,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2582
2592
* end of this file.
2583
2593
*/
2584
2594
ReorderBufferSerializeReserve (rb ,sizeof (ReorderBufferDiskChange ));
2585
- pgstat_report_wait_start (WAIT_EVENT_REORDER_BUFFER_READ );
2586
- readBytes = read (* fd ,rb -> outbuf ,sizeof (ReorderBufferDiskChange ));
2587
- pgstat_report_wait_end ();
2595
+ readBytes = FileRead (* fd ,rb -> outbuf ,sizeof (ReorderBufferDiskChange ),
2596
+ WAIT_EVENT_REORDER_BUFFER_READ );
2588
2597
2589
2598
/* eof */
2590
2599
if (readBytes == 0 )
2591
2600
{
2592
- CloseTransientFile (* fd );
2601
+ FileClose (* fd );
2593
2602
* fd = -1 ;
2594
2603
(* segno )++ ;
2595
2604
continue ;
@@ -2611,10 +2620,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2611
2620
sizeof (ReorderBufferDiskChange )+ ondisk -> size );
2612
2621
ondisk = (ReorderBufferDiskChange * )rb -> outbuf ;
2613
2622
2614
- pgstat_report_wait_start ( WAIT_EVENT_REORDER_BUFFER_READ );
2615
- readBytes = read ( * fd , rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2616
- ondisk -> size - sizeof (ReorderBufferDiskChange ));
2617
- pgstat_report_wait_end ( );
2623
+ readBytes = FileRead ( * fd ,
2624
+ rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2625
+ ondisk -> size - sizeof (ReorderBufferDiskChange ),
2626
+ WAIT_EVENT_REORDER_BUFFER_READ );
2618
2627
2619
2628
if (readBytes < 0 )
2620
2629
ereport (ERROR ,