Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8d58d05

Browse files
knizhnikkelvich
authored andcommitted
Rewrite origin LSN calculation
1 parentb69eae0 commit8d58d05

File tree

4 files changed

+58
-41
lines changed

4 files changed

+58
-41
lines changed

‎multimaster.c

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,16 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11331133
}
11341134
}
11351135

1136+
staticvoid
1137+
MtmLogAbortLogicalMessage(intnodeId,charconst*gid)
1138+
{
1139+
MtmAbortLogicalMessagemsg;
1140+
strcpy(msg.gid,gid);
1141+
msg.origin_node=nodeId;
1142+
msg.origin_lsn=replorigin_session_origin_lsn;
1143+
XLogFlush(LogLogicalMessage("A", (char*)&msg,sizeofmsg, false));
1144+
}
1145+
11361146
staticvoid
11371147
MtmEndTransaction(MtmCurrentTrans*x,boolcommit)
11381148
{
@@ -1154,7 +1164,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11541164
}
11551165
if (ts!=NULL) {
11561166
if (*ts->gid)
1157-
MTM_LOG2("TRANSLOG: %s transaction %s status %d", (commit ?"commit" :"rollback"),ts->gid,ts->status);
1167+
MTM_LOG1("TRANSLOG: %s transaction git=%s xid=%d node=%d dxid=%d status %d",
1168+
(commit ?"commit" :"rollback"),ts->gid,ts->xid,ts->gtid.node,ts->gtid.xid,ts->status);
11581169
if (commit) {
11591170
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
11601171
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
@@ -1213,7 +1224,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12131224
}
12141225
MtmTransactionListAppend(ts);
12151226
if (*x->gid) {
1216-
LogLogicalMessage("A",x->gid,strlen(x->gid)+1, false);
1227+
replorigin_session_origin_lsn=InvalidXLogRecPtr;
1228+
MtmLogAbortLogicalMessage(MtmNodeId,x->gid);
12171229
}
12181230
}
12191231
MtmSend2PCMessage(ts,MSG_ABORTED);/* send notification to coordinator */
@@ -2826,20 +2838,23 @@ void MtmReleaseRecoverySlot(int nodeId)
28262838
if (Mtm->recoverySlot==nodeId) {
28272839
Mtm->recoverySlot=0;
28282840
}
2829-
}
2841+
}
28302842

2831-
voidMtmRollbackPreparedTransaction(charconst*gid)
2843+
voidMtmRollbackPreparedTransaction(intnodeId,charconst*gid)
28322844
{
2833-
MTM_LOG1("Abort prepared transaction %s",gid);
2834-
if (MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED)==TRANSACTION_STATUS_UNKNOWN) {
2845+
XidStatusstatus=MtmExchangeGlobalTransactionStatus(gid,TRANSACTION_STATUS_ABORTED);
2846+
MTM_LOG1("Abort prepared transaction %s status %d",gid,status);
2847+
if (status==TRANSACTION_STATUS_UNKNOWN) {
28352848
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2",gid);
28362849
MtmResetTransaction();
28372850
StartTransactionCommand();
2838-
MtmBeginSession(MtmReplicationNodeId);
2851+
MtmBeginSession(nodeId);
28392852
MtmSetCurrentTransactionGID(gid);
28402853
FinishPreparedTransaction(gid, false);
28412854
CommitTransactionCommand();
2842-
MtmEndSession(MtmReplicationNodeId, true);
2855+
MtmEndSession(nodeId, true);
2856+
}elseif (status==TRANSACTION_STATUS_IN_PROGRESS) {
2857+
MtmLogAbortLogicalMessage(nodeId,gid);
28432858
}
28442859
}
28452860

@@ -3159,19 +3174,11 @@ bool MtmFilterTransaction(char* record, int size)
31593174
default:
31603175
break;
31613176
}
3162-
duplicate=Mtm->status==MTM_RECOVERY&&origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLSN;
3177+
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3178+
duplicate=origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLSN;
31633179

3164-
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3165-
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
3166-
if (Mtm->status==MTM_RECOVERY) {
3167-
if (Mtm->nodes[origin_node-1].restartLSN<origin_lsn) {
3168-
Mtm->nodes[origin_node-1].restartLSN=origin_lsn;
3169-
}
3170-
}else {
3171-
if (Mtm->nodes[replication_node-1].restartLSN<end_lsn) {
3172-
Mtm->nodes[replication_node-1].restartLSN=end_lsn;
3173-
}
3174-
}
3180+
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3181+
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,flags,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLSN);
31753182
returnduplicate;
31763183
}
31773184

‎multimaster.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,13 @@ typedef struct
149149
pgid_tgid;/* Global transaction identifier */
150150
}MtmArbiterMessage;
151151

152+
typedefstructMtmAbortLogicalMessage
153+
{
154+
pgid_tgid;
155+
intorigin_node;
156+
XLogRecPtrorigin_lsn;
157+
}MtmAbortLogicalMessage;
158+
152159
typedefstructMtmMessageQueue
153160
{
154161
MtmArbiterMessagemsg;
@@ -364,7 +371,7 @@ extern PGconn *PQconnectdb_safe(const char *conninfo);
364371
externvoidMtmBeginSession(intnodeId);
365372
externvoidMtmEndSession(intnodeId,boolunlock);
366373
externvoidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit);
367-
externvoidMtmRollbackPreparedTransaction(charconst*gid);
374+
externvoidMtmRollbackPreparedTransaction(intnodeId,charconst*gid);
368375
externboolMtmFilterTransaction(char*record,intsize);
369376

370377
#endif

‎pglogical_apply.c

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,14 @@ process_remote_message(StringInfo s)
427427
}
428428
case'A':
429429
{
430-
MtmRollbackPreparedTransaction(messageBody);
430+
MtmAbortLogicalMessage*msg= (MtmAbortLogicalMessage*)messageBody;
431+
intorigin_node=msg->origin_node;
432+
Assert(messageSize==sizeof(MtmAbortLogicalMessage));
433+
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
434+
Mtm->nodes[origin_node-1].restartLSN=msg->origin_lsn;
435+
}
436+
replorigin_session_origin_lsn=msg->origin_lsn;
437+
MtmRollbackPreparedTransaction(origin_node,msg->gid);
431438
standalone= true;
432439
break;
433440
}
@@ -597,17 +604,16 @@ process_remote_commit(StringInfo in)
597604

598605
/* read fields */
599606
pq_getmsgint64(in);/* commit_lsn */
600-
replorigin_session_origin_lsn=end_lsn=pq_getmsgint64(in);/* end_lsn */
607+
end_lsn=pq_getmsgint64(in);/* end_lsn */
601608
replorigin_session_origin_timestamp=pq_getmsgint64(in);/* commit_time */
602609

603610
origin_node=pq_getmsgbyte(in);
604611
origin_lsn=pq_getmsgint64(in);
605612

606-
if (origin_node!=MtmReplicationNodeId) {
607-
replorigin_advance(Mtm->nodes[origin_node-1].originId,origin_lsn,GetXLogInsertRecPtr(),
608-
false/* backward */ , false/* WAL */ );
613+
replorigin_session_origin_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
614+
if(Mtm->nodes[origin_node-1].restartLSN<replorigin_session_origin_lsn) {
615+
Mtm->nodes[origin_node-1].restartLSN=replorigin_session_origin_lsn;
609616
}
610-
611617
Assert(replorigin_session_origin==InvalidRepOriginId);
612618

613619
switch(PGLOGICAL_XACT_EVENT(flags))
@@ -617,9 +623,9 @@ process_remote_commit(StringInfo in)
617623
MTM_LOG2("%d: PGLOGICAL_COMMIT commit",MyProcPid);
618624
if (IsTransactionState()) {
619625
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
620-
MtmBeginSession(MtmReplicationNodeId);
626+
MtmBeginSession(origin_node);
621627
CommitTransactionCommand();
622-
MtmEndSession(MtmReplicationNodeId, true);
628+
MtmEndSession(origin_node, true);
623629
}
624630
break;
625631
}
@@ -632,12 +638,12 @@ process_remote_commit(StringInfo in)
632638
AbortCurrentTransaction();
633639
}else {
634640
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
635-
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s",gid);
641+
MTM_LOG1("PGLOGICAL_PREPARE commit: gid=%s",gid);
636642
BeginTransactionBlock();
637643
CommitTransactionCommand();
638644
StartTransactionCommand();
639645

640-
MtmBeginSession(MtmReplicationNodeId);
646+
MtmBeginSession(origin_node);
641647
/* PREPARE itself */
642648
MtmSetCurrentTransactionGID(gid);
643649
PrepareTransactionBlock(gid);
@@ -650,7 +656,7 @@ process_remote_commit(StringInfo in)
650656
FinishPreparedTransaction(gid, false);
651657
CommitTransactionCommand();
652658
}
653-
MtmEndSession(MtmReplicationNodeId, true);
659+
MtmEndSession(origin_node, true);
654660
}
655661
break;
656662
}
@@ -659,22 +665,22 @@ process_remote_commit(StringInfo in)
659665
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
660666
csn=pq_getmsgint64(in);
661667
gid=pq_getmsgstring(in);
662-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld",csn,gid,end_lsn);
668+
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%lx",csn,gid,end_lsn);
663669
MtmResetTransaction();
664670
StartTransactionCommand();
665-
MtmBeginSession(MtmReplicationNodeId);
671+
MtmBeginSession(origin_node);
666672
MtmSetCurrentTransactionCSN(csn);
667673
MtmSetCurrentTransactionGID(gid);
668674
FinishPreparedTransaction(gid, true);
669675
CommitTransactionCommand();
670-
MtmEndSession(MtmReplicationNodeId, true);
676+
MtmEndSession(origin_node, true);
671677
break;
672678
}
673679
casePGLOGICAL_ABORT_PREPARED:
674680
{
675681
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
676682
gid=pq_getmsgstring(in);
677-
MtmRollbackPreparedTransaction(gid);
683+
MtmRollbackPreparedTransaction(origin_node,gid);
678684
break;
679685
}
680686
default:

‎pglogical_receiver.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,12 +534,9 @@ pglogical_receiver_main(Datum main_arg)
534534
MtmSpillToFile(spill_file,buf.data,buf.used);
535535
ByteBufferReset(&buf);
536536
}
537-
if (stmt[0]=='M'&&stmt[1]=='L') {
538-
MTM_LOG3("Processdeadlock message from %d",nodeId);
537+
if (stmt[0]=='M'&&(stmt[1]=='L'||stmt[1]=='C'||stmt[1]=='A')) {
538+
MTM_LOG3("Process'%c' message from %d",stmt[1],nodeId);
539539
MtmExecutor(stmt,rc-hdr_len);
540-
}elseif (stmt[0]=='M'&&stmt[1]=='C') {
541-
MTM_LOG1("Process concurrent DDL message from %d",nodeId);
542-
MtmExecute(stmt,rc-hdr_len);
543540
}else {
544541
ByteBufferAppend(&buf,stmt,rc-hdr_len);
545542
if (stmt[0]=='C')/* commit */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp