Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite12b5b4

Browse files
committed
mmts:twophase_decoding
1 parent11d8a7d commite12b5b4

File tree

5 files changed

+96
-9
lines changed

5 files changed

+96
-9
lines changed

‎arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ static bool MtmRecovery()
554554
}
555555
}
556556
}
557-
returnrecorvered;
557+
returnrecovered;
558558
}
559559
#endif
560560

@@ -601,8 +601,8 @@ static void MtmTransReceiver(Datum arg)
601601
do {
602602
structtimevaltv;
603603
events=inset;
604-
tv.tv_sec=MtmKeepAliveTimeout/USEC;
605-
tv.tv_usec=MtmKeepAliveTimeout%USEC;
604+
tv.tv_sec=MtmKeepaliveTimeout/USEC;
605+
tv.tv_usec=MtmKeepaliveTimeout%USEC;
606606
n=select(max_fd+1,&events,NULL,NULL,&tv);
607607
}while (n<0&&MtmRecovery());
608608

‎multimaster.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
typedefuint64csn_t;/* commit serial number */
3030
#defineINVALID_CSN ((csn_t)-1)
3131

32+
#definePGLOGICAL_COMMIT0x00
33+
#definePGLOGICAL_PREPARE0x01
34+
#definePGLOGICAL_COMMIT_PREPARED0x02
35+
#definePGLOGICAL_ABORT_PREPARED0x03
36+
37+
#definePGLOGICAL_XACT_EVENT(flags)(flags & 0x03)
38+
3239
typedefuint64timestamp_t;
3340

3441
/* Identifier of global transaction */

‎pglogical_apply.c

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,61 @@ read_rel(StringInfo s, LOCKMODE mode)
464464
}
465465

466466
staticvoid
467-
process_remote_commit(StringInfos)
467+
process_remote_commit(StringInfoin)
468468
{
469-
CommitTransactionCommand();
469+
XLogRecPtrcommit_lsn;
470+
XLogRecPtrend_lsn;
471+
TimestampTzcommit_time;
472+
uint8flags;
473+
constchar*gid;
474+
475+
/* read flags */
476+
flags=pq_getmsgbyte(in);
477+
478+
/* read fields */
479+
commit_lsn=pq_getmsgint64(in);
480+
end_lsn=pq_getmsgint64(in);
481+
commit_time=pq_getmsgint64(in);
482+
483+
if (PGLOGICAL_XACT_EVENT(flags)!=PGLOGICAL_COMMIT)
484+
gid=pq_getmsgstring(in);
485+
486+
switch(PGLOGICAL_XACT_EVENT(flags))
487+
{
488+
casePGLOGICAL_COMMIT:
489+
{
490+
if (IsTransactionState())
491+
CommitTransactionCommand();
492+
break;
493+
}
494+
casePGLOGICAL_PREPARE:
495+
{
496+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
497+
BeginTransactionBlock();
498+
CommitTransactionCommand();
499+
StartTransactionCommand();
500+
/* PREPARE itself */
501+
PrepareTransactionBlock(gid);
502+
CommitTransactionCommand();
503+
break;
504+
}
505+
casePGLOGICAL_COMMIT_PREPARED:
506+
{
507+
StartTransactionCommand();
508+
FinishPreparedTransaction(gid, true);
509+
CommitTransactionCommand();
510+
break;
511+
}
512+
casePGLOGICAL_ABORT_PREPARED:
513+
{
514+
StartTransactionCommand();
515+
FinishPreparedTransaction(gid, false);
516+
CommitTransactionCommand();
517+
break;
518+
}
519+
default:
520+
Assert(false);
521+
}
470522
}
471523

472524
staticvoid

‎pglogical_proto.c

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,37 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
127127
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn)
128128
{
129129
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
130-
if (!mm->isLocal) {
131-
pq_sendbyte(out,'C');/* sending COMMIT */
132-
}
130+
uint8flags=0;
131+
132+
if (mm->isLocal)
133+
return;
134+
135+
pq_sendbyte(out,'C');/* sending COMMIT */
136+
137+
if (txn->xact_action==XLOG_XACT_COMMIT)
138+
flags=PGLOGICAL_COMMIT;
139+
elseif (txn->xact_action==XLOG_XACT_PREPARE)
140+
flags=PGLOGICAL_PREPARE;
141+
elseif (txn->xact_action==XLOG_XACT_COMMIT_PREPARED)
142+
flags=PGLOGICAL_COMMIT_PREPARED;
143+
elseif (txn->xact_action==XLOG_XACT_ABORT_PREPARED)
144+
flags=PGLOGICAL_ABORT_PREPARED;
145+
else
146+
Assert(false);
147+
148+
/* send the flags field */
149+
pq_sendbyte(out,flags);
150+
151+
/* send fixed fields */
152+
pq_sendint64(out,commit_lsn);
153+
pq_sendint64(out,txn->end_lsn);
154+
pq_sendint64(out,txn->commit_time);
155+
156+
if (txn->xact_action==XLOG_XACT_PREPARE||
157+
txn->xact_action==XLOG_XACT_COMMIT_PREPARED||
158+
txn->xact_action==XLOG_XACT_ABORT_PREPARED)
159+
pq_sendstring(out,txn->gid);
160+
133161
}
134162

135163
/*

‎tests/reinit-mm.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
n_nodes=3
2-
export PATH=~/postgres_cluster/dist/bin/:$PATH
2+
export PATH=~/code/postgres_cluster/install/bin/:$PATH
33
ulimit -c unlimited
44
pkill -9 postgres
55
pkill -9 arbiter

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp