Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5c2ee51

Browse files
committed
fix last problem with lost PRECOMMIT confirmation
1 parent2fe7732 commit5c2ee51

File tree

6 files changed

+23
-32
lines changed

6 files changed

+23
-32
lines changed

‎multimaster.c

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,9 +1179,7 @@ void MtmPrecommitTransaction(char const* gid)
11791179
ts->status=TRANSACTION_STATUS_UNKNOWN;
11801180
ts->csn=MtmAssignCSN();
11811181
MtmAdjustSubtransactions(ts);
1182-
if (Mtm->status!=MTM_RECOVERY) {
1183-
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
1184-
}
1182+
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
11851183
MtmUnlock();
11861184
Assert(replorigin_session_origin!=InvalidRepOriginId);
11871185
if (!IsTransactionState()) {
@@ -1625,12 +1623,10 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
16251623
memcpy(msg.gid,ts->gid,MULTIMASTER_MAX_GID_SIZE);
16261624

16271625
Assert(!MtmIsCoordinator(ts));/* All broadcasts are now done through logical decoding */
1628-
if (!BIT_CHECK(Mtm->disabledNodeMask,ts->gtid.node-1)) {
1629-
MTM_LOG2("Send %s message to node %d xid=%d gid=%s",MtmMessageKindMnem[cmd],ts->gtid.node,ts->gtid.xid,ts->gid);
1630-
msg.node=ts->gtid.node;
1631-
msg.dxid=ts->gtid.xid;
1632-
MtmSendMessage(&msg);
1633-
}
1626+
MTM_TXTRACE(ts,"MtmSend2PCMessage sending %s message to node %d",MtmMessageKindMnem[cmd],ts->gtid.node);
1627+
msg.node=ts->gtid.node;
1628+
msg.dxid=ts->gtid.xid;
1629+
MtmSendMessage(&msg);
16341630
}
16351631

16361632
/*

‎multimaster.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@
1414
#defineDEBUG_LEVEL 0
1515
#endif
1616

17-
#ifndefMTM_TRACE
18-
#defineMTM_TRACE 0
19-
#endif
2017

2118
#defineMTM_TAG "[MTM] "
2219
#defineMTM_ELOG(level,fmt,...) elog(level, MTM_TAG fmt, ## __VA_ARGS__)
@@ -44,12 +41,14 @@
4441
#defineMTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4542
#endif
4643

47-
// #if MTM_TRACE == 0
48-
// #define MTM_TXTRACE(tx, event)
49-
// #else
50-
#defineMTM_TXTRACE(tx,event) \
51-
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
52-
// #endif
44+
// #define MTM_TRACE 1
45+
46+
#ifndefMTM_TRACE
47+
#defineMTM_TXTRACE(tx,event, ...)
48+
#else
49+
#defineMTM_TXTRACE(tx,event, ...) \
50+
fprintf(stderr, MTM_TAG "%s, %lld, %u " event "\n", tx->gid, (long long)MtmGetSystemTime(), MyProcPid, ## __VA_ARGS__)
51+
#endif
5352

5453
#defineMULTIMASTER_NAME "multimaster"
5554
#defineMULTIMASTER_SCHEMA_NAME "mtm"
@@ -390,7 +389,6 @@ extern csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, no
390389
externcsn_tMtmAssignCSN(void);
391390
externcsn_tMtmSyncClock(csn_tcsn);
392391
externvoidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tsnapshot,nodemask_tparticipantsMask);
393-
externvoidMtmReceiverStarted(intnodeId);
394392
externMtmReplicationModeMtmGetReplicationMode(intnodeId,sig_atomic_tvolatile*shutdown);
395393
externvoidMtmExecute(void*work,intsize);
396394
externvoidMtmExecutor(void*work,size_tsize);

‎pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -684,15 +684,15 @@ process_remote_commit(StringInfo in)
684684
{
685685
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
686686
strncpy(gid,pq_getmsgstring(in),sizeofgid);
687-
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s",MyProcPid,gid);
687+
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
688688
MtmBeginSession(origin_node);
689689
MtmPrecommitTransaction(gid);
690690
MtmEndSession(origin_node, true);
691691
return;
692692
}
693693
casePGLOGICAL_COMMIT:
694694
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMITcommit",MyProcPid);
695+
MTM_LOG2("%d: PGLOGICAL_COMMIT%s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
696696
if (IsTransactionState()) {
697697
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698698
MtmBeginSession(origin_node);
@@ -705,12 +705,12 @@ process_remote_commit(StringInfo in)
705705
{
706706
Assert(IsTransactionState()&&TransactionIdIsValid(MtmGetCurrentTransactionId()));
707707
strncpy(gid,pq_getmsgstring(in),sizeofgid);
708+
MTM_LOG2("%d: PGLOGICAL_PREPARE %s, (%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
708709
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_IN_PROGRESS)==TRANSACTION_STATUS_ABORTED) {
709710
MTM_LOG1("Avoid prepare of previously aborted global transaction %s",gid);
710711
AbortCurrentTransaction();
711712
}else {
712713
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
713-
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s",gid);
714714
BeginTransactionBlock(false);
715715
CommitTransactionCommand();
716716
StartTransactionCommand();
@@ -745,7 +745,7 @@ process_remote_commit(StringInfo in)
745745
*/
746746
Assert(csn);
747747
strncpy(gid,pq_getmsgstring(in),sizeofgid);
748-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%lld, gid=%s,lsn=%llx",csn,gid,end_lsn);
748+
MTM_LOG2("%d: PGLOGICAL_COMMIT_PREPARED%s,(%llx,%llx,%llx)",MyProcPid,gid,commit_lsn,end_lsn,origin_lsn);
749749
MtmResetTransaction();
750750
StartTransactionCommand();
751751
MtmBeginSession(origin_node);

‎pglogical_receiver.c

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -519,14 +519,6 @@ pglogical_receiver_main(Datum main_arg)
519519
{
520520
intmsg_len=rc-hdr_len;
521521
stmt=copybuf+hdr_len;
522-
if (mode==REPLMODE_RECOVERED) {
523-
/* Ingore all incompleted transactions from recovered node */
524-
if (stmt[0]!='B') {
525-
output_written_lsn=Max(walEnd,output_written_lsn);
526-
continue;
527-
}
528-
mode=REPLMODE_OPEN_EXISTED;
529-
}
530522
MTM_LOG3("Receive message %c from node %d",stmt[0],nodeId);
531523
if (buf.used+msg_len+1 >=MtmTransSpillThreshold*MB) {
532524
if (spill_file<0) {

‎state.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ MtmCheckState(void)
112112
caseMTM_RECOVERY:
113113
if (!BIT_CHECK(Mtm->disabledNodeMask,MtmNodeId-1))
114114
{
115-
//BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1); // kk trick
115+
BIT_SET(Mtm->originLockNodeMask,MtmNodeId-1);// kk trick
116116
MtmSetClusterStatus(MTM_RECOVERED);
117117
return;
118118
}
@@ -148,6 +148,7 @@ MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev)
148148
break;
149149

150150
caseMTM_NEIGHBOR_WAL_RECEIVER_START:
151+
BIT_CLEAR(Mtm->originLockNodeMask,MtmNodeId-1);
151152
if (!BIT_CHECK(Mtm->pglogicalReceiverMask,node_id-1))
152153
BIT_SET(Mtm->pglogicalReceiverMask,node_id-1);
153154
break;

‎t/004_recovery.pl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,7 @@
7474
diag("Sums:$sum0,$sum1,$sum2");
7575
is($sum2,$sum0,"Check that sum_2 == sum_0");
7676
is($sum2,$sum1,"Check that sum_2 == sum_1");
77+
78+
$cluster->{nodes}->[0]->stop('fast');
79+
$cluster->{nodes}->[1]->stop('fast');
80+
$cluster->{nodes}->[2]->stop('fast');

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp