Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6d61a95

Browse files
knizhnikkelvich
authored andcommitted
Fix unreleased lock in GetPreparedTransactionState
1 parenta0e253b commit6d61a95

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

‎multimaster.c

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,6 +1773,10 @@ void MtmRecoveryCompleted(void)
17731773
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d",
17741774
MtmNodeId, (long long)Mtm->disabledNodeMask,
17751775
(long long)SELF_CONNECTIVITY_MASK,GetXLogInsertRecPtr(),Mtm->nLiveNodes);
1776+
if (Mtm->nAllNodes >=3) {
1777+
elog(WARNING,"restartLSNs at the end of recovery: {%lx, %lx, %lx}",
1778+
Mtm->nodes[0].restartLSN,Mtm->nodes[1].restartLSN,Mtm->nodes[2].restartLSN);
1779+
}
17761780
MtmLock(LW_EXCLUSIVE);
17771781
Mtm->recoverySlot=0;
17781782
Mtm->recoveredLSN=GetXLogInsertRecPtr();
@@ -3244,7 +3248,12 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32443248
||Mtm->recoverySlot==nodeId)
32453249
{
32463250
/* Choose for recovery first available slot or slot of donor node (if any) */
3247-
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
3251+
if (Mtm->nAllNodes >=3) {
3252+
elog(WARNING,"Process %d starts recovery from node %d restartLSNs={%lx, %lx, %lx}",
3253+
MyProcPid,nodeId,Mtm->nodes[0].restartLSN,Mtm->nodes[1].restartLSN,Mtm->nodes[2].restartLSN);
3254+
}else {
3255+
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
3256+
}
32483257
Mtm->recoverySlot=nodeId;
32493258
Mtm->nReceivers=0;
32503259
Mtm->nSenders=0;
@@ -3383,7 +3392,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
33833392
sscanf(strVal(elem->arg),"%lx",&recoveredLSN);
33843393
MTM_LOG1("Recovered position of node %d is %lx",MtmReplicationNodeId,recoveredLSN);
33853394
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
3386-
MTM_LOG2("[restartlsn]node %d:%lx-> %lx (MtmReplicationStartupHook)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
3395+
MTM_LOG1("Advance restartLSN fornode %d from%lxto %lx (MtmReplicationStartupHook)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
33873396
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN==InvalidXLogRecPtr
33883397
||recoveredLSN<Mtm->nodes[MtmReplicationNodeId-1].restartLSN+MtmMaxRecoveryLag);
33893398
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
@@ -3587,9 +3596,13 @@ bool MtmFilterTransaction(char* record, int size)
35873596
origin_node=pq_getmsgbyte(&s);
35883597
origin_lsn=pq_getmsgint64(&s);
35893598

3590-
Assert(replication_node==MtmReplicationNodeId&&
3591-
origin_node!=0&&
3592-
(Mtm->status==MTM_RECOVERY||origin_node==replication_node));
3599+
Assert(replication_node==MtmReplicationNodeId);
3600+
if (!(origin_node!=0&&
3601+
(Mtm->status==MTM_RECOVERY||origin_node==replication_node)))
3602+
{
3603+
elog(WARNING,"Receive redirected commit event %d from node %d origin node %d origin LSN %lx in %s mode",
3604+
event,replication_node,origin_node,origin_lsn,MtmNodeStatusMnem[Mtm->status]);
3605+
}
35933606

35943607
switch (event)
35953608
{
@@ -3616,8 +3629,8 @@ bool MtmFilterTransaction(char* record, int size)
36163629
}
36173630

36183631
if (duplicate) {
3619-
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx)",
3620-
gid,replication_node,event,Mtm->nodes[origin_node-1].restartLSN,origin_node,restart_lsn,end_lsn,origin_lsn);
3632+
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx) mode %s",
3633+
gid,replication_node,event,Mtm->nodes[origin_node-1].restartLSN,origin_node,restart_lsn,end_lsn,origin_lsn,MtmNodeStatusMnem[Mtm->status]);
36213634
}else {
36223635
MTM_LOG2("Apply transaction %s from node %d lsn %lx, event=%x, origin node %d, original lsn=%lx, current lsn=%lx",
36233636
gid,replication_node,end_lsn,event,origin_node,origin_lsn,restart_lsn);

‎pglogical_receiver.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,11 @@ pglogical_receiver_main(Datum main_arg)
342342
* Them are either empty, either new node is synchronized using base_backup.
343343
* So we assume that LSNs are the same for local and remote node
344344
*/
345-
originStartPos=Mtm->status==MTM_RECOVERY&&Mtm->donorNodeId==nodeId ?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
345+
originStartPos=(Mtm->status==MTM_RECOVERY&&Mtm->donorNodeId==nodeId) ?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
346346
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
347347
}else {
348348
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
349-
MTM_LOG2("[restartlsn]node %d: %lx-> %lx (pglogical_receiver_mains)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
349+
MTM_LOG1("Advance restartLSN fornode %d:from%lxto %lx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
350350
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
351351
}
352352
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
@@ -545,16 +545,17 @@ pglogical_receiver_main(Datum main_arg)
545545
}
546546
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C'))) {
547547
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
548-
if (stmt[1]=='C') {/* concurrent DDL */
548+
if (stmt[0]=='M'&&stmt[1]=='C') {/* concurrent DDL should be executed by parallel workers */
549549
MtmExecute(stmt,rc-hdr_len);
550550
}else {
551-
MtmExecutor(stmt,rc-hdr_len);
551+
MtmExecutor(stmt,rc-hdr_len);/* all other messages can be processed by receiver itself */
552552
}
553553
}else {
554554
ByteBufferAppend(&buf,stmt,rc-hdr_len);
555555
if (stmt[0]=='C')/* commit */
556556
{
557-
if (!MtmFilterTransaction(stmt,rc-hdr_len)) {
557+
if (!MtmFilterTransaction(stmt,rc-hdr_len))
558+
{
558559
if (spill_file >=0) {
559560
ByteBufferAppend(&buf,")",1);
560561
pq_sendbyte(&spill_info,'(');
@@ -574,6 +575,7 @@ pglogical_receiver_main(Datum main_arg)
574575
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
575576
}
576577
}else {
578+
Assert(stmt[1]==PGLOGICAL_PREPARE||stmt[1]==PGLOGICAL_COMMIT);/* all other commits should be applied in place */
577579
MtmExecute(buf.data,buf.used);
578580
}
579581
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp