Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4bf43fa

Browse files
knizhnik and kelvich
authored and committed
Start rewriting multimaster to 2PC
1 parent e7c5d11, commit 4bf43fa

File tree

3 files changed

+49
-150
lines changed

3 files changed

+49
-150
lines changed

‎arbiter.c

Lines changed: 14 additions & 112 deletions
Original file line number | Diff line number | Diff line change
@@ -458,7 +458,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
458458
MTM_TRACE("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
459459
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
460460
Assert(ts->cmd!=MSG_INVALID);
461-
buf->data[buf->used].code=ts->cmd;
461+
buf->data[buf->used].code=ts->status==TRANSACTION_STATUS_ABORTED ?MSG_ABORTED :MSG_PREPARED;
462462
buf->data[buf->used].dxid=xid;
463463
buf->data[buf->used].sxid=ts->xid;
464464
buf->data[buf->used].csn=ts->csn;
@@ -467,21 +467,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
467467
buf->used+=1;
468468
}
469469

470-
staticvoidMtmBroadcastMessage(MtmBuffer*txBuffer,MtmTransState*ts)
471-
{
472-
inti;
473-
intn=1;
474-
for (i=0;i<MtmNodes;i++)
475-
{
476-
if (TransactionIdIsValid(ts->xids[i])) {
477-
Assert(i+1!=MtmNodeId);
478-
MtmAppendBuffer(txBuffer,ts->xids[i],i,ts);
479-
n+=1;
480-
}
481-
}
482-
Assert(n==ds->nNodes);
483-
}
484-
485470
staticvoidMtmTransSender(Datumarg)
486471
{
487472
intnNodes=MtmNodes;
@@ -508,12 +493,7 @@ static void MtmTransSender(Datum arg)
508493
MtmLock(LW_SHARED);
509494

510495
for (ts=ds->votingTransactions;ts!=NULL;ts=ts->nextVoting) {
511-
if (MtmIsCoordinator(ts)) {
512-
MtmBroadcastMessage(txBuffer,ts);
513-
}else {
514-
MtmAppendBuffer(txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
515-
}
516-
ts->cmd=MSG_INVALID;
496+
MtmAppendBuffer(txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
517497
}
518498
ds->votingTransactions=NULL;
519499
MtmUnlock();
@@ -634,109 +614,31 @@ static void MtmTransReceiver(Datum arg)
634614
MtmArbiterMessage*msg=&rxBuffer[i].data[j];
635615
MtmTransState*ts= (MtmTransState*)hash_search(xid2state,&msg->dxid,HASH_FIND,NULL);
636616
Assert(ts!=NULL);
637-
Assert(ts->cmd==MSG_INVALID);
638617
Assert(msg->node>0&&msg->node <=nNodes&&msg->node!=MtmNodeId);
639-
ts->xids[msg->node-1]=msg->sxid;
640-
641-
if (MtmIsCoordinator(ts)) {
642-
switch (msg->code) {
643-
caseMSG_READY:
644-
Assert(ts->status==TRANSACTION_STATUS_ABORTED||ts->status==TRANSACTION_STATUS_IN_PROGRESS);
645-
Assert(ts->nVotes<ds->nNodes);
646-
ds->nodeTransDelay[msg->node-1]+=MtmGetCurrentTime()-ts->csn;
647-
if (++ts->nVotes==ds->nNodes) {
648-
/* All nodes are finished their transactions */
649-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
650-
ts->nVotes=1;/* I voted myself */
651-
ts->cmd=MSG_PREPARE;
652-
}else {
653-
ts->status=TRANSACTION_STATUS_ABORTED;
654-
ts->cmd=MSG_ABORT;
655-
MtmAdjustSubtransactions(ts);
656-
MtmWakeUpBackend(ts);
657-
}
658-
MtmSendNotificationMessage(ts);
659-
}
660-
break;
618+
Assert (MtmIsCoordinator(ts));
619+
switch (msg->code) {
661620
caseMSG_PREPARED:
662-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
663-
Assert(ts->nVotes<ds->nNodes);
664-
if (msg->csn>ts->csn) {
665-
ts->csn=msg->csn;
666-
MtmSyncClock(ts->csn);
667-
}
668-
if (++ts->nVotes==ds->nNodes) {
669-
/* ts->csn is maximum of CSNs at all nodes */
670-
ts->nVotes=1;/* I voted myself */
671-
ts->cmd=MSG_COMMIT;
672-
ts->csn=MtmAssignCSN();
673-
ts->status=TRANSACTION_STATUS_UNKNOWN;
674-
MtmAdjustSubtransactions(ts);
675-
MtmSendNotificationMessage(ts);
676-
}
677-
break;
678-
caseMSG_COMMITTED:
679-
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN);
680-
Assert(ts->nVotes<ds->nNodes);
681-
if (++ts->nVotes==ds->nNodes) {
682-
/* All nodes have the same CSN */
683-
MtmWakeUpBackend(ts);
621+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
622+
if (msg->csn>ts->csn) {
623+
ts->csn=msg->csn;
624+
MtmSyncClock(ts->csn);
625+
}
626+
if (++ts->nVotes==ds->nNodes) {
627+
MtmWakeUpBackend(ts);
628+
}
684629
}
685630
break;
686631
caseMSG_ABORTED:
687-
Assert(ts->status==TRANSACTION_STATUS_ABORTED||ts->status==TRANSACTION_STATUS_IN_PROGRESS);
688-
Assert(ts->nVotes<ds->nNodes);
689-
ts->status=TRANSACTION_STATUS_ABORTED;
690-
if (++ts->nVotes==ds->nNodes) {
691-
ts->cmd=MSG_ABORT;
692-
MtmAdjustSubtransactions(ts);
693-
MtmSendNotificationMessage(ts);
694-
MtmWakeUpBackend(ts);
695-
}
696-
break;
697-
default:
698-
Assert(false);
699-
}
700-
}else {/* replica */
701-
switch (msg->code) {
702-
caseMSG_PREPARE:
703-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
704-
if ((msg->disabledNodeMask& ~ds->disabledNodeMask)!=0) {
705-
/* Coordinator's disabled mask is wider than my: so reject such transaction to avoid
706-
commit on smaller subset of nodes */
632+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
707633
ts->status=TRANSACTION_STATUS_ABORTED;
708-
ts->cmd=MSG_ABORT;
709-
MtmAdjustSubtransactions(ts);
710-
MtmWakeUpBackend(ts);
711-
}else {
712-
ts->status=TRANSACTION_STATUS_UNKNOWN;
713-
ts->csn=MtmAssignCSN();
714-
ts->cmd=MSG_PREPARED;
715-
}
716-
MtmSendNotificationMessage(ts);
717-
break;
718-
caseMSG_COMMIT:
719-
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN);
720-
Assert(ts->csn<msg->csn);
721-
ts->csn=msg->csn;
722-
MtmSyncClock(ts->csn);
723-
ts->cmd=MSG_COMMITTED;
724-
MtmAdjustSubtransactions(ts);
725-
MtmSendNotificationMessage(ts);
726-
MtmWakeUpBackend(ts);
727-
break;
728-
caseMSG_ABORT:
729-
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
730-
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN||ts->status==TRANSACTION_STATUS_IN_PROGRESS);
731-
ts->status=TRANSACTION_STATUS_ABORTED;
732634
MtmAdjustSubtransactions(ts);
733635
MtmWakeUpBackend(ts);
734636
}
735637
break;
736638
default:
737639
Assert(false);
738640
}
739-
}
641+
}
740642
}
741643
MtmUnlock();
742644

‎multimaster.c

Lines changed: 33 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,6 @@ typedef struct {
6464
boolisReplicated;/* transaction on replica */
6565
boolisDistributed;/* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666
boolcontainsDML;/* transaction contains DML statements */
67-
boolisPrepared;/* transaction is prepared as part of 2PC */
6867
csn_tsnapshot;/* transaction snaphsot */
6968
}MtmCurrentTrans;
7069

@@ -100,8 +99,8 @@ static void MtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
10099
staticvoidMtmInitialize(void);
101100
staticvoidMtmXactCallback(XactEventevent,void*arg);
102101
staticvoidMtmBeginTransaction(MtmCurrentTrans*x);
103-
staticvoidMtmPrecommitTransaction(MtmCurrentTrans*x);
104102
staticboolMtmCommitTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids);
103+
staticvoidMtmPrePrepareTransaction(MtmCurrentTrans*x);
105104
staticvoidMtmPrepareTransaction(MtmCurrentTrans*x);
106105
staticvoidMtmCommitPreparedTransaction(MtmCurrentTrans*x);
107106
staticvoidMtmEndTransaction(MtmCurrentTrans*x,boolcommit);
@@ -347,15 +346,6 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
347346
returnPgXidInMVCCSnapshot(xid,snapshot);
348347
}
349348

350-
staticuint32MtmXidHashFunc(constvoid*key,Sizekeysize)
351-
{
352-
return (uint32)*(TransactionId*)key;
353-
}
354-
355-
staticintMtmXidMatchFunc(constvoid*key1,constvoid*key2,Sizekeysize)
356-
{
357-
return*(TransactionId*)key1-*(TransactionId*)key2;
358-
}
359349

360350
staticvoidMtmTransactionListAppend(MtmTransState*ts)
361351
{
@@ -489,8 +479,8 @@ MtmXactCallback(XactEvent event, void *arg)
489479
caseXACT_EVENT_START:
490480
MtmBeginTransaction(&dtmTx);
491481
break;
492-
caseXACT_EVENT_PRE_COMMIT:
493-
MtmPrecommitTransaction(&dtmTx);
482+
caseXACT_EVENT_PRE_PREPARE:
483+
MtmPrePrepareTransaction(&dtmTx);
494484
break;
495485
caseXACT_EVENT_PREPARE:
496486
MtmPrepareTransaction(&dtmTx);
@@ -598,16 +588,14 @@ MtmCheckClusterLock()
598588
}
599589

600590
/*
601-
* This functions is called as pre-commit callback.
602-
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
603-
* before commit
591+
* Prepare transaction for two-phase commit
604592
*/
605-
staticvoidMtmPrecommitTransaction(MtmCurrentTrans*x)
593+
MtmPrePrepareTransaction(MtmCurrentTrans*x)
606594
{
607595
MtmTransState*ts;
608596
inti;
609597

610-
if (!x->isDistributed||x->isPrepared) {
598+
if (!x->isDistributed) {
611599
return;
612600
}
613601

@@ -632,7 +620,6 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
632620
ts->snapshot=x->isReplicated|| !x->containsDML ?INVALID_CSN :x->snapshot;
633621
ts->csn=MtmAssignCSN();
634622
ts->gtid=x->gtid;
635-
ts->cmd=MSG_INVALID;
636623
ts->procno=MyProc->pgprocno;
637624
ts->nVotes=0;
638625
ts->voteCompleted= false;
@@ -644,24 +631,41 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
644631
ts->gtid.xid=x->xid;
645632
ts->gtid.node=MtmNodeId;
646633
}
647-
for (i=0;i<MtmNodes;i++) {
648-
ts->xids[i]=InvalidTransactionId;
649-
}
650634
MtmTransactionListAppend(ts);
651635

652636
MtmUnlock();
653637

654638
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n",MyProcPid,x->xid,ts->csn);
655639
}
656640

657-
staticvoid
658641
MtmPrepareTransaction(MtmCurrentTrans*x)
659-
{
660-
MtmPrecommitTransaction(x);
661-
MTM_TRACE("Prepare transaction %d",x->xid);
662-
x->isPrepared= true;
642+
{
643+
MtmLock(LW_EXCLUSIVE);
644+
if (ts->status=TRANSACTION_STATUS_IN_PROGRESS) {
645+
ts->status=TRANSACTION_STATUS_UNKNOWN;
646+
MtmAdjustSubtransactions(ts);
647+
}
648+
649+
if (!MtmIsCoordinator(ts)) {
650+
MtmSendNotificationMessage(ts);/* send notification to coordinator */
651+
MtmUnlock();
652+
}else {
653+
/* wait N commits or just one ABORT */
654+
ts->nVotes+=1;
655+
while (ts->nVotes!=dtm->nNodes&&ts->status==TRANSACTION_STATUS_PROGRESS) {
656+
MtmUnlock();
657+
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
658+
ResetLatch(&MyProc->procLatch);
659+
MtmLock(LW_SHARED);
660+
}
661+
MtmUnlock();
662+
if (ts->status==TRANSACTION_STATUS_ABORTED) {
663+
elog(ERROR,"Distributed transaction %d is rejected by DTM",x->xid);
664+
}
665+
}
663666
}
664667

668+
665669
staticvoid
666670
MtmCommitPreparedTransaction(MtmCurrentTrans*x)
667671
{
@@ -1679,14 +1683,12 @@ HTAB* MtmCreateHash(void)
16791683
Assert(MtmNodes>0);
16801684
memset(&info,0,sizeof(info));
16811685
info.keysize=sizeof(TransactionId);
1682-
info.entrysize=sizeof(MtmTransState)+ (MtmNodes-1)*sizeof(TransactionId);
1683-
info.hash=MtmXidHashFunc;
1684-
info.match=MtmXidMatchFunc;
1686+
info.entrysize=sizeof(MtmTransState);
16851687
htab=ShmemInitHash(
16861688
"xid2state",
16871689
MTM_HASH_SIZE,MTM_HASH_SIZE,
16881690
&info,
1689-
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
1691+
HASH_ELEM
16901692
);
16911693
returnhtab;
16921694
}

‎multimaster.h

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
2020
#defineMULTIMASTER_MIN_PROTO_VERSION 1
2121
#defineMULTIMASTER_MAX_PROTO_VERSION 1
22+
#defineMULTIMASTER_MAX_GID 32
2223

2324
#defineUSEC 1000000
2425

@@ -51,12 +52,7 @@ typedef enum
5152
{
5253
MSG_INVALID,
5354
MSG_HANDSHAKE,
54-
MSG_READY,
55-
MSG_PREPARE,
56-
MSG_COMMIT,
57-
MSG_ABORT,
5855
MSG_PREPARED,
59-
MSG_COMMITTED,
6056
MSG_ABORTED,
6157
MSG_STATUS
6258
}MtmMessageCode;
@@ -88,11 +84,10 @@ typedef struct MtmTransState
8884
finally should be nNodes-1 */
8985
intprocno;/* pgprocno of transaction coordinator waiting for responses from replicas,
9086
used to notify coordinator by arbiter */
91-
MtmMessageCodecmd;/*Notification message codetobe sent */
87+
boolvoteCompleted;/*Responses necessarytomake a decision are received by coordinator of transaction */
9288
intnSubxids;/* Number of subtransanctions */
9389
structMtmTransState*nextVoting;/* Next element in L1-list of voting transactions. */
9490
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */
95-
boolvoteCompleted;/* Responses necessary to make a decision are received by coordinator of transaction */
9691
TransactionIdxids[1];/* transaction ID at replicas: varying size MtmNodes */
9792
}MtmTransState;
9893

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp