57
57
#include "replication/walsender.h"
58
58
#include "replication/syncrep.h"
59
59
#include "storage/fd.h"
60
+ #include "storage/ipc.h"
60
61
#include "storage/predicate.h"
61
62
#include "storage/proc.h"
62
63
#include "storage/procarray.h"
@@ -81,25 +82,25 @@ intmax_prepared_xacts = 0;
81
82
*
82
83
* The lifecycle of a global transaction is:
83
84
*
84
- * 1. After checking that the requested GID is not in use, set up an
85
- *entry in the TwoPhaseState->prepXacts array with the correctXID andGID ,
86
- *with locking_xid = my own XID and valid = false .
85
+ * 1. After checking that the requested GID is not in use, set up an entry in
86
+ * the TwoPhaseState->prepXacts array with the correctGID andvalid = false ,
87
+ *and mark it as locked by my backend .
87
88
*
88
89
* 2. After successfully completing prepare, set valid = true and enter the
89
90
* referenced PGPROC into the global ProcArray.
90
91
*
91
- * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry
92
- *is valid andits locking_xid is no longer active, then store my current
93
- *XID intolocking_xid . This prevents concurrent attempts to commit or
94
- * rollback the same prepared xact.
92
+ * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
93
+ * valid andnot locked, then mark the entry as locked by storing my current
94
+ *backend ID intolocking_backend . This prevents concurrent attempts to
95
+ *commit or rollback the same prepared xact.
95
96
*
96
97
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
97
98
* from the ProcArray and the TwoPhaseState->prepXacts array and return it to
98
99
* the freelist.
99
100
*
100
101
* Note that if the preparing transaction fails between steps 1 and 2, the
101
- * entrywill remain in prepXacts until recycled. We can detect recyclable
102
- *entries by checking for valid = false and locking_xid no longer active .
102
+ * entrymust be removed so that the GID and the GlobalTransaction struct
103
+ *can be reused. See AtAbort_Twophase() .
103
104
*
104
105
* typedef struct GlobalTransactionData *GlobalTransaction appears in
105
106
* twophase.h
@@ -114,8 +115,8 @@ typedef struct GlobalTransactionData
114
115
TimestampTz prepared_at ;/* time of preparation */
115
116
XLogRecPtr prepare_lsn ;/* XLOG offset of prepare record */
116
117
Oid owner ;/* ID of user that executed the xact */
117
- TransactionId locking_xid ; /*top-level XID of backend working on xact */
118
- bool valid ;/* TRUE iffully prepared */
118
+ BackendId locking_backend ; /* backendcurrently working on the xact */
119
+ bool valid ;/* TRUE ifPGPROC entry is in proc array */
119
120
char gid [GIDSIZE ];/* The GID assigned to the prepared xact */
120
121
}GlobalTransactionData ;
121
122
@@ -140,6 +141,12 @@ typedef struct TwoPhaseStateData
140
141
141
142
static TwoPhaseStateData * TwoPhaseState ;
142
143
144
+ /*
145
+ * Global transaction entry currently locked by us, if any.
146
+ */
147
+ static GlobalTransaction MyLockedGxact = NULL ;
148
+
149
+ static bool twophaseExitRegistered = false;
143
150
144
151
static void RecordTransactionCommitPrepared (TransactionId xid ,
145
152
int nchildren ,
@@ -156,6 +163,7 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
156
163
RelFileNode * rels );
157
164
static void ProcessRecords (char * bufptr ,TransactionId xid ,
158
165
const TwoPhaseCallback callbacks []);
166
+ static void RemoveGXact (GlobalTransaction gxact );
159
167
160
168
161
169
/*
@@ -229,6 +237,74 @@ TwoPhaseShmemInit(void)
229
237
Assert (found );
230
238
}
231
239
240
+ /*
241
+ * Exit hook to unlock the global transaction entry we're working on.
242
+ */
243
+ static void
244
+ AtProcExit_Twophase (int code ,Datum arg )
245
+ {
246
+ /* same logic as abort */
247
+ AtAbort_Twophase ();
248
+ }
249
+
250
+ /*
251
+ * Abort hook to unlock the global transaction entry we're working on.
252
+ */
253
+ void
254
+ AtAbort_Twophase (void )
255
+ {
256
+ if (MyLockedGxact == NULL )
257
+ return ;
258
+
259
+ /*
260
+ * What to do with the locked global transaction entry? If we were in
261
+ * the process of preparing the transaction, but haven't written the WAL
262
+ * record and state file yet, the transaction must not be considered as
263
+ * prepared. Likewise, if we are in the process of finishing an
264
+ * already-prepared transaction, and fail after having already written
265
+ * the 2nd phase commit or rollback record to the WAL, the transaction
266
+ * should not be considered as prepared anymore. In those cases, just
267
+ * remove the entry from shared memory.
268
+ *
269
+ * Otherwise, the entry must be left in place so that the transaction
270
+ * can be finished later, so just unlock it.
271
+ *
272
+ * If we abort during prepare, after having written the WAL record, we
273
+ * might not have transfered all locks and other state to the prepared
274
+ * transaction yet. Likewise, if we abort during commit or rollback,
275
+ * after having written the WAL record, we might not have released
276
+ * all the resources held by the transaction yet. In those cases, the
277
+ * in-memory state can be wrong, but it's too late to back out.
278
+ */
279
+ if (!MyLockedGxact -> valid )
280
+ {
281
+ RemoveGXact (MyLockedGxact );
282
+ }
283
+ else
284
+ {
285
+ LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
286
+
287
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
288
+
289
+ LWLockRelease (TwoPhaseStateLock );
290
+ }
291
+ MyLockedGxact = NULL ;
292
+ }
293
+
294
+ /*
295
+ * This is called after we have finished transfering state to the prepared
296
+ * PGXACT entry.
297
+ */
298
+ void
299
+ PostPrepare_Twophase ()
300
+ {
301
+ LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
302
+ MyLockedGxact -> locking_backend = InvalidBackendId ;
303
+ LWLockRelease (TwoPhaseStateLock );
304
+
305
+ MyLockedGxact = NULL ;
306
+ }
307
+
232
308
233
309
/*
234
310
* MarkAsPreparing
@@ -260,29 +336,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
260
336
errmsg ("prepared transactions are disabled" ),
261
337
errhint ("Set max_prepared_transactions to a nonzero value." )));
262
338
263
- LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
264
-
265
- /*
266
- * First, find and recycle any gxacts that failed during prepare. We do
267
- * this partly to ensure we don't mistakenly say their GIDs are still
268
- * reserved, and partly so we don't fail on out-of-slots unnecessarily.
269
- */
270
- for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
339
+ /* on first call, register the exit hook */
340
+ if (!twophaseExitRegistered )
271
341
{
272
- gxact = TwoPhaseState -> prepXacts [i ];
273
- if (!gxact -> valid && !TransactionIdIsActive (gxact -> locking_xid ))
274
- {
275
- /* It's dead Jim ... remove from the active array */
276
- TwoPhaseState -> numPrepXacts -- ;
277
- TwoPhaseState -> prepXacts [i ]= TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ];
278
- /* and put it back in the freelist */
279
- gxact -> next = TwoPhaseState -> freeGXacts ;
280
- TwoPhaseState -> freeGXacts = gxact ;
281
- /* Back up index count too, so we don't miss scanning one */
282
- i -- ;
283
- }
342
+ on_shmem_exit (AtProcExit_Twophase ,0 );
343
+ twophaseExitRegistered = true;
284
344
}
285
345
346
+ LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
347
+
286
348
/* Check for conflicting GID */
287
349
for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
288
350
{
@@ -339,14 +401,20 @@ MarkAsPreparing(TransactionId xid, const char *gid,
339
401
/* initialize LSN to 0 (start of WAL) */
340
402
gxact -> prepare_lsn = 0 ;
341
403
gxact -> owner = owner ;
342
- gxact -> locking_xid = xid ;
404
+ gxact -> locking_backend = MyBackendId ;
343
405
gxact -> valid = false;
344
406
strcpy (gxact -> gid ,gid );
345
407
346
408
/* And insert it into the active array */
347
409
Assert (TwoPhaseState -> numPrepXacts < max_prepared_xacts );
348
410
TwoPhaseState -> prepXacts [TwoPhaseState -> numPrepXacts ++ ]= gxact ;
349
411
412
+ /*
413
+ * Remember that we have this GlobalTransaction entry locked for us.
414
+ * If we abort after this, we must release it.
415
+ */
416
+ MyLockedGxact = gxact ;
417
+
350
418
LWLockRelease (TwoPhaseStateLock );
351
419
352
420
return gxact ;
@@ -409,6 +477,13 @@ LockGXact(const char *gid, Oid user)
409
477
{
410
478
int i ;
411
479
480
+ /* on first call, register the exit hook */
481
+ if (!twophaseExitRegistered )
482
+ {
483
+ on_shmem_exit (AtProcExit_Twophase ,0 );
484
+ twophaseExitRegistered = true;
485
+ }
486
+
412
487
LWLockAcquire (TwoPhaseStateLock ,LW_EXCLUSIVE );
413
488
414
489
for (i = 0 ;i < TwoPhaseState -> numPrepXacts ;i ++ )
@@ -423,15 +498,11 @@ LockGXact(const char *gid, Oid user)
423
498
continue ;
424
499
425
500
/* Found it, but has someone else got it locked? */
426
- if (TransactionIdIsValid (gxact -> locking_xid ))
427
- {
428
- if (TransactionIdIsActive (gxact -> locking_xid ))
429
- ereport (ERROR ,
430
- (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
431
- errmsg ("prepared transaction with identifier \"%s\" is busy" ,
432
- gid )));
433
- gxact -> locking_xid = InvalidTransactionId ;
434
- }
501
+ if (gxact -> locking_backend != InvalidBackendId )
502
+ ereport (ERROR ,
503
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
504
+ errmsg ("prepared transaction with identifier \"%s\" is busy" ,
505
+ gid )));
435
506
436
507
if (user != gxact -> owner && !superuser_arg (user ))
437
508
ereport (ERROR ,
@@ -452,7 +523,8 @@ LockGXact(const char *gid, Oid user)
452
523
errhint ("Connect to the database where the transaction was prepared to finish it." )));
453
524
454
525
/* OK for me to lock it */
455
- gxact -> locking_xid = GetTopTransactionId ();
526
+ gxact -> locking_backend = MyBackendId ;
527
+ MyLockedGxact = gxact ;
456
528
457
529
LWLockRelease (TwoPhaseStateLock );
458
530
@@ -1088,6 +1160,13 @@ EndPrepare(GlobalTransaction gxact)
1088
1160
*/
1089
1161
MyPgXact -> delayChkpt = false;
1090
1162
1163
+ /*
1164
+ * Remember that we have this GlobalTransaction entry locked for us. If
1165
+ * we crash after this point, it's too late to abort, but we must unlock
1166
+ * it so that the prepared transaction can be committed or rolled back.
1167
+ */
1168
+ MyLockedGxact = gxact ;
1169
+
1091
1170
END_CRIT_SECTION ();
1092
1171
1093
1172
/*
@@ -1334,8 +1413,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1334
1413
1335
1414
/*
1336
1415
* In case we fail while running the callbacks, mark the gxact invalid so
1337
- * no one else will try to commit/rollback, and so it can be recycled
1338
- * properly later. It is still locked by our XID so it won't go away yet.
1416
+ * no one else will try to commit/rollback, and so it will be recycled
1417
+ * if we fail after this point. It is still locked by our backend so it
1418
+ * won't go away yet.
1339
1419
*
1340
1420
* (We assume it's safe to do this without taking TwoPhaseStateLock.)
1341
1421
*/
@@ -1395,6 +1475,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1395
1475
RemoveTwoPhaseFile (xid , true);
1396
1476
1397
1477
RemoveGXact (gxact );
1478
+ MyLockedGxact = NULL ;
1398
1479
1399
1480
pfree (buf );
1400
1481
}