Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9e43e7b

Browse files
knizhnikkelvich
authored andcommitted
Fix node locks
1 parentf59bef8 commit9e43e7b

File tree

5 files changed

+46
-22
lines changed

5 files changed

+46
-22
lines changed

‎multimaster.c

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -946,14 +946,20 @@ void MtmPrecommitTransaction(char const* gid)
946946
}else {
947947
MtmTransState*ts=tm->state;
948948
Assert(ts!=NULL);
949-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
950-
ts->status=TRANSACTION_STATUS_UNKNOWN;
951-
ts->csn=MtmAssignCSN();
952-
MtmAdjustSubtransactions(ts);
953-
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
954-
Assert(replorigin_session_origin!=InvalidRepOriginId);
955-
MtmUnlock();
956-
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
949+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
950+
ts->status=TRANSACTION_STATUS_UNKNOWN;
951+
ts->csn=MtmAssignCSN();
952+
MtmAdjustSubtransactions(ts);
953+
if (Mtm->status!=MTM_RECOVERY) {
954+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
955+
}
956+
MtmUnlock();
957+
Assert(replorigin_session_origin!=InvalidRepOriginId);
958+
SetPreparedTransactionState(ts->gid,MULTIMASTER_PRECOMMITTED);
959+
}else {
960+
elog(WARNING,"MtmPrecommitTransaction: transaction '%s' is already in %s state",gid,MtmTxnStatusMnem[ts->status]);
961+
MtmUnlock();
962+
}
957963
}
958964
}
959965
}
@@ -1038,7 +1044,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10381044
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10391045
elog(LOG,"Distributed transaction is not committed in %ld msec",USEC_TO_MSEC(now-start));
10401046
}else {
1041-
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(timeout));
1047+
elog(WARNING,"Commit of distributed transaction %s (%d) is canceled because of %ld msec timeout expiration",
1048+
ts->gid,ts->xid,USEC_TO_MSEC(timeout));
10421049
MtmAbortTransaction(ts);
10431050
break;
10441051
}
@@ -1383,15 +1390,15 @@ static voidMtmLoadPreparedTransactions(void)
13831390
if (!found) {
13841391
TransactionIdxid=GetNewTransactionId(false);
13851392
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_ENTER,&found);
1386-
MTM_LOG1("Recover prepared transaction %s xid %d",gid,xid);
1393+
MTM_LOG1("Recover prepared transaction %s xid=%d state=%s",gid,xid,pxacts[i].state_3pc);
13871394
MyPgXact->xid=InvalidTransactionId;/* dirty hack:((( */
13881395
Assert(!found);
13891396
Mtm->nActiveTransactions+=1;
13901397
ts->isEnqueued= false;
13911398
ts->isActive= true;
13921399
ts->status=strcmp(pxacts[i].state_3pc,MULTIMASTER_PRECOMMITTED)==0 ?TRANSACTION_STATUS_UNKNOWN :TRANSACTION_STATUS_IN_PROGRESS;
13931400
ts->isLocal= true;
1394-
ts->isPrepared=false;
1401+
ts->isPrepared=true;
13951402
ts->isPinned= false;
13961403
ts->snapshot=INVALID_CSN;
13971404
ts->isTwoPhase= false;
@@ -1491,6 +1498,10 @@ XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_stat
14911498
if (old_status!=TRANSACTION_STATUS_ABORTED) {
14921499
tm->status=new_status;
14931500
}
1501+
if (tm->state!=NULL&&old_status==TRANSACTION_STATUS_IN_PROGRESS) {
1502+
/* Return UNKNOWN to mark that transaction was prepared */
1503+
old_status=TRANSACTION_STATUS_UNKNOWN;
1504+
}
14941505
}else {
14951506
tm->state=NULL;
14961507
tm->status=new_status;
@@ -1605,7 +1616,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
16051616
MtmBroadcastPollMessage(ts);
16061617
}
16071618
}else {
1608-
MTM_LOG1("Skip transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
1619+
MTM_LOG2("Skip transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
16091620
ts->xid,ts->gid,MtmTxnStatusMnem[ts->status],ts->gtid.node,ts->gtid.xid,ts->votedMask);
16101621
}
16111622
}
@@ -1656,7 +1667,8 @@ void MtmRecoveryCompleted(void)
16561667
Mtm->nodes[i].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
16571668
}
16581669
/* Mode will be changed to online once all logical receiver are connected */
1659-
MtmSwitchClusterMode(MTM_CONNECTED);
1670+
elog(LOG,"Recovery completed with %d active receivers from %d",Mtm->nReceivers,Mtm->nLiveNodes-1);
1671+
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1 ?MTM_ONLINE :MTM_CONNECTED);
16601672
MtmUnlock();
16611673
}
16621674

@@ -2549,7 +2561,7 @@ _PG_init(void)
25492561
0,
25502562
INT_MAX,
25512563
PGC_BACKEND,
2552-
0,
2564+
0,\
25532565
NULL,
25542566
NULL,
25552567
NULL
@@ -2894,6 +2906,7 @@ void MtmReceiverStarted(int nodeId)
28942906
MtmEnableNode(nodeId);
28952907
MtmCheckQuorum();
28962908
}
2909+
elog(LOG,"Start %d receivers from %d cluster status %s",Mtm->nReceivers+1,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
28972910
if (++Mtm->nReceivers==Mtm->nLiveNodes-1) {
28982911
if (Mtm->status==MTM_CONNECTED) {
28992912
MtmSwitchClusterMode(MTM_ONLINE);
@@ -3672,11 +3685,11 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
36723685
{
36733686
size_tlockGraphSize;
36743687
char*lockGraphData;
3675-
MtmLockNode(i+MtmMaxNodes,LW_SHARED);
3688+
MtmLockNode(i+1+MtmMaxNodes,LW_SHARED);
36763689
lockGraphSize=Mtm->nodes[i].lockGraphUsed;
36773690
lockGraphData=palloc(lockGraphSize);
36783691
memcpy(lockGraphData,Mtm->nodes[i].lockGraphData,lockGraphSize);
3679-
MtmUnlockNode(i+MtmMaxNodes);
3692+
MtmUnlockNode(i+1+MtmMaxNodes);
36803693

36813694
if (lockGraphData) {
36823695
GlobalTransactionId*gtid= (GlobalTransactionId*)lockGraphData;
@@ -4603,11 +4616,11 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
46034616
if (i+1!=MtmNodeId&& !BIT_CHECK(Mtm->disabledNodeMask,i)) {
46044617
size_tlockGraphSize;
46054618
void*lockGraphData;
4606-
MtmLockNode(i+MtmMaxNodes,LW_SHARED);
4619+
MtmLockNode(i+1+MtmMaxNodes,LW_SHARED);
46074620
lockGraphSize=Mtm->nodes[i].lockGraphUsed;
46084621
lockGraphData=palloc(lockGraphSize);
46094622
memcpy(lockGraphData,Mtm->nodes[i].lockGraphData,lockGraphSize);
4610-
MtmUnlockNode(i+MtmMaxNodes);
4623+
MtmUnlockNode(i+1+MtmMaxNodes);
46114624

46124625
if (lockGraphData==NULL) {
46134626
return true;

‎multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,8 @@ extern VacuumStmt* MtmVacuumStmt;
335335
externIndexStmt*MtmIndexStmt;
336336
externDropStmt*MtmDropStmt;
337337
externMemoryContextMtmApplyContext;
338+
externXLogRecPtrMtmSenderWalEnd;
339+
338340

339341
externvoidMtmArbiterInitialize(void);
340342
externvoidMtmStartReceivers(void);

‎pglogical_apply.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,14 +433,20 @@ process_remote_message(StringInfo s)
433433
/* This function is called directly by receiver, so there is no race condition and we can update
434434
* restartLSN without locks
435435
*/
436+
if (origin_node==MtmReplicationNodeId) {
437+
Assert(msg->origin_lsn==InvalidXLogRecPtr);
438+
msg->origin_lsn=MtmSenderWalEnd;
439+
}
436440
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
437441
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",origin_node,Mtm->nodes[origin_node-1].restartLSN,msg->origin_lsn);
438442
Mtm->nodes[origin_node-1].restartLSN=msg->origin_lsn;
439443
replorigin_session_origin_lsn=msg->origin_lsn;
440444
MtmRollbackPreparedTransaction(origin_node,msg->gid);
441445
}else {
442-
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
443-
msg->gid,origin_node,msg->origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
446+
if (msg->origin_lsn!=InvalidXLogRecPtr) {
447+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
448+
msg->gid,origin_node,msg->origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
449+
}
444450
}
445451
standalone= true;
446452
break;

‎pglogical_receiver.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ static char worker_proc[BGW_MAXLEN];
5757

5858
/* Lastly written positions */
5959
staticXLogRecPtroutput_written_lsn=InvalidXLogRecPtr;
60+
XLogRecPtrMtmSenderWalEnd;
6061

6162
/* Stream functions */
6263
staticvoidfe_sendint64(int64i,char*buf);
@@ -510,6 +511,9 @@ pglogical_receiver_main(Datum main_arg)
510511
hdr_len+=8;/* WALEnd */
511512
hdr_len+=8;/* sendTime */
512513

514+
/* WAL position of the end of this message at WAL sender */
515+
MtmSenderWalEnd=walEnd;
516+
513517
/*ereport(LOG, (errmsg("%s: receive message %c length %d", worker_proc, copybuf[hdr_len], rc - hdr_len)));*/
514518

515519
Assert(rc >=hdr_len);

‎tests2/docker-entrypoint.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ if [ "$1" = 'postgres' ]; then
6868
multimaster.conn_strings = '$CONNSTRS'
6969
multimaster.heartbeat_recv_timeout = 1100
7070
multimaster.heartbeat_send_timeout = 250
71-
multimaster.twopc_min_timeout = 50000
72-
multimaster.min_2pc_timeout = 50000
71+
multimaster.min_2pc_timeout = 2000
7372
EOF
7473

7574
cat$PGDATA/postgresql.conf

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp