Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4dda4ec

Browse files
committed
2 parents4586a37 +7228f87 commit4dda4ec

File tree

8 files changed

+43
-19
lines changed

8 files changed

+43
-19
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ static void MtmReceiver(Datum arg)
10511051
MtmWakeUpBackend(ts);
10521052
}
10531053
}else {
1054+
elog(WARNING,"Receive PRECOMMITTED response for aborted transaction");// How it can happen? SHould we use assert here?
10541055
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
10551056
MtmWakeUpBackend(ts);
10561057
}

‎contrib/mmts/multimaster.c

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ void MtmPrecommitTransaction(char const* gid)
942942
ts->csn=MtmAssignCSN();
943943
MtmAdjustSubtransactions(ts);
944944
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
945+
SetPrepareTransactionState(ts->gid,"precommitted");
945946
}
946947
}
947948
MtmUnlock();
@@ -1311,6 +1312,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
13111312

13121313
if (MtmIsCoordinator(ts)) {
13131314
inti;
1315+
Assert(false);// All broadcasts are now done through logical decoding
13141316
for (i=0;i<Mtm->nAllNodes;i++)
13151317
{
13161318
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask,i))
@@ -1560,7 +1562,10 @@ void MtmHandleApplyError(void)
15601562
}
15611563

15621564
/**
1563-
* Check status of all prepared transactions with coordinator at disabled node
1565+
* Check status of all prepared transactions with coordinator at disabled node.
1566+
* Actually, if node is precommitted (state == UNKNOWN) at any of nodes, then is is prepared at all nodes and so can be committed.
1567+
* But if coordinator of transaction is crashed, we made a decision about transaction commit only if transaction is precommitted at ALL live nodes.
1568+
* The reason is that we want to avoid extra polling to obtain maximum CSN from all nodes to assign it to committed transaction.
15641569
* Called only from MtmDisableNode in critical section.
15651570
*/
15661571
staticvoidMtmPollStatusOfPreparedTransactions(intdisabledNodeId)
@@ -1601,9 +1606,12 @@ static void MtmDisableNode(int nodeId)
16011606
Mtm->nodes[nodeId-1].lastStatusChangeTime=now;
16021607
Mtm->nodes[nodeId-1].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
16031608
if (nodeId!=MtmNodeId) {
1604-
Mtm->nLiveNodes-=1;
1609+
Mtm->nLiveNodes-=1;
1610+
}
1611+
if (Mtm->nLiveNodes >=Mtm->nAllNodes/2+1) {
1612+
/* Make decision about prepared transaction status only in quorum */
1613+
MtmPollStatusOfPreparedTransactions(nodeId);
16051614
}
1606-
MtmPollStatusOfPreparedTransactions(nodeId);
16071615
}
16081616

16091617
staticvoidMtmEnableNode(intnodeId)
@@ -3228,9 +3236,10 @@ bool MtmFilterTransaction(char* record, int size)
32283236
origin_node!=0&&
32293237
(Mtm->status==MTM_RECOVERY||origin_node==replication_node));
32303238

3231-
switch(PGLOGICAL_XACT_EVENT(flags))
3239+
switch(PGLOGICAL_XACT_EVENT(flags))
32323240
{
32333241
casePGLOGICAL_PREPARE:
3242+
casePGLOGICAL_PRECOMMIT_PREPARED:
32343243
casePGLOGICAL_ABORT_PREPARED:
32353244
gid=pq_getmsgstring(&s);
32363245
break;

‎contrib/mmts/multimaster.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#defineMTM_TXTRACE(tx,event)
4545
#else
4646
#defineMTM_TXTRACE(tx,event) \
47-
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event,getpid())
47+
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event,MyProcPid)
4848
#endif
4949

5050
#defineMULTIMASTER_NAME "multimaster"
@@ -160,6 +160,12 @@ typedef struct
160160
pgid_tgid;/* Global transaction identifier */
161161
}MtmArbiterMessage;
162162

163+
/*
164+
* Abort logical message is send by replica when error is happen while applying prepared transaction.
165+
* In this case we do not have prepared transaction and can not do abort-prepared.
166+
* But we have to record the fact of abort to be able to replay it in case of crash of coordinator of this transaction.
167+
* We are using logical abort message with code 'A' for it
168+
*/
163169
typedefstructMtmAbortLogicalMessage
164170
{
165171
pgid_tgid;

‎contrib/mmts/pglogical_apply.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,12 @@ process_remote_commit(StringInfo in)
625625
{
626626
casePGLOGICAL_PRECOMMIT_PREPARED:
627627
{
628+
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
628629
gid=pq_getmsgstring(in);
630+
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s",MyProcPid,gid);
631+
MtmBeginSession(origin_node);
629632
MtmPrecommitTransaction(gid);
633+
MtmEndSession(origin_node, true);
630634
return;
631635
}
632636
casePGLOGICAL_COMMIT:
@@ -691,6 +695,7 @@ process_remote_commit(StringInfo in)
691695
{
692696
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
693697
gid=pq_getmsgstring(in);
698+
/* MtmRollbackPreparedTransaction will set origin session itself */
694699
MtmRollbackPreparedTransaction(origin_node,gid);
695700
break;
696701
}

‎contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
192192
else
193193
Assert(false);
194194

195-
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE||flags==PGLOGICAL_PRECOMMIT_PREPARED) {
195+
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
196+
/* COMMIT and PREPARE are preceded by BEGIN, which set MtmIsFilteredTxn flag */
196197
if (MtmIsFilteredTxn) {
197198
Assert(MtmTransactionRecords==0);
198199
return;

‎src/backend/access/transam/twophase.c

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ static TwoPhaseStateData *TwoPhaseState;
183183
staticGlobalTransactionMyLockedGxact=NULL;
184184

185185
staticbooltwophaseExitRegistered= false;
186+
staticTransactionIdcached_xid=InvalidTransactionId;
187+
staticGlobalTransactioncached_gxact=NULL;
188+
186189

187190
staticvoidRecordTransactionCommitPrepared(TransactionIdxid,
188191
intnchildren,
@@ -216,10 +219,10 @@ TwoPhaseShmemSize(void)
216219
/* Need the fixed struct, the array of pointers, and the GTD structs */
217220
size= offsetof(TwoPhaseStateData,prepXacts);
218221
size=add_size(size,mul_size(max_prepared_xacts,
219-
sizeof(GlobalTransaction)));
222+
sizeof(GlobalTransaction)*2));
220223
size=MAXALIGN(size);
221224
size=add_size(size,mul_size(max_prepared_xacts,
222-
sizeof(GlobalTransactionData)*2));
225+
sizeof(GlobalTransactionData)));
223226

224227
returnsize;
225228
}
@@ -247,9 +250,9 @@ TwoPhaseShmemInit(void)
247250
gxacts= (GlobalTransaction)
248251
((char*)TwoPhaseState+
249252
MAXALIGN(offsetof(TwoPhaseStateData,prepXacts)+
250-
sizeof(GlobalTransaction)*max_prepared_xacts));
253+
sizeof(GlobalTransaction)*2*max_prepared_xacts));
251254

252-
TwoPhaseState->hashTable=(GlobalTransaction*)&gxacts[max_prepared_xacts];
255+
TwoPhaseState->hashTable=&TwoPhaseState->prepXacts[max_prepared_xacts];
253256

254257
for (i=0;i<max_prepared_xacts;i++)
255258
{
@@ -438,6 +441,10 @@ MarkAsPreparing(TransactionId xid, const char *gid,
438441
proc->lwWaitMode=0;
439442
proc->waitLock=NULL;
440443
proc->waitProcLock=NULL;
444+
445+
cached_xid=xid;
446+
cached_gxact=gxact;
447+
441448
for (i=0;i<NUM_LOCK_PARTITIONS;i++)
442449
SHMQueueInit(&(proc->myProcLocks[i]));
443450
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -696,9 +703,7 @@ void SetPrepareTransactionState(char const* gid, char const* state)
696703
strcpy(gxact->state_3pc,state);
697704
EndPrepare(gxact);
698705

699-
/* Unlock GXact */
700-
gxact->locking_backend=InvalidBackendId;
701-
MyLockedGxact=NULL;
706+
PostPrepare_Twophase();
702707
}
703708

704709
/* Working status for pg_prepared_xact */
@@ -827,9 +832,6 @@ TwoPhaseGetGXact(TransactionId xid)
827832
GlobalTransactionresult=NULL;
828833
inti;
829834

830-
staticTransactionIdcached_xid=InvalidTransactionId;
831-
staticGlobalTransactioncached_gxact=NULL;
832-
833835
/*
834836
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
835837
* repeatedly for the same XID. We can save work with a simple cache.

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,7 +1346,7 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
13461346
*
13471347
* We currently can only decode a transaction's contents in when their commit
13481348
* record is read because that's currently the only place where we know about
1349-
* cacheinvalidations. Thus, once a toplevel commit is read, we iterate over
1349+
* cacheinvalidati ons. Thus, once a toplevel commit is read, we iterate over
13501350
* the top and subtransactions (using a k-way merge) and replay the changes in
13511351
* lsn order.
13521352
*/
@@ -1734,7 +1734,7 @@ ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
17341734
txn->origin_lsn=origin_lsn;
17351735
txn->xact_action=rb->xact_action;
17361736
strcpy(txn->gid,rb->gid);
1737-
*txn->gid='\0';
1737+
*txn->state_3pc='\0';
17381738

17391739
rb->commit(rb,txn,commit_lsn);
17401740
}

‎src/include/catalog/pg_proc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3057,7 +3057,7 @@ DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 1 1000 0 0 f f f f t t
30573057
DESCR("view system lock information");
30583058
DATA(insert OID = 2561 ( pg_blocking_pids PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 1007 "23" _null_ _null_ _null_ _null_ _null_ pg_blocking_pids _null_ _null_ _null_ ));
30593059
DESCR("get array of PIDs of sessions blocking specified backend PID");
3060-
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26,25}" "{o,o,o,o,o,0}" "{transaction,gid,prepared,ownerid,dbid,state_3pc}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
3060+
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26,25}" "{o,o,o,o,o,o}" "{transaction,gid,prepared,ownerid,dbid,state3pc}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
30613061
DESCR("view two-phase transactions");
30623062
DATA(insert OID = 3445 ( pg_precommit_prepared PGNSP PGUID 12 1 0 0 0 f f f f t f v s 2 0 2278 "2275,2275" _null_ _null_ _null_ _null_ _null_ pg_precommit_prepared _null_ _null_ _null_ ));
30633063
DESCR("alter state of prepared transaction (used for 3pc)");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp