@@ -103,7 +103,7 @@ typedef struct ReorderBufferIterTXNEntry
103
103
XLogRecPtr lsn ;
104
104
ReorderBufferChange * change ;
105
105
ReorderBufferTXN * txn ;
106
- int fd ;
106
+ File fd ;
107
107
XLogSegNo segno ;
108
108
}ReorderBufferIterTXNEntry ;
109
109
@@ -181,7 +181,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
181
181
* subtransactions
182
182
* ---------------------------------------
183
183
*/
184
- static ReorderBufferIterTXNState * ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn );
184
+ static void ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
185
+ ReorderBufferIterTXNState * volatile * iter_state );
185
186
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer * rb ,ReorderBufferIterTXNState * state );
186
187
static void ReorderBufferIterTXNFinish (ReorderBuffer * rb ,
187
188
ReorderBufferIterTXNState * state );
@@ -197,7 +198,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
197
198
static void ReorderBufferSerializeChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
198
199
int fd ,ReorderBufferChange * change );
199
200
static Size ReorderBufferRestoreChanges (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
200
- int * fd ,XLogSegNo * segno );
201
+ File * fd ,XLogSegNo * segno );
201
202
static void ReorderBufferRestoreChange (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
202
203
char * change );
203
204
static void ReorderBufferRestoreCleanup (ReorderBuffer * rb ,ReorderBufferTXN * txn );
@@ -953,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
953
954
/*
954
955
* Allocate & initialize an iterator which iterates in lsn order over a
955
956
* transaction and all its subtransactions.
957
+ *
958
+ * Note: The iterator state is returned through iter_state parameter rather
959
+ * than the function's return value. This is because the state gets cleaned up
960
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
961
+ * back the state even if this function throws an exception.
956
962
*/
957
- static ReorderBufferIterTXNState *
958
- ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn )
963
+ static void
964
+ ReorderBufferIterTXNInit (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
965
+ ReorderBufferIterTXNState * volatile * iter_state )
959
966
{
960
967
Size nr_txns = 0 ;
961
968
ReorderBufferIterTXNState * state ;
962
969
dlist_iter cur_txn_i ;
963
970
int32 off ;
964
971
972
+ * iter_state = NULL ;
973
+
965
974
/*
966
975
* Calculate the size of our heap: one element for every transaction that
967
976
* contains changes. (Besides the transactions already in the reorder
@@ -1005,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1005
1014
ReorderBufferIterCompare ,
1006
1015
state );
1007
1016
1017
+ /* Now that the state fields are initialized, it is safe to return it. */
1018
+ * iter_state = state ;
1019
+
1008
1020
/*
1009
1021
* Now insert items into the binary heap, in an unordered fashion. (We
1010
1022
* will run a heap assembly step at the end; this is more efficient.)
@@ -1067,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1067
1079
1068
1080
/* assemble a valid binary heap */
1069
1081
binaryheap_build (state -> heap );
1070
-
1071
- return state ;
1072
1082
}
1073
1083
1074
1084
/*
@@ -1172,7 +1182,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1172
1182
for (off = 0 ;off < state -> nr_txns ;off ++ )
1173
1183
{
1174
1184
if (state -> entries [off ].fd != -1 )
1175
- CloseTransientFile (state -> entries [off ].fd );
1185
+ FileClose (state -> entries [off ].fd );
1176
1186
}
1177
1187
1178
1188
/* free memory we might have "leaked" in the last *Next call */
@@ -1508,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1508
1518
1509
1519
rb -> begin (rb ,txn );
1510
1520
1511
- iterstate = ReorderBufferIterTXNInit (rb ,txn );
1521
+ ReorderBufferIterTXNInit (rb ,txn , & iterstate );
1512
1522
while ((change = ReorderBufferIterTXNNext (rb ,iterstate ))!= NULL )
1513
1523
{
1514
1524
Relation relation = NULL ;
@@ -2465,7 +2475,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2465
2475
*/
2466
2476
static Size
2467
2477
ReorderBufferRestoreChanges (ReorderBuffer * rb ,ReorderBufferTXN * txn ,
2468
- int * fd ,XLogSegNo * segno )
2478
+ File * fd ,XLogSegNo * segno )
2469
2479
{
2470
2480
Size restored = 0 ;
2471
2481
XLogSegNo last_segno ;
@@ -2510,7 +2520,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2510
2520
ReorderBufferSerializedPath (path ,MyReplicationSlot ,txn -> xid ,
2511
2521
* segno );
2512
2522
2513
- * fd = OpenTransientFile (path ,O_RDONLY |PG_BINARY ,0 );
2523
+ * fd = PathNameOpenFile (path ,O_RDONLY |PG_BINARY ,0 );
2514
2524
if (* fd < 0 && errno == ENOENT )
2515
2525
{
2516
2526
* fd = -1 ;
@@ -2531,14 +2541,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2531
2541
* end of this file.
2532
2542
*/
2533
2543
ReorderBufferSerializeReserve (rb ,sizeof (ReorderBufferDiskChange ));
2534
- pgstat_report_wait_start (WAIT_EVENT_REORDER_BUFFER_READ );
2535
- readBytes = read (* fd ,rb -> outbuf ,sizeof (ReorderBufferDiskChange ));
2536
- pgstat_report_wait_end ();
2544
+ readBytes = FileRead (* fd ,rb -> outbuf ,sizeof (ReorderBufferDiskChange ),
2545
+ WAIT_EVENT_REORDER_BUFFER_READ );
2537
2546
2538
2547
/* eof */
2539
2548
if (readBytes == 0 )
2540
2549
{
2541
- CloseTransientFile (* fd );
2550
+ FileClose (* fd );
2542
2551
* fd = -1 ;
2543
2552
(* segno )++ ;
2544
2553
continue ;
@@ -2560,10 +2569,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2560
2569
sizeof (ReorderBufferDiskChange )+ ondisk -> size );
2561
2570
ondisk = (ReorderBufferDiskChange * )rb -> outbuf ;
2562
2571
2563
- pgstat_report_wait_start ( WAIT_EVENT_REORDER_BUFFER_READ );
2564
- readBytes = read ( * fd , rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2565
- ondisk -> size - sizeof (ReorderBufferDiskChange ));
2566
- pgstat_report_wait_end ( );
2572
+ readBytes = FileRead ( * fd ,
2573
+ rb -> outbuf + sizeof (ReorderBufferDiskChange ),
2574
+ ondisk -> size - sizeof (ReorderBufferDiskChange ),
2575
+ WAIT_EVENT_REORDER_BUFFER_READ );
2567
2576
2568
2577
if (readBytes < 0 )
2569
2578
ereport (ERROR ,