Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd2c7edb

Browse files
knizhnikkelvich
authored andcommitted
Change transaction status polling
1 parent92c7c87 commitd2c7edb

File tree

3 files changed

+224
-17
lines changed

3 files changed

+224
-17
lines changed

‎arbiter.c

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ static bool send_heartbeat;
8484
statictimestamp_tlast_sent_heartbeat;
8585
staticTimeoutIdheartbeat_timer;
8686
staticnodemask_tbusy_mask;
87+
statictimestamp_tlast_heartbeat_to_node[MAX_NODES];
8788

8889
staticvoidMtmSender(Datumarg);
8990
staticvoidMtmReceiver(Datumarg);
@@ -369,11 +370,15 @@ static void MtmSendHeartbeat()
369370
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
370371
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
371372
}else {
373+
if (last_heartbeat_to_node[i]+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
374+
MTM_LOG1("Last hearbeat to node %d was sent %ld microseconds ago",i+1,now-last_heartbeat_to_node[i]);
375+
}
376+
last_heartbeat_to_node[i]=now;
372377
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
373378
if (BIT_CHECK(Mtm->connectivityMask,i)) {
374379
close(sockets[i]);
375380
sockets[i]=-1;
376-
MtmReconnectNode(i+1);
381+
MtmReconnectNode(i+1);/* set reconnect mask to force node reconnect */
377382
//MtmOnNodeConnect(i+1);
378383
}
379384
MTM_LOG4("Send heartbeat to node %d with timestamp %ld",i+1,now);
@@ -386,6 +391,9 @@ static void MtmSendHeartbeat()
386391

387392
}
388393

394+
/* This function should be called from all places where sender can be blocked.
395+
* It checks the send_heartbeat flag set by the timer and, if it is set, sends heartbeats to all alive nodes.
396+
*/
389397
voidMtmCheckHeartbeat()
390398
{
391399
if (send_heartbeat&& !stop) {
@@ -544,6 +552,9 @@ static bool MtmSendToNode(int node, void const* buf, int size)
544552
BIT_SET(busy_mask,node);
545553
while (true) {
546554
#if0
555+
/* Original intention was to reestablish the connection when the reconnect mask is set, to avoid hung connections.
556+
* But reconnectMask is set not only when the connection is broken, so breaking the connection in all these cases causes an avalanche of connection failures.
557+
*/
547558
if (sockets[node] >=0&&BIT_CHECK(Mtm->reconnectMask,node)) {
548559
elog(WARNING,"Arbiter is forced to reconnect to node %d",node+1);
549560
close(sockets[node]);
@@ -687,6 +698,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, MtmArbiterMessage* msg)
687698
buf->data[buf->used++]=*msg;
688699
}
689700

701+
690702
staticvoidMtmSender(Datumarg)
691703
{
692704
sigset_tsset;
@@ -709,7 +721,7 @@ static void MtmSender(Datum arg)
709721
/* Connect to a database */
710722
BackgroundWorkerInitializeConnection(MtmDatabaseName,NULL);
711723

712-
724+
/* Start heartbeat timer */
713725
heartbeat_timer=RegisterTimeout(USER_TIMEOUT,MtmScheduleHeartbeat);
714726
enable_timeout_after(heartbeat_timer,MtmHeartbeatSendTimeout);
715727

@@ -942,11 +954,14 @@ static void MtmReceiver(Datum arg)
942954
}else {
943955
ts=tm->state;
944956
BIT_SET(ts->votedMask,node-1);
945-
if (ts->status==TRANSACTION_STATUS_UNKNOWN) {
957+
if (ts->status==TRANSACTION_STATUS_UNKNOWN||ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
946958
if (msg->status==TRANSACTION_STATUS_IN_PROGRESS||msg->status==TRANSACTION_STATUS_ABORTED) {
947959
elog(LOG,"Abort prepared transaction %s because it is in state %s at node %d",
948960
msg->gid,MtmTxnStatusMnem[msg->status],node);
961+
962+
replorigin_session_origin=DoNotReplicateId;
949963
MtmFinishPreparedTransaction(ts, false);
964+
replorigin_session_origin=InvalidRepOriginId;
950965
}
951966
elseif (msg->status==TRANSACTION_STATUS_COMMITTED||msg->status==TRANSACTION_STATUS_UNKNOWN)
952967
{
@@ -956,7 +971,10 @@ static void MtmReceiver(Datum arg)
956971
}
957972
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
958973
elog(LOG,"Commit transaction %s because it is prepared at all live nodes",msg->gid);
974+
975+
replorigin_session_origin=DoNotReplicateId;
959976
MtmFinishPreparedTransaction(ts, true);
977+
replorigin_session_origin=InvalidRepOriginId;
960978
}else {
961979
MTM_LOG1("Receive response for transaction %s -> %s, participants=%llx, voted=%llx",
962980
msg->gid,MtmTxnStatusMnem[msg->status], (long long)ts->participantsMask, (long long)ts->votedMask);
@@ -1082,7 +1100,7 @@ static void MtmReceiver(Datum arg)
10821100
}else {
10831101
switch (msg->code) {
10841102
caseMSG_PRECOMMIT:
1085-
Assert(false);// Nowsend through pglogical
1103+
Assert(false);// Now sent through pglogical
10861104
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
10871105
ts->status=TRANSACTION_STATUS_UNKNOWN;
10881106
ts->csn=MtmAssignCSN();

‎multimaster.c

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,11 +1601,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
16011601
Assert(ts->gid[0]);
16021602
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
16031603
elog(LOG,"Abort transaction %s because its coordinator is disabled and it is not prepared at node %d",ts->gid,MtmNodeId);
1604-
ts->isPinned= true;
1605-
MtmUnlock();
16061604
MtmFinishPreparedTransaction(ts, false);
1607-
MtmLock(LW_EXCLUSIVE);
1608-
ts->isPinned= false;
16091605
}else {
16101606
MTM_LOG1("Poll state of transaction %d (%s)",ts->xid,ts->gid);
16111607
MtmBroadcastPollMessage(ts);
@@ -2017,7 +2013,7 @@ void MtmOnNodeConnect(int nodeId)
20172013
MtmLock(LW_EXCLUSIVE);
20182014
elog(LOG,"Connect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
20192015
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
2020-
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
2016+
BIT_SET(Mtm->reconnectMask,nodeId-1);/* force sender to reestablish connection and send heartbeat */
20212017
MtmUnlock();
20222018
}
20232019

@@ -2945,26 +2941,36 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
29452941
}
29462942
}
29472943

2948-
2944+
/*
2945+
* This function is called with the MTM mutex locked.
2946+
* It should unlock the mutex before calling FinishPreparedTransaction to avoid deadlocks.
2947+
* ts object is pinned to prevent deallocation while lock is released.
2948+
*/
29492949
voidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit)
29502950
{
29512951
boolinsideTransaction=IsTransactionState();
2952+
29522953
Assert(ts->votingCompleted);
2954+
2955+
ts->isPinned= true;
2956+
MtmUnlock();
2957+
29532958
MtmResetTransaction();
29542959

29552960
if (!insideTransaction) {
29562961
StartTransactionCommand();
29572962
}
2958-
//MtmBeginSession(MtmNodeId);
29592963
MtmSetCurrentTransactionCSN(ts->csn);
29602964
MtmSetCurrentTransactionGID(ts->gid);
29612965
FinishPreparedTransaction(ts->gid,commit);
29622966

29632967
if (!insideTransaction) {
29642968
CommitTransactionCommand();
2965-
//MtmEndSession(MtmNodeId, true);
29662969
Assert(ts->status==commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED);
29672970
}
2971+
2972+
MtmLock(LW_EXCLUSIVE);
2973+
ts->isPinned= false;
29682974
}
29692975

29702976
/*
@@ -2980,6 +2986,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29802986

29812987
if (!Mtm->preparedTransactionsLoaded)
29822988
{
2989+
/* We must restore state of prepared (but not committed or aborted) transactions before start of recovery. */
29832990
MtmLoadPreparedTransactions();
29842991
Mtm->preparedTransactionsLoaded= true;
29852992
}
@@ -2991,6 +2998,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29912998
MtmUnlock();
29922999
returnREPLMODE_EXIT;
29933000
}
3001+
/* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate the slot */
29943002
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
29953003
mode=REPLMODE_CREATE_NEW;
29963004
}
@@ -3147,6 +3155,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
31473155
MtmEnableNode(MtmReplicationNodeId);
31483156
MtmCheckQuorum();
31493157
}else {
3158+
/* Force arbiter to reestablish connection with this node and send heartbeat to inform this node that it was disabled and should perform recovery */
3159+
BIT_SET(Mtm->reconnectMask,MtmReplicationNodeId-1);
31503160
MtmUnlock();
31513161
elog(ERROR,"Disabled node %d tries to reconnect without recovery",MtmReplicationNodeId);
31523162
}
@@ -3156,9 +3166,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
31563166
elog(LOG,"Start %d senders and %d receivers from %d cluster status %s",Mtm->nSenders+1,Mtm->nReceivers,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
31573167
MtmSenderStarted=1;
31583168
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
3169+
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
31593170
MtmSwitchClusterMode(MTM_ONLINE);
31603171
}
3161-
BIT_SET(Mtm->reconnectMask,MtmReplicationNodeId-1);/* arbiter should try toreestblish connection with this node */
3172+
BIT_SET(Mtm->reconnectMask,MtmReplicationNodeId-1);/* arbiter should try to reestablish connection with this node */
31623173
MtmUnlock();
31633174
on_shmem_exit(MtmOnProcExit,0);
31643175
}
@@ -3168,6 +3179,16 @@ XLogRecPtr MtmGetFlushPosition(int nodeId)
31683179
returnMtm->nodes[nodeId-1].flushPos;
31693180
}
31703181

3182+
/**
3183+
* Keep track of progress of WAL writer.
3184+
* We need to notify WAL senders at other nodes which logical records
3185+
* are flushed to the disk and so can survive failure. In asynchronous commit mode
3186+
* WAL is flushed by WAL writer. Current flush position can be obtained by GetFlushRecPtr().
3187+
* So on applying new logical record we insert it in the MtmLsnMapping and compare
3188+
* their positions in local WAL log with current flush position.
3189+
* The records which are flushed to the disk by WAL writer are removed from the list
3190+
* and the mapping in Mtm->nodes[].flushPos is updated for this node.
3191+
*/
31713192
voidMtmUpdateLsnMapping(intnode_id,XLogRecPtrend_lsn)
31723193
{
31733194
dlist_mutable_iteriter;
@@ -3216,9 +3237,21 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
32163237
}
32173238
}
32183239

3240+
/*
3241+
* Filter transactions which should be replicated to other nodes.
3242+
* This filter is applied at sender side (WAL sender).
3243+
* Final filtering is also done at destination side by MtmFilterTransaction function.
3244+
*/
32193245
staticbool
32203246
MtmReplicationTxnFilterHook(structPGLogicalTxnFilterArgs*args)
32213247
{
3248+
/* Do not replicate any transactions in recovery mode (because we should apply
3249+
* changes sent to us rather than send our own pending changes)
3250+
* and transactions received from other nodes
3251+
* (originId should be non-zero in this case)
3252+
* unless we are performing recovery of disabled node
3253+
* (in this case all transactions should be sent)
3254+
*/
32223255
boolres=Mtm->status!=MTM_RECOVERY
32233256
&& (args->origin_id==InvalidRepOriginId
32243257
||MtmIsRecoveredNode(MtmReplicationNodeId));
@@ -3228,6 +3261,9 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
32283261
returnres;
32293262
}
32303263

3264+
/**
3265+
* Filter record corresponding to local (non-distributed) tables
3266+
*/
32313267
staticbool
32323268
MtmReplicationRowFilterHook(structPGLogicalRowFilterArgs*args)
32333269
{
@@ -3248,7 +3284,11 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
32483284

32493285
/*
32503286
* Filter received transactions at destination side.
3251-
* This function is executed by receiver, so there are no race conditions and it is possible to update nodes[i].restartLSN without lock
3287+
* This function is executed by receiver,
3288+
* so there are no race conditions and it is possible to update nodes[i].restartLSN without lock.
3289+
* It is more efficient to filter records at sender side (done by MtmReplicationTxnFilterHook) to avoid sending useless data through network. But asynchronous nature of
3290+
* logical replications makes it not possible to guarantee (at least I failed to do it)
3291+
* that a replica does not receive deteriorated data.
32523292
*/
32533293
boolMtmFilterTransaction(char*record,intsize)
32543294
{
@@ -3308,8 +3348,8 @@ bool MtmFilterTransaction(char* record, int size)
33083348
}
33093349

33103350
if (duplicate) {
3311-
MTM_LOG1("Ignore transaction %s from node %d flags=%x,ourrestartLSN for node:%lx,restart_lsn = (origin node %d== MtmReplicationNodeId %d) ?end_lsn=%lx, origin_lsn=%lx",
3312-
gid,replication_node,flags,Mtm->nodes[origin_node-1].restartLSN,origin_node,MtmReplicationNodeId,end_lsn,origin_lsn);
3351+
MTM_LOG1("Ignore transaction %s from node %d flags=%x becauseourLSN position%lx fororigin node %dis greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx)",
3352+
gid,replication_node,flags,Mtm->nodes[origin_node-1].restartLSN,origin_node,restart_lsn,end_lsn,origin_lsn);
33133353
}else {
33143354
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
33153355
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
@@ -3326,7 +3366,11 @@ void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
33263366
hooks->row_filter_hook=MtmReplicationRowFilterHook;
33273367
}
33283368

3329-
3369+
/*
3370+
* Setup replication session origin to include origin location in WAL and
3371+
* update slot position.
3372+
* Sessions are not reentrant so we have to use exclusive lock here.
3373+
*/
33303374
voidMtmBeginSession(intnodeId)
33313375
{
33323376
MtmLockNode(nodeId,LW_EXCLUSIVE);
@@ -3338,6 +3382,9 @@ void MtmBeginSession(int nodeId)
33383382
MTM_LOG3("%d: End setup replorigin session: %d",MyProcPid,replorigin_session_origin);
33393383
}
33403384

3385+
/*
3386+
* Release replication session
3387+
*/
33413388
voidMtmEndSession(intnodeId,boolunlock)
33423389
{
33433390
if (replorigin_session_origin!=InvalidRepOriginId) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp