Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite641c63

Browse files
knizhnikkelvich
authored andcommitted
Fix deadlock detection
1 parentcc0a054 commite641c63

File tree

4 files changed

+52
-30
lines changed

4 files changed

+52
-30
lines changed

‎multimaster.c

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,29 @@ MtmBeginTransaction(MtmCurrentTrans* x)
667667
}
668668
}
669669

670+
671+
staticMtmTransState*
672+
MtmCreateTransState(MtmCurrentTrans*x)
673+
{
674+
boolfound;
675+
MtmTransState*ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,&found);
676+
if (!found) {
677+
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
678+
ts->snapshot=x->snapshot;
679+
if (TransactionIdIsValid(x->gtid.xid)) {
680+
Assert(x->gtid.node!=MtmNodeId);
681+
ts->gtid=x->gtid;
682+
}else {
683+
/* I am coordinator of transaction */
684+
ts->gtid.xid=x->xid;
685+
ts->gtid.node=MtmNodeId;
686+
}
687+
}
688+
returnts;
689+
}
690+
691+
692+
670693
/*
671694
* Prepare transaction for two-phase commit.
672695
* This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
@@ -675,7 +698,7 @@ static void
675698
MtmPrePrepareTransaction(MtmCurrentTrans*x)
676699
{
677700
MtmTransState*ts;
678-
TransactionId*subxids;
701+
TransactionId*subxids;
679702

680703
if (!x->isDistributed) {
681704
return;
@@ -703,14 +726,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
703726
MtmCheckClusterLock();
704727
}
705728

706-
ts=hash_search(MtmXid2State,&x->xid,HASH_ENTER,NULL);
707-
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
729+
ts=MtmCreateTransState(x);
708730
/*
709731
* Invalid CSN prevent replication of transaction by logical replication
710732
*/
711733
ts->snapshot=x->isReplicated|| !x->containsDML ?INVALID_CSN :x->snapshot;
712734
ts->csn=MtmAssignCSN();
713-
ts->gtid=x->gtid;
714735
ts->procno=MyProc->pgprocno;
715736
ts->nVotes=1;/* I am voted myself */
716737
ts->votingCompleted= false;
@@ -722,15 +743,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
722743
x->csn=ts->csn;
723744

724745
Mtm->transCount+=1;
725-
726-
if (TransactionIdIsValid(x->gtid.xid)) {
727-
Assert(x->gtid.node!=MtmNodeId);
728-
ts->gtid=x->gtid;
729-
}else {
730-
/* I am coordinator of transaction */
731-
ts->gtid.xid=x->xid;
732-
ts->gtid.node=MtmNodeId;
733-
}
734746
MtmTransactionListAppend(ts);
735747
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
736748
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n",
@@ -844,7 +856,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
844856
MtmTransactionListAppend(ts);
845857
}
846858
MtmSendNotificationMessage(ts,MSG_ABORTED);/* send notification to coordinator */
847-
}
859+
}elseif (x->status==TRANSACTION_STATUS_ABORTED&&x->isReplicated&& !x->isPrepared) {
860+
hash_search(MtmXid2State,&x->xid,HASH_REMOVE,NULL);
861+
}
848862
MtmUnlock();
849863
}
850864
MtmResetTransaction(x);
@@ -868,28 +882,32 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
868882

869883
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
870884
{
885+
MtmTx.gtid=*gtid;
886+
MtmTx.xid=GetCurrentTransactionId();
887+
MtmTx.isReplicated= true;
888+
MtmTx.isDistributed= true;
889+
MtmTx.containsDML= true;
890+
871891
if (globalSnapshot!=INVALID_CSN) {
872892
MtmLock(LW_EXCLUSIVE);
873893
MtmSyncClock(globalSnapshot);
894+
MtmTx.snapshot=globalSnapshot;
895+
if (Mtm->status!=MTM_RECOVERY) {
896+
MtmCreateTransState(&MtmTx);/* we need local->remote xid mapping for deadlock detection */
897+
}
874898
MtmUnlock();
875899
}else {
876900
globalSnapshot=MtmTx.snapshot;
877901
}
878902
if (!TransactionIdIsValid(gtid->xid)) {
879903
/* In case of recovery InvalidTransactionId is passed */
880904
if (Mtm->status!=MTM_RECOVERY) {
881-
elog(PANIC,"Node %d tries to recover node %d which is in %s mode",MtmReplicationNodeId,MtmNodeId,MtmNodeStatusMnem[Mtm->status]);
905+
elog(PANIC,"Node %d tries to recover node %d which is in %s mode",gtid->node,MtmNodeId,MtmNodeStatusMnem[Mtm->status]);
882906
}
883907
}elseif (Mtm->status==MTM_RECOVERY) {
884908
/* When recovery is completed we get normal transaction ID and switch to normal mode */
885909
MtmRecoveryCompleted();
886910
}
887-
MtmTx.gtid=*gtid;
888-
MtmTx.xid=GetCurrentTransactionId();
889-
MtmTx.snapshot=globalSnapshot;
890-
MtmTx.isReplicated= true;
891-
MtmTx.isDistributed= true;
892-
MtmTx.containsDML= true;
893911
}
894912

895913
voidMtmSetCurrentTransactionGID(charconst*gid)

‎t/001_basic_recovery.pl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ sub PostgresNode::inet_connstr {
3131
my@nodes = ();
3232
my$pgconf_common =qq(
3333
listen_addresses = '127.0.0.1'
34-
max_prepared_transactions = 10
35-
max_worker_processes = 10
34+
max_prepared_transactions = 200
35+
max_connections = 200
36+
max_worker_processes = 100
3637
max_wal_senders = 10
3738
max_replication_slots = 10
3839
wal_level = logical
40+
wal_sender_timeout = 0
3941
shared_preload_libraries = 'raftable,multimaster'
40-
multimaster.workers=4
42+
multimaster.workers=10
4143
multimaster.queue_size=10485760 # 10mb
4244
);
4345

‎t/002_dtmbench.pl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,16 @@ sub allocate_ports
6060
listen_addresses = '$host'
6161
unix_socket_directories = ''
6262
port =$pgport
63-
max_prepared_transactions = 1000
64-
max_worker_processes = 10
63+
max_prepared_transactions = 200
64+
max_connections = 200
65+
max_worker_processes = 100
6566
wal_level = logical
6667
fsync = off
6768
max_wal_senders = 10
6869
wal_sender_timeout = 0
6970
max_replication_slots = 10
7071
shared_preload_libraries = 'raftable,multimaster'
71-
multimaster.workers =4
72+
multimaster.workers =10
7273
multimaster.queue_size = 10485760 # 10mb
7374
multimaster.node_id =$id
7475
multimaster.conn_strings = '$mm_connstr'

‎t/003_pgbench.pl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,16 @@ sub allocate_ports
6262
listen_addresses = '$host'
6363
unix_socket_directories = ''
6464
port =$pgport
65-
max_prepared_transactions = 1000
66-
max_worker_processes = 10
65+
max_prepared_transactions = 200
66+
max_connections = 200
67+
max_worker_processes = 100
6768
wal_level = logical
6869
fsync = off
6970
max_wal_senders = 10
7071
wal_sender_timeout = 0
7172
max_replication_slots = 10
7273
shared_preload_libraries = 'raftable,multimaster'
73-
multimaster.workers =4
74+
multimaster.workers =10
7475
multimaster.queue_size = 10485760 # 10mb
7576
multimaster.node_id =$id
7677
multimaster.conn_strings = '$mm_connstr'

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp