45
45
* fsynced
46
46
** If COMMIT happens after checkpoint then backend reads state data from
47
47
* files
48
- ** In case of crash replay will move data from xlog to files, if that
49
- * hasn't happened before. XXX TODO - move to shmem in replay also
48
+ ** A simplified version of the same scenario happens during recovery and
49
+ * replication. See the comments on the KnownPreparedXact structure.
50
50
*
51
51
*-------------------------------------------------------------------------
52
52
*/
@@ -181,6 +181,35 @@ static GlobalTransaction MyLockedGxact = NULL;
181
181
182
182
static bool twophaseExitRegistered = false;
183
183
184
+ /*
185
+ * During replay and replication KnownPreparedList holds info about active prepared
186
+ * transactions that weren't moved to files yet. We will need that info by the end of
187
+ * recovery (including promotion) to restore the memory state of those transactions.
188
+ *
189
+ * A naive approach would be to write each PREPARE record to disk, fsync it, and not
190
+ * keep that list at all, but that provokes a lot of unnecessary fsyncs on small files,
191
+ * causing the replica to be slower than the master.
192
+ *
193
+ * Replay of twophase records happens by the following rules:
194
+ ** On PREPARE redo KnownPreparedAdd() is called to add that transaction to
195
+ * KnownPreparedList and no further action is taken.
196
+ ** On checkpoint we iterate through KnownPreparedList, move all prepare
197
+ * records that are behind redo_horizon to files and delete those items from the list.
198
+ ** On COMMIT/ABORT we delete the file or the entry in KnownPreparedList.
199
+ ** At the end of recovery we move all known prepared transactions to disk
200
+ * to allow RecoverPreparedTransactions/StandbyRecoverPreparedTransactions
201
+ * to do their work.
202
+ */
203
+ typedef struct KnownPreparedXact
204
+ {
205
+ TransactionId xid ;
206
+ XLogRecPtr prepare_start_lsn ;
207
+ XLogRecPtr prepare_end_lsn ;
208
+ dlist_node list_node ;
209
+ }KnownPreparedXact ;
210
+
211
+ static dlist_head KnownPreparedList = DLIST_STATIC_INIT (KnownPreparedList );
212
+
184
213
static void RecordTransactionCommitPrepared (TransactionId xid ,
185
214
int nchildren ,
186
215
TransactionId * children ,
@@ -200,82 +229,6 @@ static void RemoveGXact(GlobalTransaction gxact);
200
229
201
230
static void XlogReadTwoPhaseData (XLogRecPtr lsn ,char * * buf ,int * len );
202
231
203
-
204
- dlist_head StandbyTwoPhaseStateData = DLIST_STATIC_INIT (StandbyTwoPhaseStateData );
205
-
206
- typedef struct StandbyPreparedTransaction
207
- {
208
- TransactionId xid ;
209
- XLogRecPtr prepare_start_lsn ;
210
- XLogRecPtr prepare_end_lsn ;
211
- dlist_node list_node ;
212
- }StandbyPreparedTransaction ;
213
-
214
- void
215
- StandbyCheckPointTwoPhase (XLogRecPtr redo_horizon )
216
- {
217
- dlist_mutable_iter miter ;
218
- int serialized_xacts = 0 ;
219
-
220
- // Assert(RecoveryInProgress());
221
-
222
- TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
223
-
224
- dlist_foreach_modify (miter ,& StandbyTwoPhaseStateData )
225
- {
226
- StandbyPreparedTransaction * xact = dlist_container (StandbyPreparedTransaction ,
227
- list_node ,miter .cur );
228
-
229
- if (redo_horizon == InvalidXLogRecPtr || xact -> prepare_end_lsn <=redo_horizon )
230
- {
231
- char * buf ;
232
- int len ;
233
-
234
- XlogReadTwoPhaseData (xact -> prepare_start_lsn ,& buf ,& len );
235
- RecreateTwoPhaseFile (xact -> xid ,buf ,len );
236
- pfree (buf );
237
- dlist_delete (miter .cur );
238
- serialized_xacts ++ ;
239
- }
240
- }
241
-
242
- TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
243
-
244
- if (log_checkpoints && serialized_xacts > 0 )
245
- ereport (LOG ,
246
- (errmsg_plural ("%u two-phase state file was written "
247
- "for long-running prepared transactions" ,
248
- "%u two-phase state files were written "
249
- "for long-running prepared transactions" ,
250
- serialized_xacts ,
251
- serialized_xacts )));
252
- }
253
-
254
- // XXX: rename to remove_standby_state
255
- void
256
- StandbyAtCommit (TransactionId xid )
257
- {
258
- dlist_mutable_iter miter ;
259
-
260
- Assert (RecoveryInProgress ());
261
-
262
- dlist_foreach_modify (miter ,& StandbyTwoPhaseStateData )
263
- {
264
- StandbyPreparedTransaction * xact = dlist_container (StandbyPreparedTransaction ,
265
- list_node ,miter .cur );
266
-
267
- if (xact -> xid == xid )
268
- {
269
- dlist_delete (miter .cur );
270
- return ;
271
- }
272
- }
273
-
274
- RemoveTwoPhaseFile (xid , false);
275
- }
276
-
277
-
278
-
279
232
/*
280
233
* Initialization of shared memory
281
234
*/
@@ -1729,18 +1682,25 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
1729
1682
serialized_xacts )));
1730
1683
}
1731
1684
1685
+ /*
1686
+ * KnownPreparedAdd.
1687
+ *
1688
+ * Store correspondence of start/end lsn and xid in KnownPreparedList.
1689
+ * This is called during redo of a prepare record to keep a list of prepared
1690
+ * transactions that aren't yet moved to 2PC files by the end of recovery.
1691
+ */
1732
1692
void
1733
- StandbyAtPrepare (XLogReaderState * record )
1693
+ KnownPreparedAdd (XLogReaderState * record )
1734
1694
{
1735
- StandbyPreparedTransaction * xact ;
1695
+ KnownPreparedXact * xact ;
1736
1696
TwoPhaseFileHeader * hdr = (TwoPhaseFileHeader * )XLogRecGetData (record );
1737
1697
1738
- xact = (StandbyPreparedTransaction * )palloc (sizeof (StandbyPreparedTransaction ));
1698
+ xact = (KnownPreparedXact * )palloc (sizeof (KnownPreparedXact ));
1739
1699
xact -> xid = hdr -> xid ;
1740
1700
xact -> prepare_start_lsn = record -> ReadRecPtr ;
1741
1701
xact -> prepare_end_lsn = record -> EndRecPtr ;
1742
1702
1743
- dlist_push_tail (& StandbyTwoPhaseStateData ,& xact -> list_node );
1703
+ dlist_push_tail (& KnownPreparedList ,& xact -> list_node );
1744
1704
}
1745
1705
1746
1706
/*
@@ -1781,7 +1741,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1781
1741
int nxids = 0 ;
1782
1742
int allocsize = 0 ;
1783
1743
1784
- StandbyCheckPointTwoPhase ( 0 );
1744
+ KnownPreparedRecreateFiles ( InvalidXLogRecPtr );
1785
1745
1786
1746
cldir = AllocateDir (TWOPHASE_DIR );
1787
1747
while ((clde = ReadDir (cldir ,TWOPHASE_DIR ))!= NULL )
@@ -2254,3 +2214,88 @@ RecordTransactionAbortPrepared(TransactionId xid,
2254
2214
*/
2255
2215
SyncRepWaitForLSN (recptr , false);
2256
2216
}
2217
+
2218
+ /*
2219
+ * KnownPreparedRemoveByXid
2220
+ *
2221
+ * Forget about a prepared transaction. Called during commit/abort.
2222
+ */
2223
+ void
2224
+ KnownPreparedRemoveByXid (TransactionId xid )
2225
+ {
2226
+ dlist_mutable_iter miter ;
2227
+
2228
+ Assert (RecoveryInProgress ());
2229
+
2230
+ dlist_foreach_modify (miter ,& KnownPreparedList )
2231
+ {
2232
+ KnownPreparedXact * xact = dlist_container (KnownPreparedXact ,
2233
+ list_node ,miter .cur );
2234
+
2235
+ if (xact -> xid == xid )
2236
+ {
2237
+ dlist_delete (miter .cur );
2238
+ /*
2239
+ * Since we found an entry in KnownPreparedList, we know that the file isn't
2240
+ * on disk yet, so we can stop here.
2241
+ */
2242
+ return ;
2243
+ }
2244
+ }
2245
+
2246
+ /*
2247
+ * Here we know that the file should already have been moved to disk. But aborting
2248
+ * recovery because an unneeded file is absent doesn't seem to be a good idea, so call
2249
+ * remove with giveWarning=false.
2250
+ */
2251
+ RemoveTwoPhaseFile (xid , false);
2252
+ }
2253
+
2254
+ /*
2255
+ * KnownPreparedRecreateFiles
2256
+ *
2257
+ * Moves prepare records from WAL to files. Called during checkpoint replay
2258
+ * or PrescanPreparedTransactions.
2259
+ *
2260
+ * redo_horizon = InvalidXLogRecPtr indicates that all transactions from
2261
+ * KnownPreparedList should be moved to disk.
2262
+ */
2263
+ void
2264
+ KnownPreparedRecreateFiles (XLogRecPtr redo_horizon )
2265
+ {
2266
+ dlist_mutable_iter miter ;
2267
+ int serialized_xacts = 0 ;
2268
+
2269
+ Assert (RecoveryInProgress ());
2270
+
2271
+ TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ();
2272
+
2273
+ dlist_foreach_modify (miter ,& KnownPreparedList )
2274
+ {
2275
+ KnownPreparedXact * xact = dlist_container (KnownPreparedXact ,
2276
+ list_node ,miter .cur );
2277
+
2278
+ if (xact -> prepare_end_lsn <=redo_horizon || redo_horizon == InvalidXLogRecPtr )
2279
+ {
2280
+ char * buf ;
2281
+ int len ;
2282
+
2283
+ XlogReadTwoPhaseData (xact -> prepare_start_lsn ,& buf ,& len );
2284
+ RecreateTwoPhaseFile (xact -> xid ,buf ,len );
2285
+ pfree (buf );
2286
+ dlist_delete (miter .cur );
2287
+ serialized_xacts ++ ;
2288
+ }
2289
+ }
2290
+
2291
+ TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ();
2292
+
2293
+ if (log_checkpoints && serialized_xacts > 0 )
2294
+ ereport (LOG ,
2295
+ (errmsg_plural ("%u two-phase state file was written "
2296
+ "for long-running prepared transactions" ,
2297
+ "%u two-phase state files were written "
2298
+ "for long-running prepared transactions" ,
2299
+ serialized_xacts ,
2300
+ serialized_xacts )));
2301
+ }