Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5a2f9b3

Browse files
committed
2 parentsc857a98 +3b2518a commit5a2f9b3

File tree

8 files changed

+47
-27
lines changed

8 files changed

+47
-27
lines changed

‎contrib/mmts/arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ static void MtmSendHeartbeat()
369369
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
370370
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
371371
}else {
372-
MTM_LOG2("Send heartbeat to node %d with timestamp %ld",i+1,now);
372+
MTM_LOG4("Send heartbeat to node %d with timestamp %ld",i+1,now);
373373
}
374374
}else {
375375
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
@@ -897,7 +897,7 @@ static void MtmReceiver(Datum arg)
897897

898898
switch (msg->code) {
899899
caseMSG_HEARTBEAT:
900-
MTM_LOG2("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
900+
MTM_LOG4("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
901901
node,msg->csn,USEC_TO_MSEC(MtmGetSystemTime()-msg->csn));
902902
continue;
903903
caseMSG_POLL_REQUEST:

‎contrib/mmts/multimaster.c

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10281028
MTM_TXTRACE(x,"PostPrepareTransaction Start");
10291029

10301030
if (!x->isDistributed) {
1031+
MTM_TXTRACE(x,"not distributed?");
10311032
return;
10321033
}
10331034

@@ -1040,25 +1041,34 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10401041
Assert(ts!=NULL);
10411042
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10421043
if (!MtmIsCoordinator(ts)||Mtm->status==MTM_RECOVERY) {
1044+
MTM_TXTRACE(x,"recovery?");
10431045
Assert(x->gid[0]);
10441046
ts->votingCompleted= true;
1045-
if (Mtm->status!=MTM_RECOVERY||Mtm->recoverySlot!=MtmReplicationNodeId) {
1047+
MTM_TXTRACE(x,"recovery? 1");
1048+
if (Mtm->status!=MTM_RECOVERY||Mtm->recoverySlot!=MtmReplicationNodeId) {
1049+
MTM_TXTRACE(x,"recovery? 2");
10461050
MtmSend2PCMessage(ts,MSG_PREPARED);/* send notification to coordinator */
10471051
if (!MtmUseDtm) {
10481052
ts->status=TRANSACTION_STATUS_UNKNOWN;
10491053
}
10501054
}else {
1055+
MTM_TXTRACE(x,"recovery? 3");
10511056
ts->status=TRANSACTION_STATUS_UNKNOWN;
10521057
}
1058+
MTM_TXTRACE(x,"recovery? 4");
10531059
MtmUnlock();
1060+
MTM_TXTRACE(x,"recovery? 5");
10541061
MtmResetTransaction();
1062+
MTM_TXTRACE(x,"recovery? 6");
10551063
}else {
1064+
MTM_TXTRACE(x,"not recovery?");
10561065
Mtm2PCVoting(x,ts);
10571066
MtmUnlock();
10581067
if (x->isTwoPhase) {
10591068
MtmResetTransaction();
10601069
}
10611070
}
1071+
MTM_TXTRACE(x,"recovery? 7");
10621072
//if (x->gid[0]) MTM_LOG1("Prepared transaction %d (%s) csn=%ld at %ld: %d", x->xid, x->gid, ts->csn, MtmGetCurrentTime(), ts->status);
10631073
if (Mtm->inject2PCError==3) {
10641074
Mtm->inject2PCError=0;
@@ -1136,6 +1146,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
11361146
strcpy(msg.gid,gid);
11371147
msg.origin_node=nodeId;
11381148
msg.origin_lsn=replorigin_session_origin_lsn;
1149+
MTM_LOG2("[TRACE] MtmLogAbortLogicalMessage(%d, %s)",nodeId,gid);
11391150
XLogFlush(LogLogicalMessage("A", (char*)&msg,sizeofmsg, false));
11401151
}
11411152

@@ -1224,6 +1235,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12241235
MtmTransactionListAppend(ts);
12251236
if (*x->gid) {
12261237
replorigin_session_origin_lsn=InvalidXLogRecPtr;
1238+
MTM_TXTRACE(x,"MtmEndTransaction/MtmLogAbortLogicalMessage");
12271239
MtmLogAbortLogicalMessage(MtmNodeId,x->gid);
12281240
}
12291241
}
@@ -2878,7 +2890,9 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28782890
CommitTransactionCommand();
28792891
MtmEndSession(nodeId, true);
28802892
}elseif (status==TRANSACTION_STATUS_IN_PROGRESS) {
2893+
MtmBeginSession(nodeId);
28812894
MtmLogAbortLogicalMessage(nodeId,gid);
2895+
MtmEndSession(nodeId, true);
28822896
}
28832897
}
28842898

@@ -3045,6 +3059,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30453059
sscanf(strVal(elem->arg),"%lx",&recoveredLSN);
30463060
MTM_LOG1("Recovered position of node %d is %lx",MtmReplicationNodeId,recoveredLSN);
30473061
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
3062+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
30483063
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
30493064
}
30503065
}else {
@@ -3210,18 +3225,20 @@ bool MtmFilterTransaction(char* record, int size)
32103225
}
32113226
restart_lsn=origin_node==MtmReplicationNodeId ?end_lsn :origin_lsn;
32123227
if (Mtm->nodes[origin_node-1].restartLSN<restart_lsn) {
3228+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,restart_lsn);
32133229
Mtm->nodes[origin_node-1].restartLSN=restart_lsn;
32143230
}else {
32153231
duplicate= true;
32163232
}
32173233

32183234
if (duplicate) {
3219-
MTM_LOG1("Ignore transaction %s from node %dlsn%lx, flags=%x,origin node %d, original lsn=%lx,current lsn=%lx",
3220-
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
3235+
MTM_LOG1("Ignore transaction %s from node %dflags=%x, our restartLSN for node:%lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx,origin_lsn=%lx",
3236+
gid,replication_node,flags,Mtm->nodes[origin_node-1].restartLSN,origin_node,MtmReplicationNodeId,end_lsn,origin_lsn);
32213237
}else {
32223238
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
32233239
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
32243240
}
3241+
32253242
returnduplicate;
32263243
}
32273244

@@ -4127,16 +4144,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
41274144

41284145
caseT_VacuumStmt:
41294146
skipCommand= true;
4130-
if (context==PROCESS_UTILITY_TOPLEVEL) {
4131-
MtmProcessDDLCommand(queryString, false, true);
4132-
MtmTx.isDistributed= false;
4133-
}elseif (MtmApplyContext!=NULL) {
4134-
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
4135-
Assert(oldContext!=MtmApplyContext);
4136-
MtmVacuumStmt= (VacuumStmt*)copyObject(parsetree);
4137-
MemoryContextSwitchTo(oldContext);
4138-
return;
4139-
}
4147+
// if (context == PROCESS_UTILITY_TOPLEVEL) {
4148+
// MtmProcessDDLCommand(queryString, false, true);
4149+
// MtmTx.isDistributed = false;
4150+
// } else if (MtmApplyContext != NULL) {
4151+
// MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4152+
// Assert(oldContext != MtmApplyContext);
4153+
// MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4154+
// MemoryContextSwitchTo(oldContext);
4155+
// return;
4156+
// }
41404157
break;
41414158

41424159
caseT_CreateDomainStmt:
@@ -4231,7 +4248,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42314248
if (indexStmt->concurrent)
42324249
{
42334250
if (context==PROCESS_UTILITY_TOPLEVEL) {
4234-
MtmProcessDDLCommand(queryString, false, true);
4251+
// MtmProcessDDLCommand(queryString, false, true);
42354252
MtmTx.isDistributed= false;
42364253
skipCommand= true;
42374254
/*
@@ -4258,7 +4275,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42584275
if (stmt->removeType==OBJECT_INDEX&&stmt->concurrent)
42594276
{
42604277
if (context==PROCESS_UTILITY_TOPLEVEL) {
4261-
MtmProcessDDLCommand(queryString, false, true);
4278+
//MtmProcessDDLCommand(queryString, false, true);
42624279
MtmTx.isDistributed= false;
42634280
skipCommand= true;
42644281
}elseif (MtmApplyContext!=NULL) {

‎contrib/mmts/multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
#defineMTM_TXTRACE(tx,event)
3939
#else
4040
#defineMTM_TXTRACE(tx,event) \
41-
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s\n", tx->gid, (long long)MtmGetSystemTime(), event)
41+
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, getpid())
4242
#endif
4343

4444
#defineMULTIMASTER_NAME "multimaster"

‎contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ process_remote_message(StringInfo s)
434434
* restartLSN without locks
435435
*/
436436
if (Mtm->nodes[origin_node-1].restartLSN<msg->origin_lsn) {
437+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)",origin_node,Mtm->nodes[origin_node-1].restartLSN,msg->origin_lsn);
437438
Mtm->nodes[origin_node-1].restartLSN=msg->origin_lsn;
438439
replorigin_session_origin_lsn=msg->origin_lsn;
439440
MtmRollbackPreparedTransaction(origin_node,msg->gid);

‎contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ pglogical_receiver_main(Datum main_arg)
338338
}else {
339339
originStartPos=replorigin_get_progress(originId, false);
340340
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
341+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
341342
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
342343
}
343344
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
@@ -533,7 +534,7 @@ pglogical_receiver_main(Datum main_arg)
533534
MtmSpillToFile(spill_file,buf.data,buf.used);
534535
ByteBufferReset(&buf);
535536
}
536-
if (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='C'||stmt[1]=='A')) {
537+
if (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A')) {
537538
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
538539
MtmExecutor(stmt,rc-hdr_len);
539540
}else {

‎contrib/mmts/tests/reinit-mm.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ do
5353
multimaster.use_raftable = false
5454
multimaster.queue_size=52857600
5555
multimaster.ignore_tables_without_pk = 1
56-
multimaster.heartbeat_recv_timeout =1000
56+
multimaster.heartbeat_recv_timeout =2000
5757
multimaster.heartbeat_send_timeout = 250
58-
multimaster.twopc_min_timeout =400000
59-
multimaster.min_2pc_timeout =400000
58+
multimaster.twopc_min_timeout =40000000
59+
multimaster.min_2pc_timeout =40000000
6060
multimaster.volkswagen_mode = 1
6161
multimaster.conn_strings = '$conn_str'
6262
multimaster.node_id =$i

‎contrib/mmts/tests2/docker-entrypoint.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ if [ "$1" = 'postgres' ]; then
6767
multimaster.conn_strings = '$CONNSTRS'
6868
multimaster.heartbeat_recv_timeout = 1100
6969
multimaster.heartbeat_send_timeout = 250
70-
multimaster.twopc_min_timeout = 200000
70+
multimaster.twopc_min_timeout = 20000
71+
multimaster.min_2pc_timeout = 10000
7172
EOF
7273

7374
cat$PGDATA/postgresql.conf

‎src/test/regress/pg_regress.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,10 +549,10 @@ convert_sourcefiles_in(char *source_subdir, char *dest_dir, char *dest_subdir, c
549549
}
550550
while (fgets(line,sizeof(line),infile))
551551
{
552-
//replace_string(line, "@abs_srcdir@", inputdir);
553-
replace_string(line,"@abs_srcdir@","/pg/src/src/test/regress");
554-
//replace_string(line, "@abs_builddir@", outputdir);
555-
replace_string(line,"@abs_builddir@","/pg/src/src/test/regress");
552+
replace_string(line,"@abs_srcdir@",inputdir);
553+
//replace_string(line, "@abs_srcdir@", "/pg/src/src/test/regress");
554+
replace_string(line,"@abs_builddir@",outputdir);
555+
//replace_string(line, "@abs_builddir@", "/pg/src/src/test/regress");
556556
replace_string(line,"@testtablespace@",testtablespace);
557557
replace_string(line,"@libdir@",dlpath);
558558
replace_string(line,"@DLSUFFIX@",DLSUFFIX);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp