Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit12d2bc4

Browse files
committed
Merge branch 'master' into more_tests
2 parentsd7e483a +20afa29 commit12d2bc4

File tree

5 files changed

+61
-61
lines changed

5 files changed

+61
-61
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ static void MtmSetSocketOptions(int sd)
314314
staticvoidMtmCheckResponse(MtmArbiterMessage*resp)
315315
{
316316
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)) {
317-
elog(WARNING,"Node %d thinks that I was dead, while I am %s",resp->node,MtmNodeStatusMnem[Mtm->status]);
317+
elog(WARNING,"Node %d thinks that I was dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],messageKindText[resp->code]);
318318
if (Mtm->status!=MTM_RECOVERY) {
319319
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
320320
MtmSwitchClusterMode(MTM_RECOVERY);
@@ -918,7 +918,7 @@ static void MtmReceiver(Datum arg)
918918
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
919919
elog(LOG,"Abort transaction %s because it is in state %d at node %d",
920920
msg->gid,ts->status,node);
921-
MtmFinishPreparedTransaction(node,ts, false);
921+
MtmFinishPreparedTransaction(ts, false);
922922
}
923923
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
924924
{
@@ -928,7 +928,7 @@ static void MtmReceiver(Datum arg)
928928
}
929929
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
930930
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
931-
MtmFinishPreparedTransaction(node,ts, true);
931+
MtmFinishPreparedTransaction(ts, true);
932932
}
933933
}else {
934934
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",

‎contrib/mmts/multimaster.c‎

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11041104
staticvoid
11051105
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
11061106
{
1107-
MTM_LOG1("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
1107+
MTM_LOG2("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
11081108
MyProcPid,x->xid,x->isPrepared,x->isReplicated,x->isDistributed,x->isTwoPhase,x->gid,commit ?"commit" :"abort");
11091109
if (x->status!=TRANSACTION_STATUS_ABORTED&&x->isDistributed&& (x->isPrepared||x->isReplicated)&& !x->isTwoPhase) {
11101110
MtmTransState*ts=NULL;
@@ -1122,7 +1122,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11221122
}
11231123
if (ts!=NULL) {
11241124
if (*ts->gid)
1125-
MTM_LOG1("TRANSLOG: %s transaction %s status %d", (commit ?"commit" :"rollback"),ts->gid,ts->status);
1125+
MTM_LOG2("TRANSLOG: %s transaction %s status %d", (commit ?"commit" :"rollback"),ts->gid,ts->status);
11261126
if (commit) {
11271127
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
11281128
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
@@ -1177,6 +1177,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11771177
Mtm->nActiveTransactions-=1;
11781178
}
11791179
MtmTransactionListAppend(ts);
1180+
if (*x->gid) {
1181+
LogLogicalMessage("A",x->gid,strlen(x->gid)+1, false);
1182+
}
11801183
}
11811184
MtmSend2PCMessage(ts,MSG_ABORTED);/* send notification to coordinator */
11821185
}elseif (x->status==TRANSACTION_STATUS_ABORTED&&x->isReplicated&& !x->isPrepared) {
@@ -1230,7 +1233,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12301233
MtmSendMessage(&msg);
12311234
}
12321235
}
1233-
}else {
1236+
}elseif (!BIT_CHECK(Mtm->disabledNodeMask,ts->gtid.node-1)){
12341237
msg.node=ts->gtid.node;
12351238
msg.dxid=ts->gtid.xid;
12361239
MtmSendMessage(&msg);
@@ -1436,7 +1439,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14361439
Assert(ts->gid[0]);
14371440
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
14381441
elog(LOG,"Abort transaction %s because its coordinator is disabled and it is not prepared at node %d",ts->gid,MtmNodeId);
1439-
MtmFinishPreparedTransaction(disabledNodeId,ts, false);
1442+
MtmFinishPreparedTransaction(ts, false);
14401443
}else {
14411444
MTM_LOG1("Poll state of transaction %d (%s)",ts->xid,ts->gid);
14421445
MtmBroadcastPollMessage(ts);
@@ -1459,7 +1462,9 @@ static void MtmDisableNode(int nodeId)
14591462
if (nodeId!=MtmNodeId) {
14601463
Mtm->nLiveNodes-=1;
14611464
}
1465+
MtmUnlock();
14621466
MtmPollStatusOfPreparedTransactions(nodeId);
1467+
MtmLock(LW_EXCLUSIVE);
14631468
}
14641469

14651470
staticvoidMtmEnableNode(intnodeId)
@@ -2780,34 +2785,41 @@ void MtmReleaseRecoverySlot(int nodeId)
27802785
}
27812786
}
27822787

2783-
voidMtmFinishPreparedTransaction(intnodeId,MtmTransState*ts,boolcommit)
2788+
voidMtmRollbackPreparedTransaction(charconst*gid)
2789+
{
2790+
MTM_LOG1("Abort prepared transaction %s",gid);
2791+
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
2792+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
2793+
MtmResetTransaction();
2794+
StartTransactionCommand();
2795+
MtmBeginSession(MtmReplicationNodeId);
2796+
MtmSetCurrentTransactionGID(gid);
2797+
FinishPreparedTransaction(gid, false);
2798+
CommitTransactionCommand();
2799+
MtmEndSession(MtmReplicationNodeId, true);
2800+
}
2801+
}
2802+
2803+
2804+
voidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit)
27842805
{
27852806
Assert(ts->votingCompleted);
27862807
Assert(!IsTransactionState());
27872808
MtmResetTransaction();
27882809
StartTransactionCommand();
2789-
MtmBeginSession(nodeId);
2810+
if (Mtm->nodes[MtmNodeId-1].originId==InvalidRepOriginId) {
2811+
/* This dummy origin is used for local commits/aborts which should not be replicated */
2812+
Mtm->nodes[MtmNodeId-1].originId=replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN,MtmNodeId));
2813+
}
2814+
MtmBeginSession(MtmNodeId);
27902815
MtmSetCurrentTransactionCSN(ts->csn);
27912816
MtmSetCurrentTransactionGID(ts->gid);
27922817
FinishPreparedTransaction(ts->gid,commit);
27932818
CommitTransactionCommand();
2794-
MtmEndSession(nodeId, true);
2819+
MtmEndSession(MtmNodeId, true);
27952820
Assert(ts->status==commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED);
27962821
}
27972822

2798-
#if0
2799-
staticvoidMtmFinishAllPreparedTransactions(void)
2800-
{
2801-
MtmTransState*ts;
2802-
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
2803-
if (ts->status!=TRANSACTION_STATUS_COMMITTED&&ts->status!=TRANSACTION_STATUS_ABORTED) {
2804-
MtmFinishPreparedTransaction(MtmReplicationNodeId,ts, false);
2805-
}
2806-
}
2807-
}
2808-
#endif
2809-
2810-
28112823
/*
28122824
* Determine when and how we should open replication slot.
28132825
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
@@ -2841,11 +2853,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28412853
Mtm->nodes[i].restartLsn=InvalidXLogRecPtr;
28422854
}
28432855
MtmUnlock();
2844-
#if0
2845-
MtmBeginSession(MtmReplicationNodeId);
2846-
FinishAllPreparedTransactions(false);
2847-
MtmEndSession(MtmReplicationNodeId, true);
2848-
#endif
28492856
returnREPLMODE_RECOVERY;
28502857
}
28512858
}

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,6 @@ extern void MtmReleaseRecoverySlot(int nodeId);
360360
externPGconn*PQconnectdb_safe(constchar*conninfo);
361361
externvoidMtmBeginSession(intnodeId);
362362
externvoidMtmEndSession(intnodeId,boolunlock);
363-
externvoidMtmFinishPreparedTransaction(intnodeId,MtmTransState*ts,boolcommit);
364-
363+
externvoidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit);
364+
externvoidMtmRollbackPreparedTransaction(charconst*gid);
365365
#endif

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,12 @@ process_remote_message(StringInfo s)
425425
}
426426
break;
427427
}
428+
case'A':
429+
{
430+
MtmRollbackPreparedTransaction(messageBody);
431+
standalone= true;
432+
break;
433+
}
428434
case'L':
429435
{
430436
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
@@ -590,8 +596,8 @@ process_remote_commit(StringInfo in)
590596
MtmReplicationNodeId=pq_getmsgbyte(in);
591597

592598
/* read fields */
593-
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
594-
end_lsn=pq_getmsgint64(in);/* end_lsn */
599+
pq_getmsgint64(in);/* commit_lsn */
600+
replorigin_session_origin_lsn=end_lsn=pq_getmsgint64(in);/* end_lsn */
595601
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
596602

597603
origin_node=pq_getmsgbyte(in);
@@ -609,6 +615,7 @@ process_remote_commit(StringInfo in)
609615
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
610616
MtmBeginSession(MtmReplicationNodeId);
611617
CommitTransactionCommand();
618+
MtmEndSession(MtmReplicationNodeId, true);
612619
}
613620
break;
614621
}
@@ -639,6 +646,7 @@ process_remote_commit(StringInfo in)
639646
FinishPreparedTransaction(gid, false);
640647
CommitTransactionCommand();
641648
}
649+
MtmEndSession(MtmReplicationNodeId, true);
642650
}
643651
break;
644652
}
@@ -655,34 +663,19 @@ process_remote_commit(StringInfo in)
655663
MtmSetCurrentTransactionGID(gid);
656664
FinishPreparedTransaction(gid, true);
657665
CommitTransactionCommand();
666+
MtmEndSession(MtmReplicationNodeId, true);
658667
break;
659668
}
660669
casePGLOGICAL_ABORT_PREPARED:
661670
{
662671
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
663672
gid=pq_getmsgstring(in);
664-
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s",gid);
665-
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
666-
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
667-
MtmResetTransaction();
668-
StartTransactionCommand();
669-
MtmBeginSession(MtmReplicationNodeId);
670-
MtmSetCurrentTransactionGID(gid);
671-
FinishPreparedTransaction(gid, false);
672-
CommitTransactionCommand();
673-
}
673+
MtmRollbackPreparedTransaction(gid);
674674
break;
675675
}
676676
default:
677677
Assert(false);
678678
}
679-
#if0/* Do ont need to advance slot position here: it will be done by transaction commit */
680-
if (replorigin_session_origin!=InvalidRepOriginId) {
681-
replorigin_advance(replorigin_session_origin,end_lsn,
682-
XactLastCommitEnd, false, false);
683-
}
684-
#endif
685-
MtmEndSession(MtmReplicationNodeId, true);
686679
MtmUpdateLsnMapping(MtmReplicationNodeId,end_lsn);
687680
if (flags&PGLOGICAL_CAUGHT_UP) {
688681
MtmRecoveryCompleted();

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,29 +133,29 @@ static void
133133
pglogical_write_message(StringInfoout,
134134
constchar*prefix,Sizesz,constchar*message)
135135
{
136-
if (*prefix=='L')
137-
{
138-
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
139-
}
140-
elseif (*prefix=='D')
141-
{
142-
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
143-
{
136+
switch (*prefix) {
137+
case'L':
138+
if (MtmIsRecoveredNode(MtmReplicationNodeId)) {
139+
return;
140+
}else {
141+
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
142+
}
143+
break;
144+
case'D':
145+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
144146
MTM_LOG2("%d: pglogical_write_message filtered",MyProcPid);
145147
return;
146148
}
147149
DDLInProress= true;
148-
}
149-
elseif (*prefix=='E')
150-
{
150+
break;
151+
case'E':
151152
DDLInProress= false;
152153
/*
153154
* we use End message only as indicator of DDL transaction finish,
154155
* so no need to send that to replicas.
155156
*/
156157
return;
157158
}
158-
159159
pq_sendbyte(out,'M');
160160
pq_sendbyte(out,*prefix);
161161
pq_sendint(out,sz,4);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp