Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc98402b

Browse files
committed
fix last problem with lost PRECOMMIT confirmation
1 parentaf0dd88 commitc98402b

File tree

6 files changed

+23
-32
lines changed

6 files changed

+23
-32
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,9 +1171,7 @@ void MtmPrecommitTransaction(char const* gid)
11711171
ts->status=TRANSACTION_STATUS_UNKNOWN;
11721172
ts->csn=MtmAssignCSN();
11731173
MtmAdjustSubtransactions(ts);
1174-
if (Mtm->status!=MTM_RECOVERY) {
1175-
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
1176-
}
1174+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
11771175
MtmUnlock();
11781176
Assert(replorigin_session_origin!=InvalidRepOriginId);
11791177
if (!IsTransactionState()) {
@@ -1617,12 +1615,10 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
16171615
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
16181616

16191617
Assert(!MtmIsCoordinator(ts));/* All broadcasts are now done through logical decoding */
1620-
if (!BIT_CHECK(Mtm->disabledNodeMask,ts->gtid.node-1)) {
1621-
MTM_LOG2("Send %s message to node %d xid=%d gid=%s",MtmMessageKindMnem[cmd],ts->gtid.node,ts->gtid.xid,ts->gid);
1622-
msg.node=ts->gtid.node;
1623-
msg.dxid=ts->gtid.xid;
1624-
MtmSendMessage(&msg);
1625-
}
1618+
MTM_TXTRACE(ts,"MtmSend2PCMessage sending %s message to node %d",MtmMessageKindMnem[cmd],ts->gtid.node);
1619+
msg.node=ts->gtid.node;
1620+
msg.dxid=ts->gtid.xid;
1621+
MtmSendMessage(&msg);
16261622
}
16271623

16281624
/*

‎contrib/mmts/multimaster.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@
1414
#defineDEBUG_LEVEL 0
1515
#endif
1616

17-
#ifndefMTM_TRACE
18-
#defineMTM_TRACE 0
19-
#endif
2017

2118
#defineMTM_TAG "[MTM] "
2219
#defineMTM_ELOG(level,fmt,...) elog(level, MTM_TAG fmt, ## __VA_ARGS__)
@@ -44,12 +41,14 @@
4441
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4542
#endif
4643

47-
// #if MTM_TRACE == 0
48-
// #define MTM_TXTRACE(tx, event)
49-
// #else
50-
#defineMTM_TXTRACE(tx,event) \
51-
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
52-
// #endif
44+
// #define MTM_TRACE 1
45+
46+
#ifndefMTM_TRACE
47+
#defineMTM_TXTRACE(tx,event, ...)
48+
#else
49+
#defineMTM_TXTRACE(tx,event, ...) \
50+
fprintf(stderr, MTM_TAG "%s, %lld, %u " event "\n", tx->gid, (long long)MtmGetSystemTime(), MyProcPid, ## __VA_ARGS__)
51+
#endif
5352

5453
#defineMULTIMASTER_NAME "multimaster"
5554
#defineMULTIMASTER_SCHEMA_NAME "mtm"
@@ -388,7 +387,6 @@ extern csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, no
388387
externcsn_tMtmAssignCSN(void);
389388
externcsn_tMtmSyncClock(csn_tcsn);
390389
externvoidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tsnapshot,nodemask_tparticipantsMask);
391-
externvoidMtmReceiverStarted(intnodeId);
392390
externMtmReplicationModeMtmGetReplicationMode(intnodeId,sig_atomic_tvolatile*shutdown);
393391
externvoidMtmExecute(void*work,intsize);
394392
externvoidMtmExecutor(void*work,size_tsize);

‎contrib/mmts/pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -684,15 +684,15 @@ process_remote_commit(StringInfo in)
684684
{
685685
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
686686
strncpy(gid,pq_getmsgstring(in),sizeofgid);
687-
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s",MyProcPid,gid);
687+
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
688688
MtmBeginSession(origin_node);
689689
MtmPrecommitTransaction(gid);
690690
MtmEndSession(origin_node, true);
691691
return;
692692
}
693693
casePGLOGICAL_COMMIT:
694694
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMITcommit",MyProcPid);
695+
MTM_LOG2("%d: PGLOGICAL_COMMIT%s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
696696
if (IsTransactionState()) {
697697
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698698
MtmBeginSession(origin_node);
@@ -705,12 +705,12 @@ process_remote_commit(StringInfo in)
705705
{
706706
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
707707
strncpy(gid,pq_getmsgstring(in),sizeofgid);
708+
MTM_LOG2("%d: PGLOGICAL_PREPARE %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
708709
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
709710
MTM_LOG1("Avoid prepare of previously aborted global transaction %s",gid);
710711
AbortCurrentTransaction();
711712
}else {
712713
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
713-
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s",gid);
714714
BeginTransactionBlock(false);
715715
CommitTransactionCommand();
716716
StartTransactionCommand();
@@ -745,7 +745,7 @@ process_remote_commit(StringInfo in)
745745
*/
746746
Assert(csn);
747747
strncpy(gid,pq_getmsgstring(in),sizeofgid);
748-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%lld, gid=%s,lsn=%llx",csn,gid,end_lsn);
748+
MTM_LOG2("%d: PGLOGICAL_COMMIT_PREPARED%s,(%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
749749
MtmResetTransaction();
750750
StartTransactionCommand();
751751
MtmBeginSession(origin_node);

‎contrib/mmts/pglogical_receiver.c

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -515,14 +515,6 @@ pglogical_receiver_main(Datum main_arg)
515515
{
516516
intmsg_len=rc-hdr_len;
517517
stmt=copybuf+hdr_len;
518-
if (mode==REPLMODE_RECOVERED) {
519-
/* Ingore all incompleted transactions from recovered node */
520-
if (stmt[0]!='B') {
521-
output_written_lsn=Max(walEnd,output_written_lsn);
522-
continue;
523-
}
524-
mode=REPLMODE_OPEN_EXISTED;
525-
}
526518
MTM_LOG3("Receive message %c from node %d",stmt[0],nodeId);
527519
if (buf.used+msg_len+1 >=MtmTransSpillThreshold*MB) {
528520
if (spill_file<0) {

‎contrib/mmts/state.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ MtmCheckState(void)
112112
caseMTM_RECOVERY:
113113
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
114114
{
115-
//BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1); // kk trick
115+
BIT_SET(Mtm->originLockNodeMask,MtmNodeId-1);// kk trick
116116
MtmSetClusterStatus(MTM_RECOVERED);
117117
return;
118118
}
@@ -148,6 +148,7 @@ MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev)
148148
break;
149149

150150
caseMTM_NEIGHBOR_WAL_RECEIVER_START:
151+
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);
151152
if (!BIT_CHECK(Mtm->pglogicalReceiverMask,node_id-1))
152153
BIT_SET(Mtm->pglogicalReceiverMask,node_id-1);
153154
break;

‎contrib/mmts/t/004_recovery.pl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,7 @@
7474
diag("Sums:$sum0,$sum1,$sum2");
7575
is($sum2,$sum0,"Check that sum_2 == sum_0");
7676
is($sum2,$sum1,"Check that sum_2 == sum_1");
77+
78+
$cluster->{nodes}->[0]->stop('fast');
79+
$cluster->{nodes}->[1]->stop('fast');
80+
$cluster->{nodes}->[2]->stop('fast');

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp