Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc506245

Browse files
knizhnikkelvich
authored andcommitted
Add wait 2pc timeout
1 parentb6abbca commitc506245

File tree

5 files changed

+75
-31
lines changed

5 files changed

+75
-31
lines changed

‎arbiter.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -720,14 +720,24 @@ static void MtmTransReceiver(Datum arg)
720720
MtmAbortTransaction(ts);
721721
}
722722

723+
if (!MtmUseDtm&&msg->csn>ts->csn) {
724+
ts->csn=msg->csn;
725+
MtmSyncClock(ts->csn);
726+
}
727+
723728
if (++ts->nVotes==Mtm->nNodes) {
724729
/* All nodes are finished their transactions */
725-
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
730+
if (ts->status==TRANSACTION_STATUS_ABORTED) {
731+
MtmWakeUpBackend(ts);
732+
}elseif (MtmUseDtm) {
733+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
726734
ts->nVotes=1;/* I voted myself */
727735
MtmSendNotificationMessage(ts,MSG_PREPARE);
728736
}else {
729-
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
730-
MtmWakeUpBackend(ts);
737+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
738+
ts->csn=MtmAssignCSN();
739+
ts->status=TRANSACTION_STATUS_UNKNOWN;
740+
MtmWakeUpBackend(ts);
731741
}
732742
}
733743
break;

‎multimaster.c

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ static int MtmWorkers;
190190
staticintMtmVacuumDelay;
191191
staticintMtmMinRecoveryLag;
192192
staticintMtmMaxRecoveryLag;
193+
staticintMtm2PCPrepareRatio;
194+
staticintMtm2PCMinTimeout;
193195
staticboolMtmIgnoreTablesWithoutPk;
194196

195197
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -765,8 +767,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
765767

766768
}
767769

768-
statictime_tmaxWakeupTime;
769-
770770
staticvoid
771771
MtmPostPrepareTransaction(MtmCurrentTrans*x)
772772
{
@@ -782,25 +782,32 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
782782
tm->state=ts;
783783
ts->votingCompleted= true;
784784
if (Mtm->status!=MTM_RECOVERY) {
785-
MtmSendNotificationMessage(ts,MtmUseDtm ?MSG_READY :MSG_PREPARED);/* send notification to coordinator */
785+
if (MtmUseDtm) {
786+
MtmSendNotificationMessage(ts,MSG_READY);/* send notification to coordinator */
787+
}else {
788+
ts->csn=MtmAssignCSN();
789+
MtmSendNotificationMessage(ts,MSG_PREPARED);/* send notification to coordinator */
790+
ts->status=TRANSACTION_STATUS_UNKNOWN;
791+
}
786792
}else {
787793
ts->status=TRANSACTION_STATUS_UNKNOWN;
788794
}
789795
MtmUnlock();
790796
MtmResetTransaction(x);
791797
}else {
792-
time_twakeupTime;
798+
time_ttimeout=Max(Mtm2PCMinTimeout, (ts->csn-ts->snapshot)*Mtm2PCPrepareRatio/100000);/* usec->msec and percents */
799+
intresult=0;
793800
/* wait votes from all nodes */
794-
while (!ts->votingCompleted) {
801+
while (!ts->votingCompleted&& !(result&WL_TIMEOUT)) {
795802
MtmUnlock();
796-
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
803+
result=WaitLatch(&MyProc->procLatch,WL_LATCH_SET|WL_TIMEOUT,timeout);
797804
ResetLatch(&MyProc->procLatch);
798-
wakeupTime=MtmGetCurrentTime()-ts->wakeupTime;
799-
if (wakeupTime>maxWakeupTime) {
800-
maxWakeupTime=wakeupTime;
801-
}
802805
MtmLock(LW_SHARED);
803806
}
807+
if (!ts->votingCompleted) {
808+
ts->status=TRANSACTION_STATUS_ABORTED;
809+
elog(WARNING,"Transaction is aborted because of %d msec timeout expiration", (int)timeout);
810+
}
804811
x->status=ts->status;
805812
MTM_LOG3("%d: Result of vote: %d",MyProcPid,ts->status);
806813
MtmUnlock();
@@ -988,11 +995,12 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
988995
}
989996

990997
voidMtmWakeUpBackend(MtmTransState*ts)
991-
{
992-
MTM_LOG3("Wakeup backed procno=%d, pid=%d",ts->procno,ProcGlobal->allProcs[ts->procno].pid);
993-
ts->votingCompleted= true;
994-
ts->wakeupTime=MtmGetCurrentTime();
995-
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
998+
{
999+
if (!ts->votingCompleted) {
1000+
MTM_LOG3("Wakeup backed procno=%d, pid=%d",ts->procno,ProcGlobal->allProcs[ts->procno].pid);
1001+
ts->votingCompleted= true;
1002+
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
1003+
}
9961004
}
9971005

9981006
voidMtmAbortTransaction(MtmTransState*ts)
@@ -1598,6 +1606,38 @@ _PG_init(void)
15981606
if (!process_shared_preload_libraries_in_progress)
15991607
return;
16001608

1609+
DefineCustomIntVariable(
1610+
"multimaster.2pc_min_timeout",
1611+
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
1612+
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
1613+
&Mtm2PCMinTimeout,
1614+
10000,
1615+
0,
1616+
INT_MAX,
1617+
PGC_BACKEND,
1618+
0,
1619+
NULL,
1620+
NULL,
1621+
NULL
1622+
);
1623+
1624+
DefineCustomIntVariable(
1625+
"multimaster.2pc_prepare_ratio",
1626+
"Percent of prepare time for maximal time of second phase of two-pahse commit",
1627+
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
1628+
&Mtm2PCPrepareRatio,
1629+
100,
1630+
0,
1631+
INT_MAX,
1632+
PGC_BACKEND,
1633+
0,
1634+
NULL,
1635+
NULL,
1636+
NULL
1637+
);
1638+
1639+
1640+
16011641
DefineCustomIntVariable(
16021642
"multimaster.node_disable_delay",
16031643
"Minamal amount of time (sec) between node status change",

‎multimaster.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ typedef struct MtmTransState
135135
intprocno;/* pgprocno of transaction coordinator waiting for responses from replicas,
136136
used to notify coordinator by arbiter */
137137
intnSubxids;/* Number of subtransanctions */
138-
time_twakeupTime;
139138
MtmMessageCodecmd;/* Notification message to be sent */
140139
structMtmTransState*nextVoting;/* Next element in L1-list of voting transactions. */
141140
structMtmTransState*next;/* Next element in L1 list of all finished transaction present in xid2state hash */

‎pglogical_receiver.c

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,13 @@ static volatile sig_atomic_t got_sighup = false;
5353

5454
/* GUC variables */
5555
staticintreceiver_idle_time=0;
56-
staticboolreceiver_sync_mode=true;
56+
staticboolreceiver_sync_mode=false;
5757

5858
/* Worker name */
59-
charworker_proc[BGW_MAXLEN];
59+
staticcharworker_proc[BGW_MAXLEN];
6060

6161
/* Lastly written positions */
6262
staticXLogRecPtroutput_written_lsn=InvalidXLogRecPtr;
63-
staticXLogRecPtroutput_fsync_lsn=InvalidXLogRecPtr;
64-
staticXLogRecPtroutput_applied_lsn=InvalidXLogRecPtr;
6563

6664
/* Stream functions */
6765
staticvoidfe_sendint64(int64i,char*buf);
@@ -91,16 +89,17 @@ receiver_raw_sighup(SIGNAL_ARGS)
9189
* Send a Standby Status Update message to server.
9290
*/
9391
staticbool
94-
sendFeedback(PGconn*conn,int64now)
92+
sendFeedback(PGconn*conn,int64now,RepOriginIdoriginId)
9593
{
9694
charreplybuf[1+8+8+8+8+1];
9795
intlen=0;
96+
XLogRecPtroutput_applied_lsn=replorigin_get_progress(originId, true);
9897

9998
replybuf[len]='r';
10099
len+=1;
101100
fe_sendint64(output_written_lsn,&replybuf[len]);/* write */
102101
len+=8;
103-
fe_sendint64(output_fsync_lsn,&replybuf[len]);/* flush */
102+
fe_sendint64(output_applied_lsn,&replybuf[len]);/* flush */
104103
len+=8;
105104
fe_sendint64(output_applied_lsn,&replybuf[len]);/* apply */
106105
len+=8;
@@ -409,8 +408,6 @@ pglogical_receiver_main(Datum main_arg)
409408

410409
/* Update written position */
411410
output_written_lsn=Max(walEnd,output_written_lsn);
412-
output_fsync_lsn=output_written_lsn;
413-
output_applied_lsn=output_written_lsn;
414411

415412
/*
416413
* If the server requested an immediate reply, send one.
@@ -424,7 +421,7 @@ pglogical_receiver_main(Datum main_arg)
424421
int64now=feGetCurrentTimestamp();
425422

426423
/* Leave is feedback is not sent properly */
427-
if (!sendFeedback(conn,now))
424+
if (!sendFeedback(conn,now,originId))
428425
proc_exit(1);
429426
}
430427
continue;
@@ -482,8 +479,6 @@ pglogical_receiver_main(Datum main_arg)
482479
}
483480
/* Update written position */
484481
output_written_lsn=Max(walEnd,output_written_lsn);
485-
output_fsync_lsn=output_written_lsn;
486-
output_applied_lsn=output_written_lsn;
487482
}
488483

489484
/* No data, move to next loop */

‎tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
//r = txn.exec("selectmtm_get_snapshot()");
132+
r = txn.exec("selectmtm.get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp