Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb06753a

Browse files
committed
merge
2 parents94b6265 +b958316 commitb06753a

File tree

6 files changed

+143
-49
lines changed

6 files changed

+143
-49
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,14 @@ static void MtmSetSocketOptions(int sd)
313313

314314
staticvoidMtmCheckResponse(MtmArbiterMessage*resp)
315315
{
316-
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)) {
316+
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)
317+
&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)
318+
&&Mtm->status!=MTM_RECOVERY
319+
&&Mtm->nodes[MtmNodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)<MtmGetSystemTime())
320+
{
317321
elog(WARNING,"Node %d thinks that I was dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],messageKindText[resp->code]);
318-
if (Mtm->status!=MTM_RECOVERY) {
319-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
320-
MtmSwitchClusterMode(MTM_RECOVERY);
321-
}
322+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
323+
MtmSwitchClusterMode(MTM_RECOVERY);
322324
}
323325
}
324326

@@ -561,7 +563,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
561563
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)
562564
{
563565
intrc=MtmReadSocket(sockets[node],buf,buf_size);
564-
if (rc<0) {
566+
if (rc <=0) {
565567
elog(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
566568
MtmDisconnect(node);
567569
}
@@ -957,6 +959,7 @@ static void MtmReceiver(Datum arg)
957959
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,node);
958960
continue;
959961
}
962+
Assert(msg->code==MSG_ABORTED||strcmp(msg->gid,ts->gid)==0);
960963
if (BIT_CHECK(ts->votedMask,node-1)) {
961964
elog(WARNING,"Receive deteriorated %s response for transaction %d (%s) from node %d",
962965
messageKindText[msg->code],ts->xid,ts->gid,node);
@@ -990,6 +993,8 @@ static void MtmReceiver(Datum arg)
990993
MtmWakeUpBackend(ts);
991994
}else {
992995
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
996+
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
997+
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
993998
ts->isPrepared= true;
994999
if (ts->isTwoPhase) {
9951000
MtmWakeUpBackend(ts);
@@ -1048,9 +1053,11 @@ static void MtmReceiver(Datum arg)
10481053
ts->csn=MtmAssignCSN();
10491054
MtmAdjustSubtransactions(ts);
10501055
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
1051-
}else {
1052-
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
1056+
}elseif (ts->status==TRANSACTION_STATUS_ABORTED) {
10531057
MtmSend2PCMessage(ts,MSG_ABORTED);
1058+
}else {
1059+
elog(WARNING,"Transaction %s is already %s",
1060+
ts->gid,ts->status==TRANSACTION_STATUS_COMMITTED ?"committed" :"prepared");
10541061
}
10551062
break;
10561063
default:

‎contrib/mmts/multimaster.c‎

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include"miscadmin.h"
1616

1717
#include"libpq-fe.h"
18+
#include"lib/stringinfo.h"
19+
#include"libpq/pqformat.h"
1820
#include"common/username.h"
1921

2022
#include"postmaster/postmaster.h"
@@ -926,7 +928,9 @@ MtmVotingCompleted(MtmTransState* ts)
926928
ts->votingCompleted= true;
927929
ts->status=TRANSACTION_STATUS_UNKNOWN;
928930
return true;
929-
}else {
931+
}else {
932+
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
933+
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
930934
ts->isPrepared= true;
931935
if (ts->isTwoPhase) {
932936
ts->votingCompleted= true;
@@ -980,9 +984,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980984
MtmResetTransaction();
981985
}else {
982986
intresult=0;
983-
987+
intnConfigChanges=Mtm->nConfigChanges;
984988
/* Wait votes from all nodes until: */
985-
while (!MtmVotingCompleted(ts))
989+
while (!MtmVotingCompleted(ts)
990+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
986991
{
987992
MtmUnlock();
988993
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Start");
@@ -998,8 +1003,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9981003
MtmLock(LW_EXCLUSIVE);
9991004
}
10001005
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1006+
if (ts->isPrepared) {
1007+
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1008+
}
1009+
if (Mtm->status!=MTM_ONLINE) {
1010+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1011+
}else {
1012+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1013+
}
10011014
MtmAbortTransaction(ts);
1002-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
10031015
}
10041016
x->status=ts->status;
10051017
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
@@ -1032,6 +1044,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10321044
elog(WARNING,"Global transaciton ID '%s' is not found",x->gid);
10331045
}else {
10341046
intresult=0;
1047+
intnConfigChanges=Mtm->nConfigChanges;
10351048

10361049
Assert(tm->state!=NULL);
10371050
MTM_LOG3("Commit prepared transaction %d with gid='%s'",x->xid,x->gid);
@@ -1046,7 +1059,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10461059
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
10471060

10481061
/* Wait votes from all nodes until: */
1049-
while (!MtmVotingCompleted(ts))
1062+
while (!MtmVotingCompleted(ts)
1063+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
10501064
{
10511065
MtmUnlock();
10521066
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Start");
@@ -1063,8 +1077,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10631077
}
10641078
}
10651079
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1080+
if (ts->isPrepared) {
1081+
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1082+
}
1083+
if (Mtm->status!=MTM_ONLINE) {
1084+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1085+
}else {
1086+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1087+
}
10661088
MtmAbortTransaction(ts);
1067-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
10681089
}
10691090
x->status=ts->status;
10701091
x->xid=ts->xid;
@@ -1166,11 +1187,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11661187
}
11671188
ts->status=TRANSACTION_STATUS_ABORTED;
11681189
ts->isLocal= true;
1190+
ts->isPrepared= false;
11691191
ts->snapshot=x->snapshot;
1192+
ts->isTwoPhase=x->isTwoPhase;
11701193
ts->csn=MtmAssignCSN();
11711194
ts->gtid=x->gtid;
11721195
ts->nSubxids=0;
11731196
ts->votingCompleted= true;
1197+
strcpy(ts->gid,x->gid);
11741198
if (ts->isActive) {
11751199
ts->isActive= false;
11761200
Assert(Mtm->nActiveTransactions!=0);
@@ -1226,8 +1250,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12261250
inti;
12271251
for (i=0;i<Mtm->nAllNodes;i++)
12281252
{
1229-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i]))
1253+
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i))
12301254
{
1255+
Assert(TransactionIdIsValid(ts->xids[i]));
12311256
msg.node=i+1;
12321257
msg.dxid=ts->xids[i];
12331258
MtmSendMessage(&msg);
@@ -1655,7 +1680,7 @@ MtmCheckClusterLock()
16551680
continue;
16561681
}else {
16571682
/* All lockers are synchronized their logs */
1658-
/* Remove lock and mark them asrceovered */
1683+
/* Remove lock and mark them asrecovered */
16591684
MTM_LOG1("Complete recovery of %d nodes (node mask %lx)",Mtm->nLockers, (long)Mtm->nodeLockerMask);
16601685
Assert(Mtm->walSenderLockerMask==0);
16611686
Assert((Mtm->nodeLockerMask&Mtm->disabledNodeMask)==Mtm->nodeLockerMask);
@@ -2082,6 +2107,8 @@ static void MtmInitialize()
20822107
Mtm->nodes[i].timeline=0;
20832108
}
20842109
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
2110+
/* All transaction originated from the current node should be ignored during recovery */
2111+
Mtm->nodes[MtmNodeId-1].restartLsn= (XLogRecPtr)PG_UINT64_MAX;
20852112
PGSemaphoreCreate(&Mtm->sendSemaphore);
20862113
PGSemaphoreReset(&Mtm->sendSemaphore);
20872114
SpinLockInit(&Mtm->spinlock);
@@ -2808,12 +2835,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28082835
Assert(!IsTransactionState());
28092836
MtmResetTransaction();
28102837
StartTransactionCommand();
2811-
#if0
2812-
if (Mtm->nodes[MtmNodeId-1].originId==InvalidRepOriginId) {
2813-
/* This dummy origin is used for local commits/aborts which should not be replicated */
2814-
Mtm->nodes[MtmNodeId-1].originId=replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN,MtmNodeId));
2815-
}
2816-
#endif
2838+
28172839
MtmBeginSession(MtmNodeId);
28182840
MtmSetCurrentTransactionCSN(ts->csn);
28192841
MtmSetCurrentTransactionGID(ts->gid);
@@ -2830,7 +2852,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28302852
*/
28312853
MtmReplicationModeMtmGetReplicationMode(intnodeId,sig_atomic_tvolatile*shutdown)
28322854
{
2833-
inti;
28342855
boolrecovery= false;
28352856

28362857
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)
@@ -2852,9 +2873,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28522873
Mtm->nReceivers=0;
28532874
Mtm->recoveryCount+=1;
28542875
Mtm->pglogicalNodeMask=0;
2855-
for (i=0;i<Mtm->nAllNodes;i++) {
2856-
Mtm->nodes[i].restartLsn=InvalidXLogRecPtr;
2857-
}
28582876
MtmUnlock();
28592877
returnREPLMODE_RECOVERY;
28602878
}
@@ -3071,6 +3089,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30713089
returnisDistributed;
30723090
}
30733091

3092+
boolMtmFilterTransaction(char*record,intsize)
3093+
{
3094+
StringInfoDatas;
3095+
uint8flags;
3096+
XLogRecPtrorigin_lsn;
3097+
XLogRecPtrend_lsn;
3098+
intreplication_node;
3099+
intorigin_node;
3100+
charconst*gid="";
3101+
boolduplicate;
3102+
3103+
s.data=record;
3104+
s.len=size;
3105+
s.maxlen=-1;
3106+
s.cursor=0;
3107+
3108+
Assert(pq_getmsgbyte(&s)=='C');
3109+
flags=pq_getmsgbyte(&s);/* flags */
3110+
replication_node=pq_getmsgbyte(&s);
3111+
3112+
/* read fields */
3113+
pq_getmsgint64(&s);/* commit_lsn */
3114+
end_lsn=pq_getmsgint64(&s);/* end_lsn */
3115+
pq_getmsgint64(&s);/* commit_time */
3116+
3117+
origin_node=pq_getmsgbyte(&s);
3118+
origin_lsn=pq_getmsgint64(&s);
3119+
3120+
Assert(replication_node==MtmReplicationNodeId&&
3121+
origin_node!=0&&
3122+
(Mtm->status==MTM_RECOVERY||origin_node==replication_node));
3123+
3124+
switch(PGLOGICAL_XACT_EVENT(flags))
3125+
{
3126+
casePGLOGICAL_PREPARE:
3127+
casePGLOGICAL_ABORT_PREPARED:
3128+
gid=pq_getmsgstring(&s);
3129+
break;
3130+
casePGLOGICAL_COMMIT_PREPARED:
3131+
pq_getmsgint64(&s);/* CSN */
3132+
gid=pq_getmsgstring(&s);
3133+
break;
3134+
default:
3135+
break;
3136+
}
3137+
duplicate=Mtm->status==MTM_RECOVERY&&origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLsn;
3138+
3139+
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3140+
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLsn);
3141+
if (Mtm->status==MTM_RECOVERY) {
3142+
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
3143+
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
3144+
}
3145+
}else {
3146+
if (Mtm->nodes[replication_node-1].restartLsn<end_lsn) {
3147+
Mtm->nodes[replication_node-1].restartLsn=end_lsn;
3148+
}
3149+
}
3150+
returnduplicate;
3151+
}
3152+
30743153
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
30753154
{
30763155
hooks->startup_hook=MtmReplicationStartupHook;

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,6 @@ extern void MtmBeginSession(int nodeId);
362362
externvoidMtmEndSession(intnodeId,boolunlock);
363363
externvoidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit);
364364
externvoidMtmRollbackPreparedTransaction(charconst*gid);
365+
externboolMtmFilterTransaction(char*record,intsize);
366+
365367
#endif

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ process_remote_begin(StringInfo s)
344344
Assert(gtid.node>0);
345345

346346
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
347+
MtmResetTransaction();
347348
#if1
348349
if (BIT_CHECK(Mtm->disabledNodeMask,gtid.node-1)) {
349350
elog(WARNING,"Ignore transaction %d from disabled node %d",gtid.xid,gtid.node);
350-
MtmResetTransaction();
351351
return false;
352352
}
353353
#endif
@@ -603,9 +603,6 @@ process_remote_commit(StringInfo in)
603603
origin_node=pq_getmsgbyte(in);
604604
origin_lsn=pq_getmsgint64(in);
605605

606-
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
607-
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
608-
}
609606
if (origin_node!=MtmReplicationNodeId) {
610607
replorigin_advance(Mtm->nodes[origin_node-1].originId,origin_lsn,GetXLogInsertRecPtr(),
611608
false/* backward */ , false/* WAL */ );

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187187
Assert(false);
188188

189189
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
190+
Assert(txn->xid<1000||MtmTransactionRecords >=2);
190191
// if (MtmIsFilteredTxn) {
191192
// Assert(MtmTransactionRecords == 0);
192193
// return;

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ pglogical_receiver_main(Datum main_arg)
339339
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
340340
}else {
341341
originStartPos=replorigin_get_progress(originId, false);
342+
if (Mtm->nodes[nodeId-1].restartLsn<originStartPos) {
343+
Mtm->nodes[nodeId-1].restartLsn=originStartPos;
344+
}
342345
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
343346
}
344347
Mtm->nodes[nodeId-1].originId=originId;
@@ -535,27 +538,32 @@ pglogical_receiver_main(Datum main_arg)
535538
ByteBufferAppend(&buf,stmt,rc-hdr_len);
536539
if (stmt[0]=='C')/* commit */
537540
{
538-
if (spill_file >=0) {
539-
ByteBufferAppend(&buf,")",1);
540-
pq_sendbyte(&spill_info,'(');
541-
pq_sendint(&spill_info,buf.used,4);
542-
MtmSpillToFile(spill_file,buf.data,buf.used);
543-
MtmCloseSpillFile(spill_file);
544-
MtmExecute(spill_info.data,spill_info.len);
545-
spill_file=-1;
546-
resetStringInfo(&spill_info);
547-
}else {
548-
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
549-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
550-
timestamp_tstop,start=MtmGetSystemTime();
551-
MtmExecutor(buf.data,buf.used);
552-
stop=MtmGetSystemTime();
553-
if (stop-start>USECS_PER_SEC) {
554-
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
541+
if (!MtmFilterTransaction(stmt,rc-hdr_len)) {
542+
if (spill_file >=0) {
543+
ByteBufferAppend(&buf,")",1);
544+
pq_sendbyte(&spill_info,'(');
545+
pq_sendint(&spill_info,buf.used,4);
546+
MtmSpillToFile(spill_file,buf.data,buf.used);
547+
MtmCloseSpillFile(spill_file);
548+
MtmExecute(spill_info.data,spill_info.len);
549+
spill_file=-1;
550+
resetStringInfo(&spill_info);
551+
}else {
552+
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
553+
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
554+
timestamp_tstop,start=MtmGetSystemTime();
555+
MtmExecutor(buf.data,buf.used);
556+
stop=MtmGetSystemTime();
557+
if (stop-start>USECS_PER_SEC) {
558+
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
559+
}
560+
}else {
561+
MtmExecute(buf.data,buf.used);
555562
}
556-
}else {
557-
MtmExecute(buf.data,buf.used);
558563
}
564+
}elseif (spill_file >=0) {
565+
MtmCloseSpillFile(spill_file);
566+
spill_file=-1;
559567
}
560568
ByteBufferReset(&buf);
561569
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp