Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit59e22f8

Browse files
committed
Add 3PC support
1 parentadd9eaf commit59e22f8

File tree

17 files changed

+346
-95
lines changed

17 files changed

+346
-95
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
536536
nodemask_tsave_mask=busy_mask;
537537
BIT_SET(busy_mask,node);
538538
while (true) {
539+
#if0
539540
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
540541
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
541542
close(sockets[node]);
542543
sockets[node]=-1;
543544
}
545+
#endif
544546
if (BIT_CHECK(Mtm->reconnectMask,node)) {
545547
MtmLock(LW_EXCLUSIVE);
546548
BIT_CLEAR(Mtm->reconnectMask,node);
@@ -872,7 +874,8 @@ static void MtmReceiver(Datum arg)
872874
}
873875

874876
rc=MtmReadFromNode(i, (char*)rxBuffer[i].data+rxBuffer[i].used,rxBuffer[i].size-rxBuffer[i].used);
875-
if (rc <=0) {
877+
if (rc <=0) {
878+
MTM_LOG1("Failed to read response from node %d",i+1);
876879
continue;
877880
}
878881

@@ -940,6 +943,8 @@ static void MtmReceiver(Datum arg)
940943
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
941944
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
942945
MtmFinishPreparedTransaction(ts, true);
946+
}else {
947+
MTM_LOG1("Receive response for transaction %s -> %d, participants=%llx, voted=%llx",msg->gid,msg->status, (long long)ts->participantsMask, (long long)ts->votedMask);
943948
}
944949
}else {
945950
elog(LOG,"Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
@@ -1009,7 +1014,8 @@ static void MtmReceiver(Datum arg)
10091014
}elseif (MtmUseDtm) {
10101015
ts->votedMask=0;
10111016
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
1012-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1017+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1018+
SetPrepareTransactionState(ts->gid,"precommitted");
10131019
}else {
10141020
ts->status=TRANSACTION_STATUS_UNKNOWN;
10151021
MtmWakeUpBackend(ts);
@@ -1056,6 +1062,7 @@ static void MtmReceiver(Datum arg)
10561062
}else {
10571063
switch (msg->code) {
10581064
caseMSG_PRECOMMIT:
1065+
Assert(false);// Now send through pglogical
10591066
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
10601067
ts->status=TRANSACTION_STATUS_UNKNOWN;
10611068
ts->csn=MtmAssignCSN();

‎contrib/mmts/bgwpool.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
boolMtmIsLogicalReceiver;
1818
intMtmMaxWorkers;
1919

20-
staticBgwPool*pool;
20+
staticBgwPool*MtmPool;
2121

2222
staticvoidBgwShutdownWorker(intsig)
2323
{
24-
BgwPoolStop(pool);
24+
if (MtmPool) {
25+
BgwPoolStop(MtmPool);
26+
}
2527
}
2628

2729
staticvoidBgwPoolMainLoop(BgwPool*pool)
@@ -32,6 +34,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3234
sigset_tsset;
3335

3436
MtmIsLogicalReceiver= true;
37+
MtmPool=pool;
3538

3639
signal(SIGINT,BgwShutdownWorker);
3740
signal(SIGQUIT,BgwShutdownWorker);
@@ -88,6 +91,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
8891

8992
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,charconst*dbuser,size_tqueueSize,size_tnWorkers)
9093
{
94+
MtmPool=pool;
9195
pool->queue= (char*)ShmemAlloc(queueSize);
9296
pool->executor=executor;
9397
PGSemaphoreCreate(&pool->available);

‎contrib/mmts/multimaster.c

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
421421

422422
MtmLock(LW_SHARED);
423423
if (Mtm->status==MTM_ONLINE) {
424-
MtmTransState*ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
424+
MtmTransState*ts=(MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
425425
if (ts!=NULL&& !ts->isLocal) {
426426
snapshot=ts->snapshot;
427427
Assert(ts->gtid.node==MtmNodeId||MtmIsRecoverySession);
@@ -811,7 +811,7 @@ static MtmTransState*
811811
MtmCreateTransState(MtmCurrentTrans*x)
812812
{
813813
boolfound;
814-
MtmTransState*ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
814+
MtmTransState*ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
815815
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
816816
ts->snapshot=x->snapshot;
817817
ts->isLocal= true;
@@ -864,6 +864,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
864864
/* Do not take in account bg-workers which are performing recovery */
865865
elog(ERROR,"Abort current transaction because this cluster node is in %s status",MtmNodeStatusMnem[Mtm->status]);
866866
}
867+
if (TransactionIdIsValid(x->gtid.xid)&&BIT_CHECK(Mtm->disabledNodeMask,x->gtid.node-1)) {
868+
/* Coordinator of transaction is disabled: just abort transaction without any further steps */
869+
elog(ERROR,"Abort transaction %d because it's coordinator %d was disabled",x->xid,x->gtid.node);
870+
}
867871

868872
MtmLock(LW_EXCLUSIVE);
869873

@@ -925,6 +929,32 @@ bool MtmWatchdog(timestamp_t now)
925929
returnallAlive;
926930
}
927931

932+
/*
933+
* Mark transaction as precommitted
934+
*/
935+
voidMtmPrecommitTransaction(charconst*gid)
936+
{
937+
MtmLock(LW_EXCLUSIVE);
938+
{
939+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_FIND,NULL);
940+
if (tm==NULL) {
941+
elog(WARNING,"MtmPrecommitTransaction: transaciton '%s' is not found",gid);
942+
}else {
943+
MtmTransState*ts=tm->state;
944+
Assert(ts!=NULL);
945+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
946+
ts->status=TRANSACTION_STATUS_UNKNOWN;
947+
ts->csn=MtmAssignCSN();
948+
MtmAdjustSubtransactions(ts);
949+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
950+
}
951+
}
952+
MtmUnlock();
953+
}
954+
955+
956+
957+
928958

929959
staticbool
930960
MtmVotingCompleted(MtmTransState*ts)
@@ -949,7 +979,8 @@ MtmVotingCompleted(MtmTransState* ts)
949979
return true;
950980
}elseif (MtmUseDtm) {
951981
ts->votedMask=0;
952-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
982+
SetPrepareTransactionState(ts->gid,"precommitted");
983+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
953984
return false;
954985
}else {
955986
ts->status=TRANSACTION_STATUS_UNKNOWN;
@@ -969,7 +1000,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
9691000
intnConfigChanges=Mtm->nConfigChanges;
9701001
timestamp_tprepareTime=ts->csn-ts->snapshot;
9711002
timestamp_ttimeout=Max(prepareTime+MSEC_TO_USEC(MtmMin2PCTimeout),prepareTime*MtmMax2PCRatio/100);
972-
timestamp_tdeadline=MtmGetSystemTime()+timeout;
1003+
timestamp_tstart=MtmGetSystemTime();
1004+
timestamp_tdeadline=start+timeout;
9731005
timestamp_tnow;
9741006

9751007
Assert(ts->csn>ts->snapshot);
@@ -994,7 +1026,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
9941026
if (now>deadline) {
9951027
if (ts->isPrepared) {
9961028
/* resend precommit message */
997-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1029+
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1030+
elog(LOG,"Distributes transaction is not committed in %ld msec",USEC_TO_MSEC(now-start));
9981031
}else {
9991032
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(timeout));
10001033
MtmAbortTransaction(ts);
@@ -1037,15 +1070,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10371070
elog(ERROR,"ERROR INJECTION for transaction %d (%s)",x->xid,x->gid);
10381071
}
10391072
MtmLock(LW_EXCLUSIVE);
1040-
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
1073+
ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
10411074
Assert(ts!=NULL);
10421075
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10431076
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
10441077
MTM_TXTRACE(x,"recovery?");
10451078
Assert(x->gid[0]);
10461079
ts->votingCompleted= true;
10471080
MTM_TXTRACE(x,"recovery? 1");
1048-
if (Mtm->status!=MTM_RECOVERY||Mtm->recoverySlot!=MtmReplicationNodeId) {
1081+
if (Mtm->status!=MTM_RECOVERY/* || Mtm->recoverySlot != MtmReplicationNodeId*/) {
10491082
MTM_TXTRACE(x,"recovery? 2");
10501083
MtmSend2PCMessage(ts,MSG_PREPARED);/* send notification to coordinator */
10511084
if (!MtmUseDtm) {
@@ -1102,7 +1135,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11021135
ts->votedMask=0;
11031136
ts->procno=MyProc->pgprocno;
11041137
MTM_TXTRACE(ts,"Coordinator sends MSG_PRECOMMIT");
1105-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1138+
SetPrepareTransactionState(ts->gid,"precommitted");
1139+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
11061140

11071141
Mtm2PCVoting(x,ts);
11081142

@@ -1159,7 +1193,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11591193
MtmTransState*ts=NULL;
11601194
MtmLock(LW_EXCLUSIVE);
11611195
if (x->isPrepared) {
1162-
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
1196+
ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
11631197
Assert(ts!=NULL);
11641198
Assert(strcmp(x->gid,ts->gid)==0);
11651199
}elseif (x->gid[0]) {
@@ -1211,7 +1245,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12111245
if (ts==NULL) {
12121246
boolfound;
12131247
Assert(TransactionIdIsValid(x->xid));
1214-
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
1248+
ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
12151249
if (!found) {
12161250
ts->isEnqueued= false;
12171251
ts->isActive= false;
@@ -1321,6 +1355,53 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
13211355
}
13221356
}
13231357

1358+
/*
1359+
* Restore state of recovered prepared transaction in memory
1360+
*/
1361+
staticvoidMtmLoadPreparedTransactions(void)
1362+
{
1363+
PreparedTransactionpxacts;
1364+
intn=GetPreparedTransactions(&pxacts);
1365+
inti;
1366+
1367+
for (i=0;i<n;i++) {
1368+
boolfound;
1369+
charconst*gid=pxacts[i].gid;
1370+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_ENTER,&found);
1371+
if (!found) {
1372+
TransactionIdxid=GetNewTransactionId(false);
1373+
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_ENTER,&found);
1374+
MTM_LOG1("Recover prepared transaction %s xid %d",gid,xid);
1375+
MyPgXact->xid=InvalidTransactionId;/* dirty hack:((( */
1376+
Assert(!found);
1377+
Mtm->nActiveTransactions+=1;
1378+
ts->isEnqueued= false;
1379+
ts->isActive= true;
1380+
ts->status=strcmp(pxacts[i].state_3pc,"precommitted")==0 ?TRANSACTION_STATUS_UNKNOWN :TRANSACTION_STATUS_IN_PROGRESS;
1381+
ts->isLocal= true;
1382+
ts->isPrepared= false;
1383+
ts->isPinned= false;
1384+
ts->snapshot=INVALID_CSN;
1385+
ts->isTwoPhase= false;
1386+
ts->csn=0;/* should be replaced with real CSN by poll result */
1387+
ts->gtid.node=MtmNodeId;
1388+
ts->gtid.xid=xid;
1389+
ts->nSubxids=0;
1390+
ts->votingCompleted= true;
1391+
ts->participantsMask= (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask& ~((nodemask_t)1 << (MtmNodeId-1));
1392+
ts->votedMask=0;
1393+
strcpy(ts->gid,gid);
1394+
MtmTransactionListAppend(ts);
1395+
tm->status=ts->status;
1396+
tm->state=ts;
1397+
MtmBroadcastPollMessage(ts);
1398+
}
1399+
}
1400+
MTM_LOG1("Recover %d prepared transactions",n);
1401+
if (pxacts) {
1402+
pfree(pxacts);
1403+
}
1404+
}
13241405

13251406
staticvoidMtmStartRecovery()
13261407
{
@@ -2084,6 +2165,7 @@ static void MtmCheckControlFile(void)
20842165
}
20852166
}
20862167

2168+
20872169
staticvoidMtmInitialize()
20882170
{
20892171
boolfound;
@@ -2120,6 +2202,7 @@ static void MtmInitialize()
21202202
Mtm->nConfigChanges=0;
21212203
Mtm->recoveryCount=0;
21222204
Mtm->localTablesHashLoaded= false;
2205+
Mtm->preparedTransactionsLoaded= false;
21232206
Mtm->inject2PCError=0;
21242207
Mtm->sendQueue=NULL;
21252208
Mtm->freeQueue=NULL;
@@ -2923,6 +3006,13 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29233006
MtmReplicationModemode=REPLMODE_OPEN_EXISTED;
29243007

29253008
MtmLock(LW_EXCLUSIVE);
3009+
3010+
if (!Mtm->preparedTransactionsLoaded)
3011+
{
3012+
MtmLoadPreparedTransactions();
3013+
Mtm->preparedTransactionsLoaded= true;
3014+
}
3015+
29263016
while ((Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)||BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
29273017
{
29283018
if (*shutdown)
@@ -3402,7 +3492,7 @@ mtm_get_csn(PG_FUNCTION_ARGS)
34023492
csn_tcsn=INVALID_CSN;
34033493

34043494
MtmLock(LW_SHARED);
3405-
ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
3495+
ts=(MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
34063496
if (ts!=NULL) {
34073497
csn=ts->csn;
34083498
}
@@ -3926,14 +4016,12 @@ static inline void MtmGucUpdate(const char *key, char *value)
39264016
MtmGucEntry*hentry;
39274017
boolfound;
39284018

3929-
hentry=hash_search(MtmGucHash,key,HASH_FIND,&found);
4019+
hentry=(MtmGucEntry*)hash_search(MtmGucHash,key,HASH_ENTER,&found);
39304020
if (found)
39314021
{
39324022
pfree(hentry->value);
39334023
dlist_delete(&hentry->list_node);
39344024
}
3935-
3936-
hentry=hash_search(MtmGucHash,key,HASH_ENTER,NULL);
39374025
hentry->value=value;
39384026
dlist_push_tail(&MtmGucList,&hentry->list_node);
39394027
}
@@ -3943,7 +4031,7 @@ static inline void MtmGucRemove(const char *key)
39434031
MtmGucEntry*hentry;
39444032
boolfound;
39454033

3946-
hentry=hash_search(MtmGucHash,key,HASH_FIND,&found);
4034+
hentry=(MtmGucEntry*)hash_search(MtmGucHash,key,HASH_FIND,&found);
39474035
if (found)
39484036
{
39494037
pfree(hentry->value);
@@ -4531,7 +4619,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
45314619
}
45324620

45334621
staticbool
4534-
MtmDetectGlobalDeadLockFortXid(TransactionIdxid)
4622+
MtmDetectGlobalDeadLockForXid(TransactionIdxid)
45354623
{
45364624
boolhasDeadlock= false;
45374625
if (TransactionIdIsValid(xid)) {
@@ -4587,11 +4675,11 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
45874675

45884676
MTM_LOG1("Detect global deadlock for %d by backend %d",pgxact->xid,MyProcPid);
45894677

4590-
returnMtmDetectGlobalDeadLockFortXid(pgxact->xid);
4678+
returnMtmDetectGlobalDeadLockForXid(pgxact->xid);
45914679
}
45924680

45934681
Datummtm_check_deadlock(PG_FUNCTION_ARGS)
45944682
{
45954683
TransactionIdxid=PG_GETARG_INT32(0);
4596-
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4684+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockForXid(xid));
45974685
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp