Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit194c178

Browse files
knizhnikkelvich
authored andcommitted
Filter applied transactions
1 parentf83bad4 commit194c178

File tree

6 files changed

+146
-50
lines changed

6 files changed

+146
-50
lines changed

‎arbiter.c

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,14 @@ static void MtmSetSocketOptions(int sd)
313313

314314
staticvoidMtmCheckResponse(MtmArbiterMessage*resp)
315315
{
316-
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)) {
316+
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)
317+
&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)
318+
&&Mtm->status!=MTM_RECOVERY
319+
&&Mtm->nodes[MtmNodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)<MtmGetSystemTime())
320+
{
317321
elog(WARNING,"Node %d thinks that I was dead, while I am %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],messageKindText[resp->code]);
318-
if (Mtm->status!=MTM_RECOVERY) {
319-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
320-
MtmSwitchClusterMode(MTM_RECOVERY);
321-
}
322+
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
323+
MtmSwitchClusterMode(MTM_RECOVERY);
322324
}
323325
}
324326

@@ -561,7 +563,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
561563
staticintMtmReadFromNode(intnode,void*buf,intbuf_size)
562564
{
563565
intrc=MtmReadSocket(sockets[node],buf,buf_size);
564-
if (rc<0) {
566+
if (rc <=0) {
565567
elog(WARNING,"Arbiter failed to read from node=%d, rc=%d, errno=%d",node+1,rc,errno);
566568
MtmDisconnect(node);
567569
}
@@ -957,6 +959,7 @@ static void MtmReceiver(Datum arg)
957959
elog(WARNING,"Ignore response for unexisted transaction %d from node %d",msg->dxid,node);
958960
continue;
959961
}
962+
Assert(msg->code==MSG_ABORTED||strcmp(msg->gid,ts->gid)==0);
960963
if (BIT_CHECK(ts->votedMask,node-1)) {
961964
elog(WARNING,"Receive deteriorated %s response for transaction %d (%s) from node %d",
962965
messageKindText[msg->code],ts->xid,ts->gid,node);
@@ -990,6 +993,8 @@ static void MtmReceiver(Datum arg)
990993
MtmWakeUpBackend(ts);
991994
}else {
992995
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
996+
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
997+
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
993998
ts->isPrepared= true;
994999
if (ts->isTwoPhase) {
9951000
MtmWakeUpBackend(ts);
@@ -1048,9 +1053,11 @@ static void MtmReceiver(Datum arg)
10481053
ts->csn=MtmAssignCSN();
10491054
MtmAdjustSubtransactions(ts);
10501055
MtmSend2PCMessage(ts,MSG_PRECOMMITTED);
1051-
}else {
1052-
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
1056+
}elseif (ts->status==TRANSACTION_STATUS_ABORTED) {
10531057
MtmSend2PCMessage(ts,MSG_ABORTED);
1058+
}else {
1059+
elog(WARNING,"Transaction %s is already %s",
1060+
ts->gid,ts->status==TRANSACTION_STATUS_COMMITTED ?"committed" :"prepared");
10541061
}
10551062
break;
10561063
default:

‎multimaster.c

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include"miscadmin.h"
1616

1717
#include"libpq-fe.h"
18+
#include"lib/stringinfo.h"
19+
#include"libpq/pqformat.h"
1820
#include"common/username.h"
1921

2022
#include"postmaster/postmaster.h"
@@ -925,7 +927,9 @@ MtmVotingCompleted(MtmTransState* ts)
925927
ts->votingCompleted= true;
926928
ts->status=TRANSACTION_STATUS_UNKNOWN;
927929
return true;
928-
}else {
930+
}else {
931+
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
932+
ts->gid,ts->status,ts->participantsMask,Mtm->disabledNodeMask,ts->votedMask);
929933
ts->isPrepared= true;
930934
if (ts->isTwoPhase) {
931935
ts->votingCompleted= true;
@@ -979,9 +983,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
979983
MtmResetTransaction();
980984
}else {
981985
intresult=0;
982-
986+
intnConfigChanges=Mtm->nConfigChanges;
983987
/* Wait votes from all nodes until: */
984-
while (!MtmVotingCompleted(ts))
988+
while (!MtmVotingCompleted(ts)
989+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
985990
{
986991
MtmUnlock();
987992
MTM_TXTRACE(x,"PostPrepareTransaction WaitLatch Start");
@@ -997,8 +1002,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9971002
MtmLock(LW_EXCLUSIVE);
9981003
}
9991004
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1005+
if (ts->isPrepared) {
1006+
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1007+
}
1008+
if (Mtm->status!=MTM_ONLINE) {
1009+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1010+
}else {
1011+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1012+
}
10001013
MtmAbortTransaction(ts);
1001-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
10021014
}
10031015
x->status=ts->status;
10041016
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
@@ -1031,6 +1043,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10311043
elog(WARNING,"Global transaciton ID '%s' is not found",x->gid);
10321044
}else {
10331045
intresult=0;
1046+
intnConfigChanges=Mtm->nConfigChanges;
10341047

10351048
Assert(tm->state!=NULL);
10361049
MTM_LOG3("Commit prepared transaction %d with gid='%s'",x->xid,x->gid);
@@ -1045,7 +1058,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10451058
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
10461059

10471060
/* Wait votes from all nodes until: */
1048-
while (!MtmVotingCompleted(ts))
1061+
while (!MtmVotingCompleted(ts)
1062+
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
10491063
{
10501064
MtmUnlock();
10511065
MTM_TXTRACE(x,"CommitPreparedTransaction WaitLatch Start");
@@ -1062,8 +1076,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10621076
}
10631077
}
10641078
if (ts->status!=TRANSACTION_STATUS_ABORTED&& !ts->votingCompleted) {
1079+
if (ts->isPrepared) {
1080+
elog(ERROR,"Commit of distributed transaction %s is suspended because node is switched to %s mode",ts->gid,MtmNodeStatusMnem[Mtm->status]);
1081+
}
1082+
if (Mtm->status!=MTM_ONLINE) {
1083+
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1084+
}else {
1085+
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
1086+
}
10651087
MtmAbortTransaction(ts);
1066-
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
10671088
}
10681089
x->status=ts->status;
10691090
x->xid=ts->xid;
@@ -1165,11 +1186,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11651186
}
11661187
ts->status=TRANSACTION_STATUS_ABORTED;
11671188
ts->isLocal= true;
1189+
ts->isPrepared= false;
11681190
ts->snapshot=x->snapshot;
1191+
ts->isTwoPhase=x->isTwoPhase;
11691192
ts->csn=MtmAssignCSN();
11701193
ts->gtid=x->gtid;
11711194
ts->nSubxids=0;
11721195
ts->votingCompleted= true;
1196+
strcpy(ts->gid,x->gid);
11731197
if (ts->isActive) {
11741198
ts->isActive= false;
11751199
Assert(Mtm->nActiveTransactions!=0);
@@ -1225,8 +1249,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12251249
inti;
12261250
for (i=0;i<Mtm->nAllNodes;i++)
12271251
{
1228-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i)&&TransactionIdIsValid(ts->xids[i]))
1252+
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i))
12291253
{
1254+
Assert(TransactionIdIsValid(ts->xids[i]));
12301255
msg.node=i+1;
12311256
msg.dxid=ts->xids[i];
12321257
MtmSendMessage(&msg);
@@ -1654,7 +1679,7 @@ MtmCheckClusterLock()
16541679
continue;
16551680
}else {
16561681
/* All lockers are synchronized their logs */
1657-
/* Remove lock and mark them asrceovered */
1682+
/* Remove lock and mark them asrecovered */
16581683
MTM_LOG1("Complete recovery of %d nodes (node mask %lx)",Mtm->nLockers, (long)Mtm->nodeLockerMask);
16591684
Assert(Mtm->walSenderLockerMask==0);
16601685
Assert((Mtm->nodeLockerMask&Mtm->disabledNodeMask)==Mtm->nodeLockerMask);
@@ -2081,6 +2106,8 @@ static void MtmInitialize()
20812106
Mtm->nodes[i].timeline=0;
20822107
}
20832108
Mtm->nodes[MtmNodeId-1].originId=DoNotReplicateId;
2109+
/* All transaction originated from the current node should be ignored during recovery */
2110+
Mtm->nodes[MtmNodeId-1].restartLsn= (XLogRecPtr)PG_UINT64_MAX;
20842111
PGSemaphoreCreate(&Mtm->sendSemaphore);
20852112
PGSemaphoreReset(&Mtm->sendSemaphore);
20862113
SpinLockInit(&Mtm->spinlock);
@@ -2807,12 +2834,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28072834
Assert(!IsTransactionState());
28082835
MtmResetTransaction();
28092836
StartTransactionCommand();
2810-
#if0
2811-
if (Mtm->nodes[MtmNodeId-1].originId==InvalidRepOriginId) {
2812-
/* This dummy origin is used for local commits/aborts which should not be replicated */
2813-
Mtm->nodes[MtmNodeId-1].originId=replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN,MtmNodeId));
2814-
}
2815-
#endif
2837+
28162838
MtmBeginSession(MtmNodeId);
28172839
MtmSetCurrentTransactionCSN(ts->csn);
28182840
MtmSetCurrentTransactionGID(ts->gid);
@@ -2829,7 +2851,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28292851
*/
28302852
MtmReplicationModeMtmGetReplicationMode(intnodeId,sig_atomic_tvolatile*shutdown)
28312853
{
2832-
inti;
28332854
boolrecovery= false;
28342855

28352856
while (Mtm->status!=MTM_CONNECTED&&Mtm->status!=MTM_ONLINE)
@@ -2851,9 +2872,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28512872
Mtm->nReceivers=0;
28522873
Mtm->recoveryCount+=1;
28532874
Mtm->pglogicalNodeMask=0;
2854-
for (i=0;i<Mtm->nAllNodes;i++) {
2855-
Mtm->nodes[i].restartLsn=InvalidXLogRecPtr;
2856-
}
28572875
MtmUnlock();
28582876
returnREPLMODE_RECOVERY;
28592877
}
@@ -3070,6 +3088,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30703088
returnisDistributed;
30713089
}
30723090

3091+
boolMtmFilterTransaction(char*record,intsize)
3092+
{
3093+
StringInfoDatas;
3094+
uint8flags;
3095+
XLogRecPtrorigin_lsn;
3096+
XLogRecPtrend_lsn;
3097+
intreplication_node;
3098+
intorigin_node;
3099+
charconst*gid="";
3100+
boolduplicate;
3101+
3102+
s.data=record;
3103+
s.len=size;
3104+
s.maxlen=-1;
3105+
s.cursor=0;
3106+
3107+
Assert(pq_getmsgbyte(&s)=='C');
3108+
flags=pq_getmsgbyte(&s);/* flags */
3109+
replication_node=pq_getmsgbyte(&s);
3110+
3111+
/* read fields */
3112+
pq_getmsgint64(&s);/* commit_lsn */
3113+
end_lsn=pq_getmsgint64(&s);/* end_lsn */
3114+
pq_getmsgint64(&s);/* commit_time */
3115+
3116+
origin_node=pq_getmsgbyte(&s);
3117+
origin_lsn=pq_getmsgint64(&s);
3118+
3119+
Assert(replication_node==MtmReplicationNodeId&&
3120+
origin_node!=0&&
3121+
(Mtm->status==MTM_RECOVERY||origin_node==replication_node));
3122+
3123+
switch(PGLOGICAL_XACT_EVENT(flags))
3124+
{
3125+
casePGLOGICAL_PREPARE:
3126+
casePGLOGICAL_ABORT_PREPARED:
3127+
gid=pq_getmsgstring(&s);
3128+
break;
3129+
casePGLOGICAL_COMMIT_PREPARED:
3130+
pq_getmsgint64(&s);/* CSN */
3131+
gid=pq_getmsgstring(&s);
3132+
break;
3133+
default:
3134+
break;
3135+
}
3136+
duplicate=Mtm->status==MTM_RECOVERY&&origin_lsn!=InvalidXLogRecPtr&&origin_lsn <=Mtm->nodes[origin_node-1].restartLsn;
3137+
3138+
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3139+
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,origin_node,origin_lsn,Mtm->nodes[origin_node-1].restartLsn);
3140+
if (Mtm->status==MTM_RECOVERY) {
3141+
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
3142+
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
3143+
}
3144+
}else {
3145+
if (Mtm->nodes[replication_node-1].restartLsn<end_lsn) {
3146+
Mtm->nodes[replication_node-1].restartLsn=end_lsn;
3147+
}
3148+
}
3149+
returnduplicate;
3150+
}
3151+
30733152
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
30743153
{
30753154
hooks->startup_hook=MtmReplicationStartupHook;

‎multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,6 @@ extern void MtmBeginSession(int nodeId);
362362
externvoidMtmEndSession(intnodeId,boolunlock);
363363
externvoidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit);
364364
externvoidMtmRollbackPreparedTransaction(charconst*gid);
365+
externboolMtmFilterTransaction(char*record,intsize);
366+
365367
#endif

‎pglogical_apply.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ process_remote_begin(StringInfo s)
344344
Assert(gtid.node>0);
345345

346346
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
347+
MtmResetTransaction();
347348
#if1
348349
if (BIT_CHECK(Mtm->disabledNodeMask,gtid.node-1)) {
349350
elog(WARNING,"Ignore transaction %d from disabled node %d",gtid.xid,gtid.node);
350-
MtmResetTransaction();
351351
return false;
352352
}
353353
#endif
@@ -603,9 +603,6 @@ process_remote_commit(StringInfo in)
603603
origin_node=pq_getmsgbyte(in);
604604
origin_lsn=pq_getmsgint64(in);
605605

606-
if (Mtm->nodes[origin_node-1].restartLsn<origin_lsn) {
607-
Mtm->nodes[origin_node-1].restartLsn=origin_lsn;
608-
}
609606
if (origin_node!=MtmReplicationNodeId) {
610607
replorigin_advance(Mtm->nodes[origin_node-1].originId,origin_lsn,GetXLogInsertRecPtr(),
611608
false/* backward */ , false/* WAL */ );

‎pglogical_receiver.c

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ pglogical_receiver_main(Datum main_arg)
339339
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
340340
}else {
341341
originStartPos=replorigin_get_progress(originId, false);
342+
if (Mtm->nodes[nodeId-1].restartLsn<originStartPos) {
343+
Mtm->nodes[nodeId-1].restartLsn=originStartPos;
344+
}
342345
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
343346
}
344347
Mtm->nodes[nodeId-1].originId=originId;
@@ -535,27 +538,32 @@ pglogical_receiver_main(Datum main_arg)
535538
ByteBufferAppend(&buf,stmt,rc-hdr_len);
536539
if (stmt[0]=='C')/* commit */
537540
{
538-
if (spill_file >=0) {
539-
ByteBufferAppend(&buf,")",1);
540-
pq_sendbyte(&spill_info,'(');
541-
pq_sendint(&spill_info,buf.used,4);
542-
MtmSpillToFile(spill_file,buf.data,buf.used);
543-
MtmCloseSpillFile(spill_file);
544-
MtmExecute(spill_info.data,spill_info.len);
545-
spill_file=-1;
546-
resetStringInfo(&spill_info);
547-
}else {
548-
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
549-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
550-
timestamp_tstop,start=MtmGetSystemTime();
551-
MtmExecutor(buf.data,buf.used);
552-
stop=MtmGetSystemTime();
553-
if (stop-start>USECS_PER_SEC) {
554-
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
541+
if (!MtmFilterTransaction(stmt,rc-hdr_len)) {
542+
if (spill_file >=0) {
543+
ByteBufferAppend(&buf,")",1);
544+
pq_sendbyte(&spill_info,'(');
545+
pq_sendint(&spill_info,buf.used,4);
546+
MtmSpillToFile(spill_file,buf.data,buf.used);
547+
MtmCloseSpillFile(spill_file);
548+
MtmExecute(spill_info.data,spill_info.len);
549+
spill_file=-1;
550+
resetStringInfo(&spill_info);
551+
}else {
552+
if (MtmPreserveCommitOrder&&buf.used==rc-hdr_len) {
553+
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
554+
timestamp_tstop,start=MtmGetSystemTime();
555+
MtmExecutor(buf.data,buf.used);
556+
stop=MtmGetSystemTime();
557+
if (stop-start>USECS_PER_SEC) {
558+
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
559+
}
560+
}else {
561+
MtmExecute(buf.data,buf.used);
555562
}
556-
}else {
557-
MtmExecute(buf.data,buf.used);
558563
}
564+
}elseif (spill_file >=0) {
565+
MtmCloseSpillFile(spill_file);
566+
spill_file=-1;
559567
}
560568
ByteBufferReset(&buf);
561569
}

‎tests2/docker-entrypoint.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ if [ "$1" = 'postgres' ]; then
4545
# dbname=$POSTGRES_DB user=$POSTGRES_USER host=node2, \
4646
# dbname=$POSTGRES_DB user=$POSTGRES_USER host=node3"
4747

48-
# log_line_prefix = '%t: '
4948

5049
cat<<-EOF >>$PGDATA/postgresql.conf
5150
listen_addresses='*'
@@ -58,6 +57,10 @@ if [ "$1" = 'postgres' ]; then
5857
max_wal_senders = 10
5958
shared_preload_libraries = 'raftable,multimaster'
6059
default_transaction_isolation = 'repeatable read'
60+
log_checkpoints = on
61+
checkpoint_timeout = 30
62+
log_autovacuum_min_duration = 0
63+
log_line_prefix = '%t: '
6164
6265
multimaster.workers = 4
6366
multimaster.max_nodes = 3

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp