Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit04648d8

Browse files
committed
Fix logical abort stuff.
1 parent86afc8d commit04648d8

File tree

3 files changed

+23
-14
lines changed

3 files changed

+23
-14
lines changed

‎multimaster.c

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
11461146
strcpy(msg.gid,gid);
11471147
msg.origin_node=nodeId;
11481148
msg.origin_lsn=replorigin_session_origin_lsn;
1149+
MTM_LOG2("[TRACE] MtmLogAbortLogicalMessage(%d, %s)",nodeId,gid);
11491150
XLogFlush(LogLogicalMessage("A", (char*)&msg,sizeofmsg, false));
11501151
}
11511152

@@ -1234,6 +1235,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12341235
MtmTransactionListAppend(ts);
12351236
if (*x->gid) {
12361237
replorigin_session_origin_lsn=InvalidXLogRecPtr;
1238+
MTM_TXTRACE(x,"MtmEndTransaction/MtmLogAbortLogicalMessage");
12371239
MtmLogAbortLogicalMessage(MtmNodeId,x->gid);
12381240
}
12391241
}
@@ -2888,7 +2890,9 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28882890
CommitTransactionCommand();
28892891
MtmEndSession(nodeId, true);
28902892
}elseif (status==TRANSACTION_STATUS_IN_PROGRESS) {
2893+
MtmBeginSession(nodeId);
28912894
MtmLogAbortLogicalMessage(nodeId,gid);
2895+
MtmEndSession(nodeId, true);
28922896
}
28932897
}
28942898

@@ -3055,6 +3059,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30553059
sscanf(strVal(elem->arg),"%lx",&recoveredLSN);
30563060
MTM_LOG1("Recovered position of node %d is %lx",MtmReplicationNodeId,recoveredLSN);
30573061
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
3062+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
30583063
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
30593064
}
30603065
}else {
@@ -3220,18 +3225,20 @@ bool MtmFilterTransaction(char* record, int size)
32203225
}
32213226
restart_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
32223227
if (Mtm->nodes[origin_node-1].restartLSN<restart_lsn) {
3228+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,restart_lsn);
32233229
Mtm->nodes[origin_node-1].restartLSN=restart_lsn;
32243230
}else {
32253231
duplicate= true;
32263232
}
32273233

32283234
if (duplicate) {
3229-
MTM_LOG1("Ignore transaction %s from node %dlsn%lx, flags=%x,origin node %d, original lsn=%lx,current lsn=%lx",
3230-
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
3235+
MTM_LOG1("Ignore transaction %s from node %dflags=%x, our restartLSN for node:%lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx,origin_lsn=%lx",
3236+
gid,replication_node,flags,Mtm->nodes[origin_node-1].restartLSN,origin_node,MtmReplicationNodeId,end_lsn,origin_lsn);
32313237
}else {
32323238
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
32333239
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
32343240
}
3241+
32353242
returnduplicate;
32363243
}
32373244

@@ -4137,16 +4144,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
41374144

41384145
caseT_VacuumStmt:
41394146
skipCommand= true;
4140-
if (context==PROCESS_UTILITY_TOPLEVEL) {
4141-
MtmProcessDDLCommand(queryString, false, true);
4142-
MtmTx.isDistributed= false;
4143-
}elseif (MtmApplyContext!=NULL) {
4144-
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
4145-
Assert(oldContext!=MtmApplyContext);
4146-
MtmVacuumStmt= (VacuumStmt*)copyObject(parsetree);
4147-
MemoryContextSwitchTo(oldContext);
4148-
return;
4149-
}
4147+
// if (context == PROCESS_UTILITY_TOPLEVEL) {
4148+
// MtmProcessDDLCommand(queryString, false, true);
4149+
// MtmTx.isDistributed = false;
4150+
// } else if (MtmApplyContext != NULL) {
4151+
// MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4152+
// Assert(oldContext != MtmApplyContext);
4153+
// MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4154+
// MemoryContextSwitchTo(oldContext);
4155+
// return;
4156+
// }
41504157
break;
41514158

41524159
caseT_CreateDomainStmt:
@@ -4241,7 +4248,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42414248
if (indexStmt->concurrent)
42424249
{
42434250
if (context==PROCESS_UTILITY_TOPLEVEL) {
4244-
MtmProcessDDLCommand(queryString, false, true);
4251+
// MtmProcessDDLCommand(queryString, false, true);
42454252
MtmTx.isDistributed= false;
42464253
skipCommand= true;
42474254
/*
@@ -4268,7 +4275,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42684275
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent)
42694276
{
42704277
if (context==PROCESS_UTILITY_TOPLEVEL) {
4271-
MtmProcessDDLCommand(queryString, false, true);
4278+
//MtmProcessDDLCommand(queryString, false, true);
42724279
MtmTx.isDistributed= false;
42734280
skipCommand= true;
42744281
}elseif (MtmApplyContext!=NULL) {

‎pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ process_remote_message(StringInfo s)
434434
* restartLSN without locks
435435
*/
436436
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
437+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",origin_node,Mtm->nodes[origin_node-1].restartLSN,msg->origin_lsn);
437438
Mtm->nodes[origin_node-1].restartLSN=msg->origin_lsn;
438439
replorigin_session_origin_lsn=msg->origin_lsn;
439440
MtmRollbackPreparedTransaction(origin_node,msg->gid);

‎pglogical_receiver.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ pglogical_receiver_main(Datum main_arg)
338338
}else {
339339
originStartPos=replorigin_get_progress(originId, false);
340340
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
341+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
341342
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
342343
}
343344
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp