Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0a3c6c0

Browse files
knizhnikkelvich
authored andcommitted
Add 3PC support
1 parentdb108a1 commit0a3c6c0

File tree

8 files changed

+157
-42
lines changed

8 files changed

+157
-42
lines changed

‎arbiter.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,13 @@ static bool MtmSendToNode(int node, void const* buf, int size)
536536
nodemask_tsave_mask=busy_mask;
537537
BIT_SET(busy_mask,node);
538538
while (true) {
539+
#if0
539540
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
540541
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
541542
close(sockets[node]);
542543
sockets[node]=-1;
543544
}
545+
#endif
544546
if (BIT_CHECK(Mtm->reconnectMask,node)) {
545547
MtmLock(LW_EXCLUSIVE);
546548
BIT_CLEAR(Mtm->reconnectMask,node);
@@ -872,7 +874,8 @@ static void MtmReceiver(Datum arg)
872874
}
873875

874876
rc=MtmReadFromNode(i, (char*)rxBuffer[i].data+rxBuffer[i].used,rxBuffer[i].size-rxBuffer[i].used);
875-
if (rc <=0) {
877+
if (rc <=0) {
878+
MTM_LOG1("Failed to read response from node %d",i+1);
876879
continue;
877880
}
878881

@@ -940,6 +943,8 @@ static void MtmReceiver(Datum arg)
940943
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
941944
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
942945
MtmFinishPreparedTransaction(ts, true);
946+
}else {
947+
MTM_LOG1("Receive response for transaction %s -> %d, participants=%llx, voted=%llx",msg->gid,msg->status, (long long)ts->participantsMask, (long long)ts->votedMask);
943948
}
944949
}else {
945950
elog(LOG,"Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
@@ -1009,7 +1014,8 @@ static void MtmReceiver(Datum arg)
10091014
}elseif (MtmUseDtm) {
10101015
ts->votedMask=0;
10111016
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
1012-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1017+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1018+
SetPrepareTransactionState(ts->gid,"precommitted");
10131019
}else {
10141020
ts->status=TRANSACTION_STATUS_UNKNOWN;
10151021
MtmWakeUpBackend(ts);
@@ -1056,6 +1062,7 @@ static void MtmReceiver(Datum arg)
10561062
}else {
10571063
switch (msg->code) {
10581064
caseMSG_PRECOMMIT:
1065+
Assert(false);// Now send through pglogical
10591066
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
10601067
ts->status=TRANSACTION_STATUS_UNKNOWN;
10611068
ts->csn=MtmAssignCSN();

‎bgwpool.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
boolMtmIsLogicalReceiver;
1818
intMtmMaxWorkers;
1919

20-
staticBgwPool*pool;
20+
staticBgwPool*MtmPool;
2121

2222
staticvoidBgwShutdownWorker(intsig)
2323
{
24-
BgwPoolStop(pool);
24+
if (MtmPool) {
25+
BgwPoolStop(MtmPool);
26+
}
2527
}
2628

2729
staticvoidBgwPoolMainLoop(BgwPool*pool)
@@ -32,6 +34,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3234
sigset_tsset;
3335

3436
MtmIsLogicalReceiver= true;
37+
MtmPool=pool;
3538

3639
signal(SIGINT,BgwShutdownWorker);
3740
signal(SIGQUIT,BgwShutdownWorker);
@@ -88,6 +91,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
8891

8992
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,charconst*dbuser,size_tqueueSize,size_tnWorkers)
9093
{
94+
MtmPool=pool;
9195
pool->queue= (char*)ShmemAlloc(queueSize);
9296
pool->executor=executor;
9397
PGSemaphoreCreate(&pool->available);

‎multimaster.c

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
416416

417417
MtmLock(LW_SHARED);
418418
if (Mtm->status==MTM_ONLINE) {
419-
MtmTransState*ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
419+
MtmTransState*ts=(MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
420420
if (ts!=NULL&& !ts->isLocal) {
421421
snapshot=ts->snapshot;
422422
Assert(ts->gtid.node==MtmNodeId||MtmIsRecoverySession);
@@ -806,7 +806,7 @@ static MtmTransState*
806806
MtmCreateTransState(MtmCurrentTrans*x)
807807
{
808808
boolfound;
809-
MtmTransState*ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
809+
MtmTransState*ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
810810
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
811811
ts->snapshot=x->snapshot;
812812
ts->isLocal= true;
@@ -859,6 +859,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
859859
/* Do not take in account bg-workers which are performing recovery */
860860
elog(ERROR,"Abort current transaction because this cluster node is in %s status",MtmNodeStatusMnem[Mtm->status]);
861861
}
862+
if (TransactionIdIsValid(x->gtid.xid)&&BIT_CHECK(Mtm->disabledNodeMask,x->gtid.node-1)) {
863+
/* Coordinator of transaction is disabled: just abort transaction without any further steps */
864+
elog(ERROR,"Abort transaction %d because it's coordinator %d was disabled",x->xid,x->gtid.node);
865+
}
862866

863867
MtmLock(LW_EXCLUSIVE);
864868

@@ -920,6 +924,32 @@ bool MtmWatchdog(timestamp_t now)
920924
returnallAlive;
921925
}
922926

927+
/*
928+
* Mark transaction as precommitted
929+
*/
930+
voidMtmPrecommitTransaction(charconst*gid)
931+
{
932+
MtmLock(LW_EXCLUSIVE);
933+
{
934+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_FIND,NULL);
935+
if (tm==NULL) {
936+
elog(WARNING,"MtmPrecommitTransaction: transaciton '%s' is not found",gid);
937+
}else {
938+
MtmTransState*ts=tm->state;
939+
Assert(ts!=NULL);
940+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
941+
ts->status=TRANSACTION_STATUS_UNKNOWN;
942+
ts->csn=MtmAssignCSN();
943+
MtmAdjustSubtransactions(ts);
944+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
945+
}
946+
}
947+
MtmUnlock();
948+
}
949+
950+
951+
952+
923953

924954
staticbool
925955
MtmVotingCompleted(MtmTransState*ts)
@@ -944,7 +974,8 @@ MtmVotingCompleted(MtmTransState* ts)
944974
return true;
945975
}elseif (MtmUseDtm) {
946976
ts->votedMask=0;
947-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
977+
SetPrepareTransactionState(ts->gid,"precommitted");
978+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
948979
return false;
949980
}else {
950981
ts->status=TRANSACTION_STATUS_UNKNOWN;
@@ -964,7 +995,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
964995
intnConfigChanges=Mtm->nConfigChanges;
965996
timestamp_tprepareTime=ts->csn-ts->snapshot;
966997
timestamp_ttimeout=Max(prepareTime+MSEC_TO_USEC(MtmMin2PCTimeout),prepareTime*MtmMax2PCRatio/100);
967-
timestamp_tdeadline=MtmGetSystemTime()+timeout;
998+
timestamp_tstart=MtmGetSystemTime();
999+
timestamp_tdeadline=start+timeout;
9681000
timestamp_tnow;
9691001

9701002
Assert(ts->csn>ts->snapshot);
@@ -989,7 +1021,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
9891021
if (now>deadline) {
9901022
if (ts->isPrepared) {
9911023
/* resend precommit message */
992-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1024+
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1025+
elog(LOG,"Distributes transaction is not committed in %ld msec",USEC_TO_MSEC(now-start));
9931026
}else {
9941027
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(timeout));
9951028
MtmAbortTransaction(ts);
@@ -1032,15 +1065,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10321065
elog(ERROR,"ERROR INJECTION for transaction %d (%s)",x->xid,x->gid);
10331066
}
10341067
MtmLock(LW_EXCLUSIVE);
1035-
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
1068+
ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
10361069
Assert(ts!=NULL);
10371070
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10381071
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
10391072
MTM_TXTRACE(x,"recovery?");
10401073
Assert(x->gid[0]);
10411074
ts->votingCompleted= true;
10421075
MTM_TXTRACE(x,"recovery? 1");
1043-
if (Mtm->status!=MTM_RECOVERY||Mtm->recoverySlot!=MtmReplicationNodeId) {
1076+
if (Mtm->status!=MTM_RECOVERY/* || Mtm->recoverySlot != MtmReplicationNodeId*/) {
10441077
MTM_TXTRACE(x,"recovery? 2");
10451078
MtmSend2PCMessage(ts,MSG_PREPARED);/* send notification to coordinator */
10461079
if (!MtmUseDtm) {
@@ -1097,7 +1130,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10971130
ts->votedMask=0;
10981131
ts->procno=MyProc->pgprocno;
10991132
MTM_TXTRACE(ts,"Coordinator sends MSG_PRECOMMIT");
1100-
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
1133+
SetPrepareTransactionState(ts->gid,"precommitted");
1134+
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
11011135

11021136
Mtm2PCVoting(x,ts);
11031137

@@ -1154,7 +1188,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541188
MtmTransState*ts=NULL;
11551189
MtmLock(LW_EXCLUSIVE);
11561190
if (x->isPrepared) {
1157-
ts=hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
1191+
ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_FIND,NULL);
11581192
Assert(ts!=NULL);
11591193
Assert(strcmp(x->gid,ts->gid)==0);
11601194
}elseif (x->gid[0]) {
@@ -1206,7 +1240,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12061240
if (ts==NULL) {
12071241
boolfound;
12081242
Assert(TransactionIdIsValid(x->xid));
1209-
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
1243+
ts=(MtmTransState*)hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
12101244
if (!found) {
12111245
ts->isEnqueued= false;
12121246
ts->isActive= false;
@@ -1316,6 +1350,53 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
13161350
}
13171351
}
13181352

1353+
/*
1354+
* Restore state of recovered prepared transaction in memory
1355+
*/
1356+
staticvoidMtmLoadPreparedTransactions(void)
1357+
{
1358+
PreparedTransactionpxacts;
1359+
intn=GetPreparedTransactions(&pxacts);
1360+
inti;
1361+
1362+
for (i=0;i<n;i++) {
1363+
boolfound;
1364+
charconst*gid=pxacts[i].gid;
1365+
MtmTransMap*tm= (MtmTransMap*)hash_search(MtmGid2State,gid,HASH_ENTER,&found);
1366+
if (!found) {
1367+
TransactionIdxid=GetNewTransactionId(false);
1368+
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_ENTER,&found);
1369+
MTM_LOG1("Recover prepared transaction %s xid %d",gid,xid);
1370+
MyPgXact->xid=InvalidTransactionId;/* dirty hack:((( */
1371+
Assert(!found);
1372+
Mtm->nActiveTransactions+=1;
1373+
ts->isEnqueued= false;
1374+
ts->isActive= true;
1375+
ts->status=strcmp(pxacts[i].state_3pc,"precommitted")==0 ?TRANSACTION_STATUS_UNKNOWN :TRANSACTION_STATUS_IN_PROGRESS;
1376+
ts->isLocal= true;
1377+
ts->isPrepared= false;
1378+
ts->isPinned= false;
1379+
ts->snapshot=INVALID_CSN;
1380+
ts->isTwoPhase= false;
1381+
ts->csn=0;/* should be replaced with real CSN by poll result */
1382+
ts->gtid.node=MtmNodeId;
1383+
ts->gtid.xid=xid;
1384+
ts->nSubxids=0;
1385+
ts->votingCompleted= true;
1386+
ts->participantsMask= (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask& ~((nodemask_t)1 << (MtmNodeId-1));
1387+
ts->votedMask=0;
1388+
strcpy(ts->gid,gid);
1389+
MtmTransactionListAppend(ts);
1390+
tm->status=ts->status;
1391+
tm->state=ts;
1392+
MtmBroadcastPollMessage(ts);
1393+
}
1394+
}
1395+
MTM_LOG1("Recover %d prepared transactions",n);
1396+
if (pxacts) {
1397+
pfree(pxacts);
1398+
}
1399+
}
13191400

13201401
staticvoidMtmStartRecovery()
13211402
{
@@ -2051,6 +2132,7 @@ static void MtmCheckControlFile(void)
20512132
}
20522133
}
20532134

2135+
20542136
staticvoidMtmInitialize()
20552137
{
20562138
boolfound;
@@ -2087,6 +2169,7 @@ static void MtmInitialize()
20872169
Mtm->nConfigChanges=0;
20882170
Mtm->recoveryCount=0;
20892171
Mtm->localTablesHashLoaded= false;
2172+
Mtm->preparedTransactionsLoaded= false;
20902173
Mtm->inject2PCError=0;
20912174
Mtm->sendQueue=NULL;
20922175
Mtm->freeQueue=NULL;
@@ -2851,6 +2934,13 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28512934
MtmReplicationModemode=REPLMODE_OPEN_EXISTED;
28522935

28532936
MtmLock(LW_EXCLUSIVE);
2937+
2938+
if (!Mtm->preparedTransactionsLoaded)
2939+
{
2940+
MtmLoadPreparedTransactions();
2941+
Mtm->preparedTransactionsLoaded= true;
2942+
}
2943+
28542944
while ((Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)||BIT_CHECK(Mtm->disabledNodeMask,nodeId-1))
28552945
{
28562946
if (*shutdown)
@@ -3330,7 +3420,7 @@ mtm_get_csn(PG_FUNCTION_ARGS)
33303420
csn_tcsn=INVALID_CSN;
33313421

33323422
MtmLock(LW_SHARED);
3333-
ts=hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
3423+
ts=(MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
33343424
if (ts!=NULL) {
33353425
csn=ts->csn;
33363426
}
@@ -3859,14 +3949,12 @@ static inline void MtmGucUpdate(const char *key, char *value)
38593949
MtmGucEntry*hentry;
38603950
boolfound;
38613951

3862-
hentry=hash_search(MtmGucHash,key,HASH_FIND,&found);
3952+
hentry=(MtmGucEntry*)hash_search(MtmGucHash,key,HASH_ENTER,&found);
38633953
if (found)
38643954
{
38653955
pfree(hentry->value);
38663956
dlist_delete(&hentry->list_node);
38673957
}
3868-
3869-
hentry=hash_search(MtmGucHash,key,HASH_ENTER,NULL);
38703958
hentry->value=value;
38713959
dlist_push_tail(&MtmGucList,&hentry->list_node);
38723960
}
@@ -3876,7 +3964,7 @@ static inline void MtmGucRemove(const char *key)
38763964
MtmGucEntry*hentry;
38773965
boolfound;
38783966

3879-
hentry=hash_search(MtmGucHash,key,HASH_FIND,&found);
3967+
hentry=(MtmGucEntry*)hash_search(MtmGucHash,key,HASH_FIND,&found);
38803968
if (found)
38813969
{
38823970
pfree(hentry->value);
@@ -4465,7 +4553,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
44654553
}
44664554

44674555
staticbool
4468-
MtmDetectGlobalDeadLockFortXid(TransactionIdxid)
4556+
MtmDetectGlobalDeadLockForXid(TransactionIdxid)
44694557
{
44704558
boolhasDeadlock= false;
44714559
if (TransactionIdIsValid(xid)) {
@@ -4530,11 +4618,11 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
45304618

45314619
MTM_LOG1("Detect global deadlock for %d by backend %d",pgxact->xid,MyProcPid);
45324620

4533-
returnMtmDetectGlobalDeadLockFortXid(pgxact->xid);
4621+
returnMtmDetectGlobalDeadLockForXid(pgxact->xid);
45344622
}
45354623

45364624
Datummtm_check_deadlock(PG_FUNCTION_ARGS)
45374625
{
45384626
TransactionIdxid=PG_GETARG_INT32(0);
4539-
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4627+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockForXid(xid));
45404628
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp