Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 9545eed

Browse files
knizhnikkelvich
authored and committed
Add multimaster.break_connection option
1 parent f0ed19f commit 9545eed

File tree

7 files changed

+82
-24
lines changed

7 files changed

+82
-24
lines changed

‎arbiter.c

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -327,9 +327,10 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
327327
{
328328
elog(WARNING,"Node %d thinks that I am dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],MtmMessageKindMnem[resp->code]);
329329
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
330+
Mtm->nConfigChanges+=1;
330331
MtmSwitchClusterMode(MTM_RECOVERY);
331332
}elseif (BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)&&sockets[resp->node-1]<0) {
332-
/* We receive heartbeat fromdsiable node with
333+
/* We receive heartbeat from disabled node.
333334
* Looks like it is restarted.
334335
* Try to reconnect to it.
335336
*/
@@ -1040,13 +1041,16 @@ static void MtmReceiver(Datum arg)
10401041
Mtm->nodes[node-1].transDelay+=MtmGetCurrentTime()-ts->csn;
10411042
ts->xids[node-1]=msg->sxid;
10421043

1044+
#if0
1045+
/* This code seems to be obsolete because checking that a distributed transaction involves all live nodes is now done at the replica while applying PREPARE */
10431046
if ((~msg->disabledNodeMask&Mtm->disabledNodeMask)!=0) {
10441047
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
10451048
commit on smaller subset of nodes */
10461049
elog(WARNING,"Coordinator of distributed transaction %s (%llu) see less nodes than node %d: %llx instead of %llx",
10471050
ts->gid, (long64)ts->xid,node,Mtm->disabledNodeMask,msg->disabledNodeMask);
10481051
MtmAbortTransaction(ts);
10491052
}
1053+
#endif
10501054
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
10511055
/* All nodes are finished their transactions */
10521056
if (ts->status==TRANSACTION_STATUS_ABORTED) {

‎multimaster.c

Lines changed: 62 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -246,6 +246,7 @@ static int MtmGcPeriod;
246246
staticboolMtmIgnoreTablesWithoutPk;
247247
staticintMtmLockCount;
248248
staticboolMtmMajorNode;
249+
staticboolMtmBreakConnection;
249250

250251
staticExecutorStart_hook_typePreviousExecutorStartHook;
251252
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -418,16 +419,27 @@ MtmInitializeSequence(int64* start, int64* step)
418419
* -------------------------------------------
419420
*/
420421

421-
csn_tMtmTransactionSnapshot(TransactionIdxid)
422+
/*
423+
* Get snapshot of transaction processed by WAL sender pglogical plugin.
424+
* If it is local transaction or replication node is not in participant mask, then return INVALID_CSN.
425+
* Transaction should be skipped by WAL sender in the following cases:
426+
* 1. Transaction was replicated from some other node and it is not a recovery process.
427+
* 2. State of transaction is unknown
428+
* 3. Replication node is not participated in transaction
429+
*/
430+
csn_tMtmDistributedTransactionSnapshot(TransactionIdxid,intnodeId,nodemask_t*participantsMask)
422431
{
423432
csn_tsnapshot=INVALID_CSN;
424-
433+
*participantsMask=0;
425434
MtmLock(LW_SHARED);
426435
if (Mtm->status==MTM_ONLINE) {
427436
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
428-
if (ts!=NULL&& !ts->isLocal) {
429-
snapshot=ts->snapshot;
430-
Assert(ts->gtid.node==MtmNodeId||MtmIsRecoverySession);
437+
if (ts!=NULL) {
438+
*participantsMask=ts->participantsMask;
439+
if (!ts->isLocal&&BIT_CHECK(ts->participantsMask,nodeId-1)) {
440+
snapshot=ts->snapshot;
441+
Assert(ts->gtid.node==MtmNodeId||MtmIsRecoverySession);
442+
}
431443
}
432444
}
433445
MtmUnlock();
@@ -621,6 +633,9 @@ MtmAdjustOldestXid(TransactionId xid)
621633
returnxid;
622634
}
623635

636+
637+
638+
624639
/*
625640
* -------------------------------------------
626641
* Transaction list manipulation.
@@ -789,7 +804,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
789804
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
790805
*/
791806
MtmUnlock();
792-
elog(ERROR,"Multimaster node is not online: current status %s",MtmNodeStatusMnem[Mtm->status]);
807+
elog(MtmBreakConnection ?FATAL :ERROR,"Multimaster node is not online: current status %s",MtmNodeStatusMnem[Mtm->status]);
793808
}
794809
x->containsDML= false;
795810
x->snapshot=MtmAssignCSN();
@@ -799,10 +814,8 @@ MtmBeginTransaction(MtmCurrentTrans* x)
799814

800815
/*
801816
* Check if there is global multimaster lock preventing new transaction from commit to give wal-senders a chance to catch up.
802-
* Only "own" transactions are blocked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
803-
* and should not cause cluster status change.
804817
*/
805-
if (x->isDistributed/* && x->isReplicated*/) {
818+
if (x->isDistributed) {
806819
MtmCheckClusterLock();
807820
}
808821

@@ -1141,6 +1154,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
11411154
}
11421155
MtmUnlock();
11431156
if (x->isTwoPhase) {
1157+
if (x->status==TRANSACTION_STATUS_ABORTED) {
1158+
elog(ERROR,"Prepare of user's 2PC transaction %s (%llu) is aborted by DTM",x->gid, (long64)x->xid);
1159+
}
11441160
MtmResetTransaction();
11451161
}
11461162
}
@@ -2198,7 +2214,29 @@ void MtmReconnectNode(int nodeId)
21982214
MtmUnlock();
21992215
}
22002216

2201-
2217+
/*
2218+
* Check participants of replicated transaction. This function is called by receiver at start of replicated transaction to
2219+
* check that all live nodes are participated in it.
2220+
*/
2221+
boolMtmCheckParticipants(GlobalTransactionId*gtid,nodemask_tparticipantsMask)
2222+
{
2223+
boolresult= true;
2224+
MtmLock(LW_EXCLUSIVE);
2225+
if (BIT_CHECK(Mtm->disabledNodeMask,gtid->node-1)) {
2226+
elog(WARNING,"Ignore transaction %llu from disabled node %d", (long64)gtid->xid,gtid->node);
2227+
result= false;
2228+
}else {
2229+
nodemask_tliveMask= (((nodemask_t)1 <<Mtm->nAllNodes)-1)& ~Mtm->disabledNodeMask;
2230+
BIT_SET(participantsMask,gtid->node-1);
2231+
if (liveMask& ~participantsMask) {
2232+
elog(WARNING,"Ignore transaction %llu from node %d because some of live nodes (%llx) are not participated in it (%llx)",
2233+
(long64)gtid->xid,gtid->node,liveMask,participantsMask);
2234+
result= false;
2235+
}
2236+
}
2237+
MtmUnlock();
2238+
returnresult;
2239+
}
22022240

22032241
/*
22042242
* -------------------------------------------
@@ -2836,6 +2874,18 @@ _PG_init(void)
28362874
NULL
28372875
);
28382876

2877+
DefineCustomBoolVariable(
2878+
"multimaster.break_connection",
2879+
"Break connection with client when node is no online",
2880+
NULL,
2881+
&MtmBreakConnection,
2882+
false,
2883+
PGC_BACKEND,
2884+
0,
2885+
NULL,
2886+
NULL,
2887+
NULL
2888+
);
28392889
DefineCustomBoolVariable(
28402890
"multimaster.major_node",
28412891
"Node which forms a majority in case of partitioning in cliques with equal number of nodes",
@@ -4358,8 +4408,8 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
43584408
}
43594409
if (!PrepareTransactionBlock(x->gid))
43604410
{
4361-
if (!MtmVolksWagenMode)
4362-
elog(WARNING,"Failed to prepare transaction %s",x->gid);
4411+
//if (!MtmVolksWagenMode)
4412+
elog(WARNING,"Failed to prepare transaction %s (%llu)",x->gid, (long64)x->xid);
43634413
/* ??? Should we do explicit rollback */
43644414
}else {
43654415
CommitTransactionCommand();

‎multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -347,7 +347,7 @@ extern timestamp_t MtmRefreshClusterStatusSchedule;
347347
externvoidMtmArbiterInitialize(void);
348348
externvoidMtmStartReceivers(void);
349349
externvoidMtmStartReceiver(intnodeId,booldynamic);
350-
externcsn_tMtmTransactionSnapshot(TransactionIdxid);
350+
externcsn_tMtmDistributedTransactionSnapshot(TransactionIdxid,intnodeId,nodemask_t*participantsMask);
351351
externcsn_tMtmAssignCSN(void);
352352
externcsn_tMtmSyncClock(csn_tcsn);
353353
externvoidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tsnapshot);
@@ -402,4 +402,6 @@ extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
402402
externboolMtmFilterTransaction(char*record,intsize);
403403
externvoidMtmPrecommitTransaction(charconst*gid);
404404
externchar*MtmGucSerialize(void);
405+
externboolMtmCheckParticipants(GlobalTransactionId*gtid,nodemask_tparticipants);
406+
405407
#endif

‎pglogical_apply.c

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -337,22 +337,21 @@ process_remote_begin(StringInfo s)
337337
{
338338
GlobalTransactionIdgtid;
339339
csn_tsnapshot;
340+
nodemask_tparticipantsMask;
340341
intrc;
341342

342343
gtid.node=pq_getmsgint(s,4);
343344
gtid.xid=pq_getmsgint(s,4);
344345
snapshot=pq_getmsgint64(s);
345-
346+
participantsMask=pq_getmsgint64(s);
346347
Assert(gtid.node>0);
347348

348-
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%lld",gtid.node,gtid.xid,snapshot);
349+
MTM_LOG2("REMOTE begin node=%d xid=%lu snapshot=%lld participantsMask=%llx",gtid.node,(long64)gtid.xid,snapshot,participantsMask);
349350
MtmResetTransaction();
350-
#if1
351-
if (BIT_CHECK(Mtm->disabledNodeMask,gtid.node-1)) {
352-
elog(WARNING,"Ignore transaction %llu from disabled node %d", (long64)gtid.xid,gtid.node);
351+
352+
if (!MtmCheckParticipants(&gtid,participantsMask)) {
353353
return false;
354354
}
355-
#endif
356355
SetCurrentStatementStartTimestamp();
357356
StartTransactionCommand();
358357
MtmJoinTransaction(&gtid,snapshot);

‎pglogical_proto.c

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -119,7 +119,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
119119
ReorderBufferTXN*txn)
120120
{
121121
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
122-
csn_tcsn=MtmTransactionSnapshot(txn->xid);
122+
nodemask_tparticipantsMask;
123+
csn_tcsn=MtmDistributedTransactionSnapshot(txn->xid,MtmReplicationNodeId,&participantsMask);
123124

124125
if (!isRecovery&&csn==INVALID_CSN) {
125126
MtmIsFilteredTxn= true;
@@ -136,6 +137,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
136137
pq_sendint(out,MtmNodeId,4);
137138
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
138139
pq_sendint64(out,csn);
140+
pq_sendint64(out,participantsMask);
139141

140142
MtmTransactionRecords=0;
141143
}
@@ -205,7 +207,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
205207
return;
206208
}
207209
}else {
208-
csn_tcsn=MtmTransactionSnapshot(txn->xid);
210+
nodemask_tpartisipantsMask;
211+
csn_tcsn=MtmDistributedTransactionSnapshot(txn->xid,MtmReplicationNodeId,&partisipantsMask);
209212
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
210213

211214
if (!isRecovery&&csn==INVALID_CSN&& (event!=PGLOGICAL_ABORT_PREPARED||txn->origin_id!=InvalidRepOriginId))

‎tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ if [ "$1" = 'postgres' ]; then
6969
multimaster.conn_strings = '$CONNSTRS'
7070
multimaster.heartbeat_recv_timeout = 1100
7171
multimaster.heartbeat_send_timeout = 250
72-
multimaster.min_2pc_timeout =100000
72+
multimaster.min_2pc_timeout =1000000
7373
EOF
7474

7575
cat$PGDATA/postgresql.conf

‎tests2/lib/bank_client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -185,8 +185,8 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
185185
# back to event loop and block it
186186
yieldfromasyncio.sleep(0.01)
187187
exceptBaseExceptionase:
188+
print('Catch exception: ',e)
188189
agg.finish_tx(str(e).strip())
189-
print('Catch exception ',str(e).strip())
190190
# Give evloop some free time.
191191
# In case of continuous exceptions we can loop here without returning
192192
# back to event loop and block it

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp