Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ab52513

Browse files
committed
Filter applied transactions
1 parent cab8515 commit ab52513

File tree

6 files changed

+143
-50
lines changed

6 files changed

+143
-50
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,14 @@ static void MtmSetSocketOptions(int sd)
313313

314314
staticvoidMtmCheckResponse(MtmArbiterMessage*resp)
315315
{
316-
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)) {
316+
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)
317+
&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)
318+
&&Mtm->status!=MTM_RECOVERY
319+
&&Mtm->nodes[MtmNodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)<MtmGetSystemTime())
320+
{
317321
elog(WARNING,"Node %d thinks that I was dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],messageKindText[resp->code]);
318-
if (Mtm->status!=MTM_RECOVERY) {
319-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
320-
MtmSwitchClusterMode(MTM_RECOVERY);
321-
}
322+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
323+
MtmSwitchClusterMode(MTM_RECOVERY);
322324
}
323325
}
324326

@@ -565,7 +567,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
565567
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)
566568
{
567569
intrc=MtmReadSocket(sockets[node],buf,buf_size);
568-
if (rc<0) {
570+
if (rc <=0) {
569571
elog(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
570572
MtmDisconnect(node);
571573
}
@@ -961,6 +963,7 @@ static void MtmReceiver(Datum arg)
961963
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,node);
962964
continue;
963965
}
966+
Assert(msg->code==MSG_ABORTED||strcmp(msg->gid,ts->gid)==0);
964967
if (BIT_CHECK(ts->votedMask,node-1)) {
965968
elog(WARNING,"Receive deteriorated %s response for transaction %d (%s) from node %d",
966969
messageKindText[msg->code],ts->xid,ts->gid,node);
@@ -994,6 +997,8 @@ static void MtmReceiver(Datum arg)
994997
MtmWakeUpBackend(ts);
995998
}else {
996999
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1000+
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
1001+
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
9971002
ts->isPrepared= true;
9981003
if (ts->isTwoPhase) {
9991004
MtmWakeUpBackend(ts);
@@ -1052,9 +1057,11 @@ static void MtmReceiver(Datum arg)
10521057
ts->csn=MtmAssignCSN();
10531058
MtmAdjustSubtransactions(ts);
10541059
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
1055-
}else {
1056-
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
1060+
}elseif (ts->status==TRANSACTION_STATUS_ABORTED) {
10571061
MtmSend2PCMessage(ts,MSG_ABORTED);
1062+
}else {
1063+
elog(WARNING,"Transaction %s is already %s",
1064+
ts->gid,ts->status==TRANSACTION_STATUS_COMMITTED ?"committed" :"prepared");
10581065
}
10591066
break;
10601067
default:

‎contrib/mmts/multimaster.c‎

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include"miscadmin.h"
1616

1717
#include"libpq-fe.h"
18+
#include"lib/stringinfo.h"
19+
#include"libpq/pqformat.h"
1820
#include"common/username.h"
1921

2022
#include"postmaster/postmaster.h"
@@ -926,7 +928,9 @@ MtmVotingCompleted(MtmTransState* ts)
926928
ts->votingCompleted= true;
927929
ts->status=TRANSACTION_STATUS_UNKNOWN;
928930
return true;
929-
}else {
931+
}else {
932+
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
933+
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
930934
ts->isPrepared= true;
931935
if (ts->isTwoPhase) {
932936
ts->votingCompleted= true;
@@ -980,9 +984,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980984
MtmResetTransaction();
981985
}else {
982986
intresult=0;
983-
987+
intnConfigChanges=Mtm->nConfigChanges;
984988
/* Wait votes from all nodes until: */
985-
while (!MtmVotingCompleted(ts))
989+
while (!MtmVotingCompleted(ts)
990+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
986991
{
987992
MtmUnlock();
988993
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Start");
@@ -998,8 +1003,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9981003
MtmLock(LW_EXCLUSIVE);
9991004
}
10001005
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1006+
if (ts->isPrepared) {
1007+
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1008+
}
1009+
if (Mtm->status!=MTM_ONLINE) {
1010+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1011+
}else {
1012+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1013+
}
10011014
MtmAbortTransaction(ts);
1002-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
10031015
}
10041016
x->status=ts->status;
10051017
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
@@ -1032,6 +1044,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10321044
elog(WARNING,"Global transaciton ID '%s' is not found",x->gid);
10331045
}else {
10341046
intresult=0;
1047+
intnConfigChanges=Mtm->nConfigChanges;
10351048

10361049
Assert(tm->state!=NULL);
10371050
MTM_LOG3("Commit prepared transaction %d with gid='%s'",x->xid,x->gid);
@@ -1046,7 +1059,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10461059
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
10471060

10481061
/* Wait votes from all nodes until: */
1049-
while (!MtmVotingCompleted(ts))
1062+
while (!MtmVotingCompleted(ts)
1063+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
10501064
{
10511065
MtmUnlock();
10521066
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Start");
@@ -1063,8 +1077,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10631077
}
10641078
}
10651079
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1080+
if (ts->isPrepared) {
1081+
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1082+
}
1083+
if (Mtm->status!=MTM_ONLINE) {
1084+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1085+
}else {
1086+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1087+
}
10661088
MtmAbortTransaction(ts);
1067-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
10681089
}
10691090
x->status=ts->status;
10701091
x->xid=ts->xid;
@@ -1166,11 +1187,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11661187
}
11671188
ts->status=TRANSACTION_STATUS_ABORTED;
11681189
ts->isLocal= true;
1190+
ts->isPrepared= false;
11691191
ts->snapshot=x->snapshot;
1192+
ts->isTwoPhase=x->isTwoPhase;
11701193
ts->csn=MtmAssignCSN();
11711194
ts->gtid=x->gtid;
11721195
ts->nSubxids=0;
11731196
ts->votingCompleted= true;
1197+
strcpy(ts->gid,x->gid);
11741198
if (ts->isActive) {
11751199
ts->isActive= false;
11761200
Assert(Mtm->nActiveTransactions!=0);
@@ -1226,8 +1250,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12261250
inti;
12271251
for (i=0;i<Mtm->nAllNodes;i++)
12281252
{
1229-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i]))
1253+
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i))
12301254
{
1255+
Assert(TransactionIdIsValid(ts->xids[i]));
12311256
msg.node=i+1;
12321257
msg.dxid=ts->xids[i];
12331258
MtmSendMessage(&msg);
@@ -1655,7 +1680,7 @@ MtmCheckClusterLock()
16551680
continue;
16561681
}else {
16571682
/* All lockers are synchronized their logs */
1658-
/* Remove lock and mark them asrceovered */
1683+
/* Remove lock and mark them asrecovered */
16591684
MTM_LOG1("Complete recovery of %d nodes (node mask %lx)",Mtm->nLockers, (long)Mtm->nodeLockerMask);
16601685
Assert(Mtm->walSenderLockerMask==0);
16611686
Assert((Mtm->nodeLockerMask&Mtm->disabledNodeMask)==Mtm->nodeLockerMask);
@@ -2082,6 +2107,8 @@ static void MtmInitialize()
20822107
Mtm->nodes[i].timeline=0;
20832108
}
20842109
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
2110+
/* All transaction originated from the current node should be ignored during recovery */
2111+
Mtm->nodes[MtmNodeId-1].restartLsn= (XLogRecPtr)PG_UINT64_MAX;
20852112
PGSemaphoreCreate(&Mtm->sendSemaphore);
20862113
PGSemaphoreReset(&Mtm->sendSemaphore);
20872114
SpinLockInit(&Mtm->spinlock);
@@ -2806,12 +2833,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28062833
Assert(!IsTransactionState());
28072834
MtmResetTransaction();
28082835
StartTransactionCommand();
2809-
#if0
2810-
if (Mtm->nodes[MtmNodeId-1].originId==InvalidRepOriginId) {
2811-
/* This dummy origin is used for local commits/aborts which should not be replicated */
2812-
Mtm->nodes[MtmNodeId-1].originId=replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN,MtmNodeId));
2813-
}
2814-
#endif
2836+
28152837
MtmBeginSession(MtmNodeId);
28162838
MtmSetCurrentTransactionCSN(ts->csn);
28172839
MtmSetCurrentTransactionGID(ts->gid);
@@ -2828,7 +2850,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28282850
*/
28292851
MtmReplicationModeMtmGetReplicationMode(intnodeId,sig_atomic_tvolatile*shutdown)
28302852
{
2831-
inti;
28322853
boolrecovery= false;
28332854

28342855
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)
@@ -2850,9 +2871,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28502871
Mtm->nReceivers=0;
28512872
Mtm->recoveryCount+=1;
28522873
Mtm->pglogicalNodeMask=0;
2853-
for (i=0;i<Mtm->nAllNodes;i++) {
2854-
Mtm->nodes[i].restartLsn=InvalidXLogRecPtr;
2855-
}
28562874
MtmUnlock();
28572875
returnREPLMODE_RECOVERY;
28582876
}
@@ -3069,6 +3087,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30693087
returnisDistributed;
30703088
}
30713089

3090+
boolMtmFilterTransaction(char*record,intsize)
3091+
{
3092+
StringInfoDatas;
3093+
uint8flags;
3094+
XLogRecPtrorigin_lsn;
3095+
XLogRecPtrend_lsn;
3096+
intreplication_node;
3097+
intorigin_node;
3098+
charconst*gid="";
3099+
boolduplicate;
3100+
3101+
s.data=record;
3102+
s.len=size;
3103+
s.maxlen=-1;
3104+
s.cursor=0;
3105+
3106+
Assert(pq_getmsgbyte(&s)=='C');
3107+
flags=pq_getmsgbyte(&s);/* flags */
3108+
replication_node=pq_getmsgbyte(&s);
3109+
3110+
/* read fields */
3111+
pq_getmsgint64(&s);/* commit_lsn */
3112+
end_lsn=pq_getmsgint64(&s);/* end_lsn */
3113+
pq_getmsgint64(&s);/* commit_time */
3114+
3115+
origin_node=pq_getmsgbyte(&s);
3116+
origin_lsn=pq_getmsgint64(&s);
3117+
3118+
Assert(replication_node==MtmReplicationNodeId&&
3119+
origin_node!=0&&
3120+
(Mtm->status==MTM_RECOVERY||origin_node==replication_node));
3121+
3122+
switch(PGLOGICAL_XACT_EVENT(flags))
3123+
{
3124+
casePGLOGICAL_PREPARE:
3125+
casePGLOGICAL_ABORT_PREPARED:
3126+
gid=pq_getmsgstring(&s);
3127+
break;
3128+
casePGLOGICAL_COMMIT_PREPARED:
3129+
pq_getmsgint64(&s);/* CSN */
3130+
gid=pq_getmsgstring(&s);
3131+
break;
3132+
default:
3133+
break;
3134+
}
3135+
duplicate=Mtm->status==MTM_RECOVERY&&origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLsn;
3136+
3137+
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3138+
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLsn);
3139+
if (Mtm->status==MTM_RECOVERY) {
3140+
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
3141+
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
3142+
}
3143+
}else {
3144+
if (Mtm->nodes[replication_node-1].restartLsn<end_lsn) {
3145+
Mtm->nodes[replication_node-1].restartLsn=end_lsn;
3146+
}
3147+
}
3148+
returnduplicate;
3149+
}
3150+
30723151
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
30733152
{
30743153
hooks->startup_hook=MtmReplicationStartupHook;

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,4 +360,6 @@ extern void MtmBeginSession(int nodeId);
360360
externvoidMtmEndSession(intnodeId,boolunlock);
361361
externvoidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit);
362362
externvoidMtmRollbackPreparedTransaction(charconst*gid);
363+
externboolMtmFilterTransaction(char*record,intsize);
364+
363365
#endif

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ process_remote_begin(StringInfo s)
344344
Assert(gtid.node>0);
345345

346346
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
347+
MtmResetTransaction();
347348
#if1
348349
if (BIT_CHECK(Mtm->disabledNodeMask,gtid.node-1)) {
349350
elog(WARNING,"Ignore transaction %d from disabled node %d",gtid.xid,gtid.node);
350-
MtmResetTransaction();
351351
return false;
352352
}
353353
#endif
@@ -603,9 +603,6 @@ process_remote_commit(StringInfo in)
603603
origin_node=pq_getmsgbyte(in);
604604
origin_lsn=pq_getmsgint64(in);
605605

606-
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
607-
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
608-
}
609606
if (origin_node!=MtmReplicationNodeId) {
610607
replorigin_advance(Mtm->nodes[origin_node-1].originId,origin_lsn,GetXLogInsertRecPtr(),
611608
false/* backward */ , false/* WAL */ );

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ pglogical_receiver_main(Datum main_arg)
339339
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
340340
}else {
341341
originStartPos=replorigin_get_progress(originId, false);
342+
if (Mtm->nodes[nodeId-1].restartLsn<originStartPos) {
343+
Mtm->nodes[nodeId-1].restartLsn=originStartPos;
344+
}
342345
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
343346
}
344347
Mtm->nodes[nodeId-1].originId=originId;
@@ -535,27 +538,32 @@ pglogical_receiver_main(Datum main_arg)
535538
ByteBufferAppend(&buf,stmt,rc-hdr_len);
536539
if (stmt[0]=='C')/* commit */
537540
{
538-
if (spill_file >=0) {
539-
ByteBufferAppend(&buf,")",1);
540-
pq_sendbyte(&spill_info,'(');
541-
pq_sendint(&spill_info,buf.used,4);
542-
MtmSpillToFile(spill_file,buf.data,buf.used);
543-
MtmCloseSpillFile(spill_file);
544-
MtmExecute(spill_info.data,spill_info.len);
545-
spill_file=-1;
546-
resetStringInfo(&spill_info);
547-
}else {
548-
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
549-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
550-
timestamp_tstop,start=MtmGetSystemTime();
551-
MtmExecutor(buf.data,buf.used);
552-
stop=MtmGetSystemTime();
553-
if (stop-start>USECS_PER_SEC) {
554-
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
541+
if (!MtmFilterTransaction(stmt,rc-hdr_len)) {
542+
if (spill_file >=0) {
543+
ByteBufferAppend(&buf,")",1);
544+
pq_sendbyte(&spill_info,'(');
545+
pq_sendint(&spill_info,buf.used,4);
546+
MtmSpillToFile(spill_file,buf.data,buf.used);
547+
MtmCloseSpillFile(spill_file);
548+
MtmExecute(spill_info.data,spill_info.len);
549+
spill_file=-1;
550+
resetStringInfo(&spill_info);
551+
}else {
552+
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
553+
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
554+
timestamp_tstop,start=MtmGetSystemTime();
555+
MtmExecutor(buf.data,buf.used);
556+
stop=MtmGetSystemTime();
557+
if (stop-start>USECS_PER_SEC) {
558+
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
559+
}
560+
}else {
561+
MtmExecute(buf.data,buf.used);
555562
}
556-
}else {
557-
MtmExecute(buf.data,buf.used);
558563
}
564+
}elseif (spill_file >=0) {
565+
MtmCloseSpillFile(spill_file);
566+
spill_file=-1;
559567
}
560568
ByteBufferReset(&buf);
561569
}

‎contrib/mmts/tests2/docker-entrypoint.sh‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ if [ "$1" = 'postgres' ]; then
6464
CONNSTRS='dbname=postgres user=postgres host=node1, dbname=postgres user=postgres host=node2, dbname=postgres user=postgres host=node3'
6565
RAFT_PEERS='1:node1:6666, 2:node2:6666, 3:node3:6666'
6666

67-
# log_line_prefix = '%t: '
6867

6968
cat<<-EOF >>$PGDATA/postgresql.conf
7069
listen_addresses='*'
@@ -80,6 +79,7 @@ if [ "$1" = 'postgres' ]; then
8079
log_checkpoints = on
8180
checkpoint_timeout = 30
8281
log_autovacuum_min_duration = 0
82+
log_line_prefix = '%t: '
8383
8484
multimaster.workers = 4
8585
multimaster.max_nodes = 3

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp