Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb8c3075

Browse files
committed
Fix bug in flushed_lsn reporting
1 parentc1f3c26 commitb8c3075

File tree

6 files changed

+80
-58
lines changed

6 files changed

+80
-58
lines changed

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3535
LANGUAGE C;
3636

3737
CREATETYPEmtm.cluster_stateAS ("status"text,"disabledNodeMask"bigint,"disconnectedNodeMask"bigint,"catchUpNodeMask"bigint,"liveNodes"integer,"allNodes"integer,"nActiveQueries"integer,"nPendingQueries"integer,"queueSize"bigint,"transCount"bigint,"timeShift"bigint,"recoverySlot"integer,
38-
"xidHashSize"bigint,"gidHashSize"bigint,"oldestSnapshot"bigint,"configChanges"integer);
38+
"xidHashSize"bigint,"gidHashSize"bigint,"oldestXid"integer,"configChanges"integer);
3939

4040
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
4141
AS'MODULE_PATHNAME','mtm_get_cluster_state'

‎contrib/mmts/multimaster.c‎

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ static int MtmMinRecoveryLag;
200200
staticintMtmMaxRecoveryLag;
201201
staticintMtm2PCPrepareRatio;
202202
staticintMtm2PCMinTimeout;
203+
staticintMtmGcPeriod;
203204
staticboolMtmIgnoreTablesWithoutPk;
204205

205206
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -342,16 +343,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
342343
SnapshotMtmGetSnapshot(Snapshotsnapshot)
343344
{
344345
snapshot=PgGetSnapshotData(snapshot);
345-
RecentGlobalDataXmin=RecentGlobalXmin=Mtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
346+
RecentGlobalDataXmin=RecentGlobalXmin=Mtm->oldestXid;
346347
returnsnapshot;
347348
}
348349

349350

350351
TransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum)
351352
{
352353
TransactionIdxmin=PgGetOldestXmin(NULL, false);/* consider all backends */
353-
xmin=MtmAdjustOldestXid(xmin);
354-
returnxmin;
354+
if (TransactionIdIsValid(xmin)) {
355+
MtmLock(LW_EXCLUSIVE);
356+
xmin=MtmAdjustOldestXid(xmin);
357+
MtmUnlock();
358+
}
359+
returnxmin;
355360
}
356361

357362
boolMtmXidInMVCCSnapshot(TransactionIdxid,Snapshotsnapshot)
@@ -446,53 +451,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
446451
staticTransactionId
447452
MtmAdjustOldestXid(TransactionIdxid)
448453
{
449-
if (TransactionIdIsValid(xid)) {
450-
MtmTransState*ts,*prev=NULL;
451-
inti;
452-
453-
MtmLock(LW_EXCLUSIVE);
454-
ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
455-
if (ts!=NULL) {
456-
csn_toldestSnapshot=ts->snapshot;
457-
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
458-
for (i=0;i<Mtm->nAllNodes;i++) {
459-
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
460-
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
461-
{
462-
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
463-
}
464-
}
465-
oldestSnapshot-=MtmVacuumDelay*USECS_PER_SEC;
466-
467-
for (ts=Mtm->transListHead;
468-
ts!=NULL
469-
&&ts->csn<oldestSnapshot
470-
&&TransactionIdPrecedes(ts->xid,xid)
471-
&& (ts->status==TRANSACTION_STATUS_COMMITTED||
472-
ts->status==TRANSACTION_STATUS_ABORTED);
473-
prev=ts,ts=ts->next)
454+
inti;
455+
MtmTransState*prev=NULL;
456+
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
457+
MTM_LOG1("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d",MyProcPid,xid,ts!=NULL ?ts->snapshot :0,ts!=NULL ?ts->csn :0,ts!=NULL ?ts->status :-1);
458+
Mtm->gcCount=0;
459+
if (ts!=NULL) {
460+
csn_toldestSnapshot=ts->snapshot;
461+
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
462+
for (i=0;i<Mtm->nAllNodes;i++) {
463+
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
464+
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
474465
{
475-
if (prev!=NULL) {
476-
/* Remove information about too old transactions */
477-
hash_search(MtmXid2State,&prev->xid,HASH_REMOVE,NULL);
478-
}
466+
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
479467
}
480-
}
481-
if (MtmUseDtm)
468+
}
469+
oldestSnapshot-=MtmVacuumDelay*USECS_PER_SEC;
470+
471+
for (ts=Mtm->transListHead;
472+
ts!=NULL
473+
&&ts->csn<oldestSnapshot
474+
&&TransactionIdPrecedes(ts->xid,xid)
475+
&& (ts->status==TRANSACTION_STATUS_COMMITTED||
476+
ts->status==TRANSACTION_STATUS_ABORTED);
477+
prev=ts,ts=ts->next)
482478
{
483479
if (prev!=NULL) {
484-
Mtm->transListHead=prev;
485-
Mtm->oldestXid=xid=prev->xid;
486-
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
487-
xid=Mtm->oldestXid;
488-
}
489-
}else {
490-
if (prev!=NULL) {
491-
Mtm->transListHead=prev;
480+
/* Remove information about too old transactions */
481+
hash_search(MtmXid2State,&prev->xid,HASH_REMOVE,NULL);
492482
}
493483
}
494-
MtmUnlock();
495-
}
484+
}
485+
if (MtmUseDtm)
486+
{
487+
if (prev!=NULL) {
488+
Mtm->transListHead=prev;
489+
Mtm->oldestXid=xid=prev->xid;
490+
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
491+
xid=Mtm->oldestXid;
492+
}
493+
}else {
494+
if (prev!=NULL) {
495+
Mtm->transListHead=prev;
496+
}
497+
}
496498
returnxid;
497499
}
498500
/*
@@ -614,7 +616,12 @@ static void
614616
MtmBeginTransaction(MtmCurrentTrans*x)
615617
{
616618
if (x->snapshot==INVALID_CSN) {
617-
MtmLock(LW_EXCLUSIVE);
619+
TransactionIdxmin= (Mtm->gcCount >=MtmGcPeriod) ?PgGetOldestXmin(NULL, false) :InvalidTransactionId;/* Get oldest xmin outside critical section */
620+
621+
MtmLock(LW_EXCLUSIVE);
622+
if (TransactionIdIsValid(xmin)&&Mtm->gcCount >=MtmGcPeriod) {
623+
MtmAdjustOldestXid(xmin);
624+
}
618625
x->xid=GetCurrentTransactionIdIfAny();
619626
x->isReplicated= false;
620627
x->isDistributed=MtmIsUserTransaction();
@@ -690,7 +697,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
690697
}
691698

692699
MtmLock(LW_EXCLUSIVE);
693-
694700
/*
695701
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
696702
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -716,8 +722,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716722

717723
x->isPrepared= true;
718724
x->csn=ts->csn;
719-
725+
720726
Mtm->transCount+=1;
727+
Mtm->gcCount+=1;
728+
721729
MtmTransactionListAppend(ts);
722730
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
723731
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
@@ -1466,8 +1474,9 @@ static void MtmInitialize()
14661474
Mtm->transListHead=NULL;
14671475
Mtm->transListTail=&Mtm->transListHead;
14681476
Mtm->nReceivers=0;
1469-
Mtm->timeShift=0;
1477+
Mtm->timeShift=0;
14701478
Mtm->transCount=0;
1479+
Mtm->gcCount=0;
14711480
Mtm->nConfigChanges=0;
14721481
Mtm->localTablesHashLoaded= false;
14731482
for (i=0;i<MtmNodes;i++) {
@@ -1600,6 +1609,21 @@ _PG_init(void)
16001609
if (!process_shared_preload_libraries_in_progress)
16011610
return;
16021611

1612+
DefineCustomIntVariable(
1613+
"multimaster.gc_period",
1614+
"Number of distributed transactions after which garbage collection is started",
1615+
"Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map",
1616+
&MtmGcPeriod,
1617+
MTM_HASH_SIZE/10,
1618+
1,
1619+
INT_MAX,
1620+
PGC_BACKEND,
1621+
0,
1622+
NULL,
1623+
NULL,
1624+
NULL
1625+
);
1626+
16031627
DefineCustomIntVariable(
16041628
"multimaster.max_nodes",
16051629
"Maximal number of cluster nodes",
@@ -2339,7 +2363,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
23392363
values[11]=Int32GetDatum(Mtm->recoverySlot);
23402364
values[12]=Int64GetDatum(hash_get_num_entries(MtmXid2State));
23412365
values[13]=Int64GetDatum(hash_get_num_entries(MtmGid2State));
2342-
values[14]=Int64GetDatum(Mtm->oldestSnapshot);
2366+
values[14]=Int32GetDatum(Mtm->oldestXid);
23432367
values[15]=Int32GetDatum(Mtm->nConfigChanges);
23442368

23452369
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));

‎contrib/mmts/multimaster.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ typedef struct
180180
MtmTransState**transListTail;/* Tail of L1 list of all finished transactionds, used to append new elements.
181181
This list is expected to be in CSN ascending order, by strict order may be violated */
182182
uint64transCount;/* Counter of transactions perfromed by this node */
183+
uint64gcCount;/* Number of global transactions performed since last GC */
183184
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
184185
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
185186
}MtmState;

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ static void process_remote_insert(StringInfo s, Relation rel);
7575
staticvoidprocess_remote_update(StringInfos,Relationrel);
7676
staticvoidprocess_remote_delete(StringInfos,Relationrel);
7777

78-
staticintMtmReplicationNode;
79-
8078
/*
8179
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
8280
*
@@ -481,8 +479,8 @@ static void
481479
MtmBeginSession(void)
482480
{
483481
charslot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
484-
MtmLockNode(MtmReplicationNode);
485-
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,MtmReplicationNode);
482+
MtmLockNode(MtmReplicationNodeId);
483+
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,MtmReplicationNodeId);
486484
Assert(replorigin_session_origin==InvalidRepOriginId);
487485
replorigin_session_origin=replorigin_by_name(slot_name, false);
488486
MTM_LOG3("%d: Begin setup replorigin session: %d",MyProcPid,replorigin_session_origin);
@@ -498,7 +496,7 @@ MtmEndSession(bool unlock)
498496
replorigin_session_origin=InvalidRepOriginId;
499497
replorigin_session_reset();
500498
if (unlock) {
501-
MtmUnlockNode(MtmReplicationNode);
499+
MtmUnlockNode(MtmReplicationNodeId);
502500
}
503501
MTM_LOG3("%d: End reset replorigin session: %d",MyProcPid,replorigin_session_origin);
504502
}
@@ -513,7 +511,7 @@ process_remote_commit(StringInfo in)
513511
XLogRecPtrend_lsn;
514512
/* read flags */
515513
flags=pq_getmsgbyte(in);
516-
MtmReplicationNode=pq_getmsgbyte(in);
514+
MtmReplicationNodeId=pq_getmsgbyte(in);
517515

518516
/* read fields */
519517
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */

‎contrib/mmts/pglogical_receiver.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static volatile sig_atomic_t got_sighup = false;
4747

4848
/* GUC variables */
4949
staticintreceiver_idle_time=0;
50-
staticboolreceiver_sync_mode=false;
50+
staticboolreceiver_sync_mode=true;/* We need sync mode to have up-to-date values of catalog_xmin in replication slots */
5151

5252
/* Worker name */
5353
staticcharworker_proc[BGW_MAXLEN];

‎contrib/mmts/tests/dtmacid.cpp‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ int main (int argc, char* argv[])
285285
for (int i =0; i < cfg.nReaders; i++) {
286286
readers[i].wait();
287287
nSelects += readers[i].selects;
288-
nTransactions += writers[i].transactions;
289288
}
290289

291290
time_t elapsed =getCurrentTime() - start;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp