Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0d9ed50

Browse files
knizhnikkelvich
authored andcommitted
2PC stuff
1 parent670cb78 commit0d9ed50

File tree

3 files changed

+28
-23
lines changed

3 files changed

+28
-23
lines changed

‎arbiter.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -455,9 +455,8 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
455455
}
456456
buf->used=0;
457457
}
458-
MTM_TRACE("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
459-
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
460-
Assert(ts->cmd!=MSG_INVALID);
458+
MTM_TRACE("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
459+
ts->status==TRANSACTION_STATUS_ABORTED ?"abort" :"commit",ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
461460
buf->data[buf->used].code=ts->status==TRANSACTION_STATUS_ABORTED ?MSG_ABORTED :MSG_PREPARED;
462461
buf->data[buf->used].dxid=xid;
463462
buf->data[buf->used].sxid=ts->xid;
@@ -509,7 +508,6 @@ static void MtmTransSender(Datum arg)
509508

510509
staticvoidMtmWakeUpBackend(MtmTransState*ts)
511510
{
512-
ts->voteCompleted= true;
513511
MTM_TRACE("Wakeup backed procno=%d, pid=%d\n",ts->procno,ProcGlobal->allProcs[ts->procno].pid);
514512
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
515513
}
@@ -637,7 +635,6 @@ static void MtmTransReceiver(Datum arg)
637635
break;
638636
default:
639637
Assert(false);
640-
}
641638
}
642639
}
643640
MtmUnlock();

‎multimaster.c

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,15 @@ static TransactionId MtmAdjustOldestXid(TransactionId xid);
116116
staticboolMtmDetectGlobalDeadLock(PGPROC*proc);
117117
staticvoidMtmAddSubtransactions(MtmTransState*ts,TransactionId*subxids,intnSubxids);
118118
staticcharconst*MtmGetName(void);
119-
staticvoidMtmCheckClusterLock()
119+
staticvoidMtmCheckClusterLock(void);
120+
staticvoidMtmCheckSlots(void);
121+
staticvoidMtmAddSubtransactions(MtmTransState*ts,TransactionId*subxids,intnSubxids);
120122

121123
staticvoidMtmShmemStartup(void);
122124

123125
staticBgwPool*MtmPoolConstructor(void);
124126
staticboolMtmRunUtilityStmt(PGconn*conn,charconst*sql);
125127
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError);
126-
staticvoidMtmVoteForTransaction(MtmTransState*ts);
127128

128129
staticHTAB*xid2state;
129130
staticHTAB*gid2xid;
@@ -543,10 +544,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
543544
* Prepare transaction for two-phase commit.
544545
* This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
545546
*/
547+
staticvoid
546548
MtmPrePrepareTransaction(MtmCurrentTrans*x)
547549
{
548550
MtmTransState*ts;
549-
inti;
551+
TransactionId*subxids;
550552

551553
if (!x->isDistributed) {
552554
return;
@@ -575,9 +577,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
575577
ts->gtid=x->gtid;
576578
ts->procno=MyProc->pgprocno;
577579
ts->nVotes=0;
578-
580+
ts->nSubxids=xactGetCommittedChildren(&subxids);
579581
x->isPrepared= true;
580-
x->csn=csn;
582+
x->csn=ts->csn;
581583

582584
dtm->transCount+=1;
583585

@@ -588,34 +590,36 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
588590
ts->gtid.node=MtmNodeId;
589591
}
590592
MtmTransactionListAppend(ts);
593+
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
591594

592595
MtmUnlock();
593596

594597
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n",MyProcPid,x->xid,ts->csn);
595598
}
596599

600+
staticvoid
597601
MtmPrepareTransaction(MtmCurrentTrans*x)
598602
{
599603
MtmTransState*ts;
600604

601605
MtmLock(LW_EXCLUSIVE);
602606
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
603607
Assert(ts!=NULL);
604-
if (ts->status=TRANSACTION_STATUS_IN_PROGRESS) {
608+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
605609
ts->status=TRANSACTION_STATUS_UNKNOWN;
606610
MtmAdjustSubtransactions(ts);
607611
}
608612

609613
if (!MtmIsCoordinator(ts)) {
610-
MtmHashMap*hm= (MtmHashMap*)hash_search(gid2xid,x->gid,HASH_ENTER,NULL);
614+
MtmTransMap*hm= (MtmTransMap*)hash_search(gid2xid,x->gid,HASH_ENTER,NULL);
611615
Assert(x->gid[0]);
612616
hm->state=ts;
613617
MtmSendNotificationMessage(ts);/* send notification to coordinator */
614618
MtmUnlock();
615619
}else {
616620
/* wait N commits or just one ABORT */
617-
ts->nVotes+=1;
618-
while (ts->nVotes!=dtm->nNodes&&ts->status==TRANSACTION_STATUS_PROGRESS) {
621+
ts->nVotes+=1;/* I vote myself */
622+
while (ts->nVotes!=dtm->nNodes&&ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
619623
MtmUnlock();
620624
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
621625
ResetLatch(&MyProc->procLatch);
@@ -633,14 +637,14 @@ static void
633637
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
634638
{
635639
MTM_TRACE("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n",MyProcPid,x->xid,x->isPrepared,x->isDistributed,commit ?"commit" :"abort");
636-
if (x->isDistributed) {
640+
if (x->isDistributed&& (TransactionIdIsValid(x->xid)||x->isReplicated)) {
637641
MtmTransState*ts;
638642
MtmLock(LW_EXCLUSIVE);
639643
if (x->isPrepared) {
640644
ts=hash_search(xid2state,&x->xid,HASH_FIND,NULL);
641645
Assert(ts!=NULL);
642646
}else {
643-
MtmHashMap*hm= (MtmHashMap*)hash_search(gid2xid,x->gid,HASH_REMOVE,NULL);
647+
MtmTransMap*hm= (MtmTransMap*)hash_search(gid2xid,x->gid,HASH_REMOVE,NULL);
644648
Assert(hm!=NULL);
645649
ts=hm->state;
646650
}
@@ -712,12 +716,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
712716

713717
voidMtmSetCurrentTransactionGID(charconst*gid)
714718
{
719+
MTM_TRACE("Set current transaction GID %s\n",gid);
715720
strcpy(dtmTx.gid,gid);
721+
dtmTx.isDistributed= true;
722+
dtmTx.isReplicated= true;
716723
}
717724

718725
voidMtmSetCurrentTransactionCSN(csn_tcsn)
719726
{
727+
MTM_TRACE("Set current transaction CSN %ld\n",csn);
720728
dtmTx.csn=csn;
729+
dtmTx.isDistributed= true;
730+
dtmTx.isReplicated= true;
721731
}
722732

723733
/*
@@ -731,7 +741,8 @@ void MtmSetCurrentTransactionCSN(csn_t csn)
731741
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
732742
* WAL overflow
733743
*/
734-
staticvoidMtmCheckSlots()
744+
staticvoid
745+
MtmCheckSlots()
735746
{
736747
if (MtmMaxRecoveryLag!=0&&dtm->disabledNodeMask!=0)
737748
{
@@ -1682,14 +1693,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16821693
{
16831694
caseTRANS_STMT_COMMIT:
16841695
if (dtmTx.isDistributed&&dtmTx.containsDML) {
1685-
chargid{MUTLIMASTER_MAX_GID_SIZE];
1686-
MtmGenerateGid(&gid);
1696+
chargid[MULTIMASTER_MAX_GID_SIZE];
1697+
MtmGenerateGid(gid);
16871698
if (!IsTransactionBlock()) {
16881699
elog(WARNING,"Start transaction block for %d",dtmTx.xid);
16891700
CommitTransactionCommand();
16901701
StartTransactionCommand();
16911702
}
1692-
if (!PrepareTransactionBlock(&gid))
1703+
if (!PrepareTransactionBlock(gid))
16931704
{
16941705
elog(WARNING,"Failed to prepare transaction %s",gid);
16951706
/* report unsuccessful commit in completionTag */

‎pglogical_apply.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -478,9 +478,6 @@ process_remote_commit(StringInfo in)
478478
pq_getmsgint64(in);/* end_lsn */
479479
pq_getmsgint64(in);/* commit_time */
480480

481-
if (PGLOGICAL_XACT_EVENT(flags)!=PGLOGICAL_COMMIT)
482-
gid=pq_getmsgstring(in);
483-
484481
MTM_TRACE("PGLOGICAL_RECV commit: flags=%d, gid=%s\n",flags,gid);
485482

486483
switch(PGLOGICAL_XACT_EVENT(flags))

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp