Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e9e4fd

Browse files
knizhnikkelvich
authored andcommitted
Fix bug in flushed_lsn reporting
1 parent906fcbb commit8e9e4fd

File tree

6 files changed

+80
-58
lines changed

6 files changed

+80
-58
lines changed

‎multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3535
LANGUAGE C;
3636

3737
CREATETYPEmtm.cluster_stateAS ("status"text,"disabledNodeMask"bigint,"disconnectedNodeMask"bigint,"catchUpNodeMask"bigint,"liveNodes"integer,"allNodes"integer,"nActiveQueries"integer,"nPendingQueries"integer,"queueSize"bigint,"transCount"bigint,"timeShift"bigint,"recoverySlot"integer,
38-
"xidHashSize"bigint,"gidHashSize"bigint,"oldestSnapshot"bigint,"configChanges"integer);
38+
"xidHashSize"bigint,"gidHashSize"bigint,"oldestXid"integer,"configChanges"integer);
3939

4040
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
4141
AS'MODULE_PATHNAME','mtm_get_cluster_state'

‎multimaster.c

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ static int MtmMinRecoveryLag;
199199
staticintMtmMaxRecoveryLag;
200200
staticintMtm2PCPrepareRatio;
201201
staticintMtm2PCMinTimeout;
202+
staticintMtmGcPeriod;
202203
staticboolMtmIgnoreTablesWithoutPk;
203204

204205
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -341,16 +342,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
341342
SnapshotMtmGetSnapshot(Snapshotsnapshot)
342343
{
343344
snapshot=PgGetSnapshotData(snapshot);
344-
RecentGlobalDataXmin=RecentGlobalXmin=Mtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
345+
RecentGlobalDataXmin=RecentGlobalXmin=Mtm->oldestXid;
345346
returnsnapshot;
346347
}
347348

348349

349350
TransactionIdMtmGetOldestXmin(Relationrel,boolignoreVacuum)
350351
{
351352
TransactionIdxmin=PgGetOldestXmin(NULL, false);/* consider all backends */
352-
xmin=MtmAdjustOldestXid(xmin);
353-
returnxmin;
353+
if (TransactionIdIsValid(xmin)) {
354+
MtmLock(LW_EXCLUSIVE);
355+
xmin=MtmAdjustOldestXid(xmin);
356+
MtmUnlock();
357+
}
358+
returnxmin;
354359
}
355360

356361
boolMtmXidInMVCCSnapshot(TransactionIdxid,Snapshotsnapshot)
@@ -445,53 +450,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
445450
staticTransactionId
446451
MtmAdjustOldestXid(TransactionIdxid)
447452
{
448-
if (TransactionIdIsValid(xid)) {
449-
MtmTransState*ts,*prev=NULL;
450-
inti;
451-
452-
MtmLock(LW_EXCLUSIVE);
453-
ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
454-
if (ts!=NULL) {
455-
csn_toldestSnapshot=ts->snapshot;
456-
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
457-
for (i=0;i<Mtm->nAllNodes;i++) {
458-
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
459-
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
460-
{
461-
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
462-
}
463-
}
464-
oldestSnapshot-=MtmVacuumDelay*USECS_PER_SEC;
465-
466-
for (ts=Mtm->transListHead;
467-
ts!=NULL
468-
&&ts->csn<oldestSnapshot
469-
&&TransactionIdPrecedes(ts->xid,xid)
470-
&& (ts->status==TRANSACTION_STATUS_COMMITTED||
471-
ts->status==TRANSACTION_STATUS_ABORTED);
472-
prev=ts,ts=ts->next)
453+
inti;
454+
MtmTransState*prev=NULL;
455+
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&xid,HASH_FIND,NULL);
456+
MTM_LOG1("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d",MyProcPid,xid,ts!=NULL ?ts->snapshot :0,ts!=NULL ?ts->csn :0,ts!=NULL ?ts->status :-1);
457+
Mtm->gcCount=0;
458+
if (ts!=NULL) {
459+
csn_toldestSnapshot=ts->snapshot;
460+
Mtm->nodes[MtmNodeId-1].oldestSnapshot=oldestSnapshot;
461+
for (i=0;i<Mtm->nAllNodes;i++) {
462+
if (!BIT_CHECK(Mtm->disabledNodeMask,i)
463+
&&Mtm->nodes[i].oldestSnapshot<oldestSnapshot)
473464
{
474-
if (prev!=NULL) {
475-
/* Remove information about too old transactions */
476-
hash_search(MtmXid2State,&prev->xid,HASH_REMOVE,NULL);
477-
}
465+
oldestSnapshot=Mtm->nodes[i].oldestSnapshot;
478466
}
479-
}
480-
if (MtmUseDtm)
467+
}
468+
oldestSnapshot-=MtmVacuumDelay*USECS_PER_SEC;
469+
470+
for (ts=Mtm->transListHead;
471+
ts!=NULL
472+
&&ts->csn<oldestSnapshot
473+
&&TransactionIdPrecedes(ts->xid,xid)
474+
&& (ts->status==TRANSACTION_STATUS_COMMITTED||
475+
ts->status==TRANSACTION_STATUS_ABORTED);
476+
prev=ts,ts=ts->next)
481477
{
482478
if (prev!=NULL) {
483-
Mtm->transListHead=prev;
484-
Mtm->oldestXid=xid=prev->xid;
485-
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
486-
xid=Mtm->oldestXid;
487-
}
488-
}else {
489-
if (prev!=NULL) {
490-
Mtm->transListHead=prev;
479+
/* Remove information about too old transactions */
480+
hash_search(MtmXid2State,&prev->xid,HASH_REMOVE,NULL);
491481
}
492482
}
493-
MtmUnlock();
494-
}
483+
}
484+
if (MtmUseDtm)
485+
{
486+
if (prev!=NULL) {
487+
Mtm->transListHead=prev;
488+
Mtm->oldestXid=xid=prev->xid;
489+
}elseif (TransactionIdPrecedes(Mtm->oldestXid,xid)) {
490+
xid=Mtm->oldestXid;
491+
}
492+
}else {
493+
if (prev!=NULL) {
494+
Mtm->transListHead=prev;
495+
}
496+
}
495497
returnxid;
496498
}
497499
/*
@@ -613,7 +615,12 @@ static void
613615
MtmBeginTransaction(MtmCurrentTrans*x)
614616
{
615617
if (x->snapshot==INVALID_CSN) {
616-
MtmLock(LW_EXCLUSIVE);
618+
TransactionIdxmin= (Mtm->gcCount >=MtmGcPeriod) ?PgGetOldestXmin(NULL, false) :InvalidTransactionId;/* Get oldest xmin outside critical section */
619+
620+
MtmLock(LW_EXCLUSIVE);
621+
if (TransactionIdIsValid(xmin)&&Mtm->gcCount >=MtmGcPeriod) {
622+
MtmAdjustOldestXid(xmin);
623+
}
617624
x->xid=GetCurrentTransactionIdIfAny();
618625
x->isReplicated= false;
619626
x->isDistributed=MtmIsUserTransaction();
@@ -689,7 +696,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
689696
}
690697

691698
MtmLock(LW_EXCLUSIVE);
692-
693699
/*
694700
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
695701
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -715,8 +721,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
715721

716722
x->isPrepared= true;
717723
x->csn=ts->csn;
718-
724+
719725
Mtm->transCount+=1;
726+
Mtm->gcCount+=1;
727+
720728
MtmTransactionListAppend(ts);
721729
MtmAddSubtransactions(ts,subxids,ts->nSubxids);
722730
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
@@ -1465,8 +1473,9 @@ static void MtmInitialize()
14651473
Mtm->transListHead=NULL;
14661474
Mtm->transListTail=&Mtm->transListHead;
14671475
Mtm->nReceivers=0;
1468-
Mtm->timeShift=0;
1476+
Mtm->timeShift=0;
14691477
Mtm->transCount=0;
1478+
Mtm->gcCount=0;
14701479
Mtm->nConfigChanges=0;
14711480
Mtm->localTablesHashLoaded= false;
14721481
for (i=0;i<MtmNodes;i++) {
@@ -1599,6 +1608,21 @@ _PG_init(void)
15991608
if (!process_shared_preload_libraries_in_progress)
16001609
return;
16011610

1611+
DefineCustomIntVariable(
1612+
"multimaster.gc_period",
1613+
"Number of distributed transactions after which garbage collection is started",
1614+
"Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map",
1615+
&MtmGcPeriod,
1616+
MTM_HASH_SIZE/10,
1617+
1,
1618+
INT_MAX,
1619+
PGC_BACKEND,
1620+
0,
1621+
NULL,
1622+
NULL,
1623+
NULL
1624+
);
1625+
16021626
DefineCustomIntVariable(
16031627
"multimaster.max_nodes",
16041628
"Maximal number of cluster nodes",
@@ -2338,7 +2362,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
23382362
values[11]=Int32GetDatum(Mtm->recoverySlot);
23392363
values[12]=Int64GetDatum(hash_get_num_entries(MtmXid2State));
23402364
values[13]=Int64GetDatum(hash_get_num_entries(MtmGid2State));
2341-
values[14]=Int64GetDatum(Mtm->oldestSnapshot);
2365+
values[14]=Int32GetDatum(Mtm->oldestXid);
23422366
values[15]=Int32GetDatum(Mtm->nConfigChanges);
23432367

23442368
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ typedef struct
180180
MtmTransState**transListTail;/* Tail of L1 list of all finished transactionds, used to append new elements.
181181
This list is expected to be in CSN ascending order, by strict order may be violated */
182182
uint64transCount;/* Counter of transactions perfromed by this node */
183+
uint64gcCount;/* Number of global transactions performed since last GC */
183184
BgwPoolpool;/* Pool of background workers for applying logical replication patches */
184185
MtmNodeInfonodes[1];/* [Mtm->nAllNodes]: per-node data */
185186
}MtmState;

‎pglogical_apply.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ static void process_remote_insert(StringInfo s, Relation rel);
7575
staticvoidprocess_remote_update(StringInfos,Relationrel);
7676
staticvoidprocess_remote_delete(StringInfos,Relationrel);
7777

78-
staticintMtmReplicationNode;
79-
8078
/*
8179
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
8280
*
@@ -481,8 +479,8 @@ static void
481479
MtmBeginSession(void)
482480
{
483481
charslot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
484-
MtmLockNode(MtmReplicationNode);
485-
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,MtmReplicationNode);
482+
MtmLockNode(MtmReplicationNodeId);
483+
sprintf(slot_name,MULTIMASTER_SLOT_PATTERN,MtmReplicationNodeId);
486484
Assert(replorigin_session_origin==InvalidRepOriginId);
487485
replorigin_session_origin=replorigin_by_name(slot_name, false);
488486
MTM_LOG3("%d: Begin setup replorigin session: %d",MyProcPid,replorigin_session_origin);
@@ -498,7 +496,7 @@ MtmEndSession(void)
498496
replorigin_session_origin=InvalidRepOriginId;
499497
replorigin_session_reset();
500498
if (unlock) {
501-
MtmUnlockNode(MtmReplicationNode);
499+
MtmUnlockNode(MtmReplicationNodeId);
502500
}
503501
MTM_LOG3("%d: End reset replorigin session: %d",MyProcPid,replorigin_session_origin);
504502
}
@@ -513,7 +511,7 @@ process_remote_commit(StringInfo in)
513511
XLogRecPtrend_lsn;
514512
/* read flags */
515513
flags=pq_getmsgbyte(in);
516-
MtmReplicationNode=pq_getmsgbyte(in);
514+
MtmReplicationNodeId=pq_getmsgbyte(in);
517515

518516
/* read fields */
519517
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */

‎pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static volatile sig_atomic_t got_sighup = false;
4747

4848
/* GUC variables */
4949
staticintreceiver_idle_time=0;
50-
staticboolreceiver_sync_mode=false;
50+
staticboolreceiver_sync_mode=true;/* We need sync mode to have up-to-date values of catalog_xmin in replication slots */
5151

5252
/* Worker name */
5353
staticcharworker_proc[BGW_MAXLEN];

‎tests/dtmacid.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ int main (int argc, char* argv[])
285285
for (int i =0; i < cfg.nReaders; i++) {
286286
readers[i].wait();
287287
nSelects += readers[i].selects;
288-
nTransactions += writers[i].transactions;
289288
}
290289

291290
time_t elapsed =getCurrentTime() - start;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp