Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit07321e6

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parentsbf09fde +a649b7d commit07321e6

File tree

15 files changed

+226
-105
lines changed

15 files changed

+226
-105
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -359,11 +359,17 @@ static void MtmSendHeartbeat()
359359
for (i=0;i<Mtm->nAllNodes;i++)
360360
{
361361
if (i+1!=MtmNodeId) {
362-
if (!BIT_CHECK(busy_mask,i)
363-
&& (Mtm->status!=MTM_ONLINE
364-
||sockets[i] >=0
365-
|| !BIT_CHECK(Mtm->disabledNodeMask,i)
366-
||BIT_CHECK(Mtm->reconnectMask,i)))
362+
if (!BIT_CHECK(busy_mask,i))
363+
/*
364+
* Old behaviour here can cause subtle bugs, for example
365+
* it can happened that none of mentioned conditiotions is
366+
* true when disabled node connects to a major node which
367+
* is online. So just send it allways. --sk
368+
*/
369+
// && (Mtm->status != MTM_ONLINE
370+
// || sockets[i] >= 0
371+
// || !BIT_CHECK(Mtm->disabledNodeMask, i)
372+
// || BIT_CHECK(Mtm->reconnectMask, i)))
367373
{
368374
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
369375
MTM_ELOG(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
@@ -981,6 +987,7 @@ static void MtmReceiver(Datum arg)
981987
msg->gid,MtmTxnStatusMnem[msg->status],node);
982988

983989
replorigin_session_origin=DoNotReplicateId;
990+
TXFINISH("%s ABORT, MSG_POLL_STATUS",msg->gid);
984991
MtmFinishPreparedTransaction(ts, false);
985992
replorigin_session_origin=InvalidRepOriginId;
986993
}
@@ -994,6 +1001,7 @@ static void MtmReceiver(Datum arg)
9941001
MTM_ELOG(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
9951002

9961003
replorigin_session_origin=DoNotReplicateId;
1004+
TXFINISH("%s COMMIT, MSG_POLL_STATUS",msg->gid);
9971005
MtmFinishPreparedTransaction(ts, true);
9981006
replorigin_session_origin=InvalidRepOriginId;
9991007
}else {
@@ -1069,17 +1077,10 @@ static void MtmReceiver(Datum arg)
10691077
if (ts->isTwoPhase) {
10701078
MtmWakeUpBackend(ts);
10711079
}elseif (MtmUseDtm) {
1072-
ts->votedMask=0;
10731080
MTM_TXTRACE(ts,"MtmTransReceiver send MSG_PRECOMMIT");
10741081
Assert(replorigin_session_origin==InvalidRepOriginId);
1075-
MTM_LOG2("SetPreparedTransactionState for %s",ts->gid);
1076-
MtmUnlock();
1077-
MtmResetTransaction();
1078-
StartTransactionCommand();
1079-
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
1080-
CommitTransactionCommand();
1081-
Assert(!MtmTransIsActive());
1082-
MtmLock(LW_EXCLUSIVE);
1082+
ts->isPrepared= false;
1083+
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
10831084
}else {
10841085
ts->status=TRANSACTION_STATUS_UNKNOWN;
10851086
MtmWakeUpBackend(ts);

‎contrib/mmts/multimaster.c

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,7 @@ MtmVotingCompleted(MtmTransState* ts)
12551255
ts->status=TRANSACTION_STATUS_UNKNOWN;
12561256
return true;
12571257
}else {
1258-
MTM_LOG1("Transaction %s is considered as prepared (status=%s participants=%llx disabled=%llx, voted=%llx)",
1258+
MTM_LOG2("Transaction %s is considered as prepared (status=%s participants=%llx disabled=%llx, voted=%llx)",
12591259
ts->gid,MtmTxnStatusMnem[ts->status],ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
12601260
ts->isPrepared= true;
12611261
if (ts->isTwoPhase) {
@@ -1530,7 +1530,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15301530
(commit ?"commit" :"rollback"),ts->gid,ts->xid,ts->gtid.node,ts->gtid.xid,MtmTxnStatusMnem[ts->status]);
15311531
if (commit) {
15321532
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
1533-
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
1533+
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status<=MTM_RECOVERY)))
15341534
{
15351535
MtmUnlock();
15361536
MTM_ELOG(ERROR,"Attempt to commit %s transaction %s (%llu)",
@@ -1690,7 +1690,7 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
16901690

16911691
for (i=0;i<Mtm->nAllNodes;i++)
16921692
{
1693-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i))
1693+
if (BIT_CHECK(ts->participantsMask,i))
16941694
{
16951695
msg.node=i+1;
16961696
MTM_LOG3("Send request for transaction %s to node %d",msg.gid,msg.node);
@@ -1734,7 +1734,7 @@ static voidMtmLoadPreparedTransactions(void)
17341734
ts->gtid.xid=xid;
17351735
ts->nSubxids=0;
17361736
ts->votingCompleted= true;
1737-
ts->participantsMask= (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask& ~((nodemask_t)1 << (MtmNodeId-1));
1737+
ts->participantsMask= (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~((nodemask_t)1 << (MtmNodeId-1));
17381738
ts->nConfigChanges=Mtm->nConfigChanges;
17391739
ts->votedMask=0;
17401740
strcpy(ts->gid,gid);
@@ -1973,10 +1973,12 @@ void MtmPollStatusOfPreparedTransactionsForDisabledNode(int disabledNodeId, bool
19731973
Assert(ts->gid[0]);
19741974
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
19751975
MTM_ELOG(LOG,"Abort transaction %s because its coordinator is disabled and it is not prepared at node %d",ts->gid,MtmNodeId);
1976+
TXFINISH("%s ABORT, PollStatusOfPrepared",ts->gid);
19761977
MtmFinishPreparedTransaction(ts, false);
19771978
}else {
19781979
if (commitPrecommited)
19791980
{
1981+
TXFINISH("%s COMMIT, PollStatusOfPrepared",ts->gid);
19801982
MtmFinishPreparedTransaction(ts, true);
19811983
}
19821984
else
@@ -2033,10 +2035,10 @@ MtmCheckSlots()
20332035
if (slot->in_use
20342036
&&sscanf(slot->data.name.data,MULTIMASTER_SLOT_PATTERN,&nodeId)==1
20352037
&&BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)
2036-
&&slot->data.confirmed_flush+MtmMaxRecoveryLag<GetXLogInsertRecPtr()
2038+
&&slot->data.confirmed_flush+MtmMaxRecoveryLag*1024<GetXLogInsertRecPtr()
20372039
&&slot->data.confirmed_flush!=0)
20382040
{
2039-
MTM_ELOG(WARNING,"Drop slot for node %d which lag %lld is larger than threshold %d",
2041+
MTM_ELOG(WARNING,"Drop slot for node %d which lag %lldBis larger than threshold %d kB",
20402042
nodeId,
20412043
(long64)(GetXLogInsertRecPtr()-slot->data.restart_lsn),
20422044
MtmMaxRecoveryLag);
@@ -2102,7 +2104,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
21022104
if (MtmIsRecoveredNode(nodeId)) {
21032105
lsn_twalLSN=GetXLogInsertRecPtr();
21042106
if (!BIT_CHECK(Mtm->originLockNodeMask,nodeId-1)
2105-
&&slotLSN+MtmMinRecoveryLag>walLSN)
2107+
&&slotLSN+MtmMinRecoveryLag*1024>walLSN)
21062108
{
21072109
/*
21082110
* Wal sender almost caught up.
@@ -2860,14 +2862,14 @@ _PG_init(void)
28602862
);
28612863
DefineCustomIntVariable(
28622864
"multimaster.trans_spill_threshold",
2863-
"Maximal size(Mb)of transaction after which transaction is written to the disk",
2865+
"Maximal size of transaction after which transaction is written to the disk",
28642866
NULL,
28652867
&MtmTransSpillThreshold,
2866-
100,/* 100Mb */
2867-
0,
2868-
MaxAllocSize/MB,
2869-
PGC_BACKEND,
2868+
100*1024,/* 100Mb */
28702869
0,
2870+
MaxAllocSize/GUC_UNIT_KB,
2871+
PGC_SIGHUP,
2872+
GUC_UNIT_KB,
28712873
NULL,
28722874
NULL,
28732875
NULL
@@ -2894,11 +2896,11 @@ _PG_init(void)
28942896
"When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile competition' and "
28952897
"temporary stop commit of new transactions until node will be completely repared",
28962898
&MtmMinRecoveryLag,
2897-
100000,
2898-
1,
2899-
INT_MAX,
2900-
PGC_BACKEND,
2899+
10*1024,/* 10 MB */
29012900
0,
2901+
INT_MAX,
2902+
PGC_SIGHUP,
2903+
GUC_UNIT_KB,
29022904
NULL,
29032905
NULL,
29042906
NULL
@@ -2910,11 +2912,11 @@ _PG_init(void)
29102912
"Dropping slot makes it not possible to recover node using logical replication mechanism, it will be ncessary to completely copy content of some other nodes "
29112913
"using basebackup or similar tool. Zero value of parameter disable dropping slot.",
29122914
&MtmMaxRecoveryLag,
2913-
100000000,
2915+
1*1024*1024,/* 1 GB */
29142916
0,
29152917
INT_MAX,
2916-
PGC_BACKEND,
2917-
0,
2918+
PGC_SIGHUP,
2919+
GUC_UNIT_KB,
29182920
NULL,
29192921
NULL,
29202922
NULL
@@ -3309,6 +3311,7 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
33093311
StartTransactionCommand();
33103312
MtmBeginSession(nodeId);
33113313
MtmSetCurrentTransactionGID(gid);
3314+
TXFINISH("%s ABORT, MtmRollbackPrepared",gid);
33123315
FinishPreparedTransaction(gid, false);
33133316
MtmTx.isActive= true;
33143317
CommitTransactionCommand();
@@ -3814,7 +3817,7 @@ void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
38143817
*/
38153818
voidMtmBeginSession(intnodeId)
38163819
{
3817-
MtmLockNode(nodeId,LW_EXCLUSIVE);
3820+
//MtmLockNode(nodeId, LW_EXCLUSIVE);
38183821
Assert(replorigin_session_origin==InvalidRepOriginId);
38193822
replorigin_session_origin=Mtm->nodes[nodeId-1].originId;
38203823
Assert(replorigin_session_origin!=InvalidRepOriginId);
@@ -3834,9 +3837,9 @@ void MtmEndSession(int nodeId, bool unlock)
38343837
replorigin_session_origin_lsn=INVALID_LSN;
38353838
replorigin_session_origin_timestamp=0;
38363839
replorigin_session_reset();
3837-
if (unlock) {
3838-
MtmUnlockNode(nodeId);
3839-
}
3840+
//if (unlock) {
3841+
//MtmUnlockNode(nodeId);
3842+
//}
38403843
MTM_LOG3("%d: End reset replorigin session: %d",MyProcPid,replorigin_session_origin);
38413844
}
38423845
}
@@ -4592,9 +4595,11 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
45924595
ts= (MtmTransState*)hash_search(MtmXid2State,&(x->xid),HASH_FIND,NULL);
45934596
Assert(ts);
45944597

4598+
TXFINISH("%s ABORT, MtmTwoPhase",x->gid);
45954599
FinishPreparedTransaction(x->gid, false);
45964600
MTM_ELOG(ERROR,"Transaction %s (%llu) is aborted on node %d. Check its log to see error details.",x->gid, (long64)x->xid,ts->abortedByNode);
45974601
}else {
4602+
TXFINISH("%s COMMIT, MtmTwoPhase",x->gid);
45984603
FinishPreparedTransaction(x->gid, true);
45994604
MTM_TXTRACE(x,"MtmTwoPhaseCommit Committed");
46004605
MTM_LOG2("Distributed transaction %s (%lld) is committed at %lld with LSN=%lld",x->gid, (long64)x->xid,MtmGetCurrentTime(), (long64)GetXLogInsertRecPtr());

‎contrib/mmts/multimaster.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@
4141
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4242
#endif
4343

44+
// #define MTM_TXFINISH 1
45+
46+
#ifndefMTM_TXFINISH
47+
#defineTXFINISH(fmt, ...)
48+
#else
49+
#defineTXFINISH(fmt, ...) elog(LOG, MTM_TAG "[TXFINISH] " fmt, ## __VA_ARGS__)
50+
#endif
51+
4452
// #define MTM_TRACE 1
4553

4654
#ifndefMTM_TRACE

‎contrib/mmts/pglogical_apply.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,7 @@ process_remote_commit(StringInfo in)
726726
MTM_LOG1("Perform delayed rollback of prepared global transaction %s",gid);
727727
StartTransactionCommand();
728728
MtmSetCurrentTransactionGID(gid);
729+
TXFINISH("%s ABORT, PGLOGICAL_PREPARE",gid);
729730
FinishPreparedTransaction(gid, false);
730731
CommitTransactionCommand();
731732
Assert(!MtmTransIsActive());
@@ -739,7 +740,7 @@ process_remote_commit(StringInfo in)
739740
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
740741
csn=pq_getmsgint64(in);
741742
/*
742-
* Since our recovery method allows undershoot ofcsn, we can receive
743+
* Since our recovery method allows undershoot oflsn, we can receive
743744
* some already committed transactions. And in case of donor node reboot
744745
* xid<->csn mapping for them will be lost. However we must filter such
745746
* transactions in walreceiver before this code. --sk
@@ -750,8 +751,12 @@ process_remote_commit(StringInfo in)
750751
MtmResetTransaction();
751752
StartTransactionCommand();
752753
MtmBeginSession(origin_node);
753-
MtmSetCurrentTransactionCSN(csn);
754+
if (csn==INVALID_CSN&&Mtm->status==MTM_RECOVERY)
755+
MtmSetCurrentTransactionCSN(MtmAssignCSN());
756+
else
757+
MtmSetCurrentTransactionCSN(csn);
754758
MtmSetCurrentTransactionGID(gid);
759+
TXFINISH("%s COMMIT, PGLOGICAL_COMMIT_PREPARED csn=%lld",gid,csn);
755760
FinishPreparedTransaction(gid, true);
756761
MTM_LOG2("Distributed transaction %s is committed",gid);
757762
CommitTransactionCommand();

‎contrib/mmts/pglogical_receiver.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -366,20 +366,21 @@ pglogical_receiver_main(Datum main_arg)
366366
res=PQexec(conn,query->data);
367367
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
368368
{
369-
inti,n_deleted_slots=0;
369+
//int i, n_deleted_slots = 0;
370370

371-
elog(WARNING,"Can't find slot on node%d. Shutting down receiver.",nodeId);
372-
Mtm->nodes[nodeId-1].slotDeleted= true;
373-
for (i=0;i<Mtm->nAllNodes;i++)
374-
{
375-
if (Mtm->nodes[i].slotDeleted)
376-
n_deleted_slots++;
377-
}
378-
if (n_deleted_slots==Mtm->nAllNodes-1)
379-
{
380-
elog(FATAL,"All neighbour nopes have no replication slot for us. Exiting.");
381-
}
382-
proc_exit(1);
371+
elog(WARNING,"Can't find slot on node%d. Shutting down receiver. %s",nodeId,PQresultErrorMessage(res));
372+
gotoOnError;
373+
// Mtm->nodes[nodeId-1].slotDeleted = true;
374+
// for (i = 0; i < Mtm->nAllNodes; i++)
375+
// {
376+
// if (Mtm->nodes[i].slotDeleted)
377+
// n_deleted_slots++;
378+
// }
379+
// if (n_deleted_slots == Mtm->nAllNodes - 1)
380+
// {
381+
// elog(FATAL, "All neighbour nopes have no replication slot for us. Exiting.");
382+
// }
383+
// proc_exit(1);
383384
}
384385
PQclear(res);
385386
resetPQExpBuffer(query);
@@ -576,7 +577,8 @@ pglogical_receiver_main(Datum main_arg)
576577
elog(WARNING,"Commit of prepared transaction takes %lld usec, flags=%x",stop-start,stmt[1]);
577578
}
578579
}else {
579-
Assert(stmt[1]==PGLOGICAL_PREPARE||stmt[1]==PGLOGICAL_COMMIT);/* all other commits should be applied in place */
580+
/* all other commits should be applied in place */
581+
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);
580582
MtmExecute(buf.data,buf.used);
581583
}
582584
}

‎contrib/mmts/state.c

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,15 @@ MtmCheckState(void)
102102
// XXXX: should we restrict major with two nodes setup?
103103
|| (nConnected==Mtm->nAllNodes/2&&MtmMajorNode)/* or half + major node */
104104
|| (nConnected==Mtm->nAllNodes/2&&Mtm->refereeGrant) )/* or half + referee */
105-
&&BIT_CHECK(Mtm->clique,MtmNodeId-1)/* in clique */
105+
&&(BIT_CHECK(Mtm->clique,MtmNodeId-1)||Mtm->refereeGrant)/* in clique when non-major */
106106
&& !BIT_CHECK(Mtm->stoppedNodeMask,MtmNodeId-1);/* is not stopped */
107107

108108
/* ANY -> MTM_DISABLED */
109109
if (!isEnabledState)
110110
{
111-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
111+
//BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
112112
MtmSetClusterStatus(MTM_DISABLED);
113+
MtmDisableNode(MtmNodeId);
113114
return;
114115
}
115116

@@ -311,9 +312,9 @@ void MtmOnNodeDisconnect(int nodeId)
311312
* We should disable it, as clique detector will not necessarily
312313
* do that. For example it will anyway find clique with one node.
313314
*/
314-
MtmDisableNode(nodeId);
315315

316316
MtmLock(LW_EXCLUSIVE);
317+
MtmDisableNode(nodeId);
317318
BIT_SET(SELF_CONNECTIVITY_MASK,nodeId-1);
318319
BIT_SET(Mtm->reconnectMask,nodeId-1);
319320
Mtm->nConfigChanges+=1;
@@ -323,6 +324,7 @@ void MtmOnNodeDisconnect(int nodeId)
323324
// MtmRefreshClusterStatus();
324325
}
325326

327+
// XXXX: make that event too
326328
voidMtmOnNodeConnect(intnodeId)
327329
{
328330
// if (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId-1))
@@ -450,12 +452,6 @@ MtmRefreshClusterStatus()
450452
}
451453
}
452454

453-
/*
454-
* Do not check clique with referee grant, because we can disable ourself.
455-
*/
456-
if (Mtm->refereeGrant)
457-
return;
458-
459455
/*
460456
* Check for clique.
461457
*/
@@ -499,6 +495,18 @@ MtmRefreshClusterStatus()
499495

500496
Mtm->clique=newClique;
501497

498+
/*
499+
* Do not perform any action based on clique with referee grant,
500+
* because we can disable ourself.
501+
* But we also need to maintain actual clique not disable ourselves
502+
* when neighbour node will come back and we erase refereeGrant.
503+
*/
504+
if (Mtm->refereeGrant)
505+
{
506+
MtmUnlock();
507+
return;
508+
}
509+
502510
for (i=0;i<Mtm->nAllNodes;i++)
503511
{
504512
boolold_status=BIT_CHECK(Mtm->disabledNodeMask,i);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp