Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd3a8cdc

Browse files
knizhnikkelvich
authored andcommitted
Add more comments
1 parentf5abea5 commitd3a8cdc

File tree

3 files changed

+76
-15
lines changed

3 files changed

+76
-15
lines changed

‎arbiter.c

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ static void MtmRegisterSocket(int fd, int node)
176176
ev.events=EPOLLIN;
177177
ev.data.u32=node;
178178
if (epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev)<0) {
179-
elog(ERROR,"Arbuter failed to add socket to epoll set: %d",errno);
179+
elog(ERROR,"Arbiter failed to add socket to epoll set: %d",errno);
180180
}
181181
#else
182182
FD_SET(fd,&inset);
@@ -285,21 +285,23 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
285285
gotoRetry;
286286
}
287287
if (MtmReadSocket(sd,&msg,sizeofmsg)!=sizeof(msg)) {
288-
elog(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: %d",host,port,errno);
288+
elog(WARNING,"Arbiter failed to receive response for handshake message from %s:%d:errno=%d",host,port,errno);
289289
close(sd);
290290
gotoRetry;
291291
}
292292
if (msg.code!=MSG_STATUS||msg.dxid!=HANDSHAKE_MAGIC) {
293-
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d: %d",msg.code,host,port,errno);
293+
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d",msg.code,host,port);
294294
close(sd);
295295
gotoRetry;
296296
}
297297

298+
/* Some node cnosidered that I am dead, so switch to recovery mode */
298299
if (BIT_CHECK(msg.disabledNodeMask,MtmNodeId-1)) {
299300
elog(WARNING,"Node is switched to recovery mode");
300301
ds->status=MTM_RECOVERY;
301302
}
302-
ds->disabledNodeMask=msg.disabledNodeMask;
303+
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
304+
ds->disabledNodeMask |=msg.disabledNodeMask;
303305
returnsd;
304306
}
305307
}
@@ -340,8 +342,10 @@ static void MtmOpenConnections()
340342
}
341343
}
342344
if (ds->nNodes<MtmNodes/2+1) {/* no quorum */
345+
elog(WARNING,"Node is out of quorum: only %d nodes from %d are accssible",ds->nNodes,MtmNodes);
343346
ds->status=MTM_OFFLINE;
344347
}elseif (ds->status==MTM_INITIALIZATION) {
348+
elog(WARNING,"Switch to CONNECTED mode");
345349
ds->status=MTM_CONNECTED;
346350
}
347351
}
@@ -392,7 +396,7 @@ static void MtmAcceptOneConnection()
392396
msg.sxid=ShmemVariableCache->nextXid;
393397
msg.csn=MtmGetCurrentTime();
394398
if (!MtmWriteSocket(fd,&msg,sizeofmsg)) {
395-
elog(WARNING,"Arbiter failed to write response for handshake messagefrom node %d",msg.node);
399+
elog(WARNING,"Arbiter failed to write response for handshake messageto node %d",msg.node);
396400
close(fd);
397401
}else {
398402
elog(NOTICE,"Arbiter established connection with node %d",msg.node);
@@ -690,8 +694,8 @@ static void MtmTransReceiver(Datum arg)
690694
caseMSG_PREPARE:
691695
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
692696
if ((msg->disabledNodeMask& ~ds->disabledNodeMask)!=0) {
693-
/* Coordinator's disabled mask is wider than my:so reject such transaction to avoid
694-
commiton smaller subset of nodes */
697+
/* Coordinator's disabled mask is wider than my:so reject such transaction to avoid
698+
commit on smaller subset of nodes */
695699
ts->status=TRANSACTION_STATUS_ABORTED;
696700
ts->cmd=MSG_ABORT;
697701
MtmAdjustSubtransactions(ts);

‎multimaster.c

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
154154
ProcessUtilityContextcontext,ParamListInfoparams,
155155
DestReceiver*dest,char*completionTag);
156156

157+
/*
158+
* Using LWLock seems to be more efficient (at our benchmarks)
159+
*/
157160
voidMtmLock(LWLockModemode)
158161
{
159162
#ifdefUSE_SPINLOCK
@@ -197,6 +200,9 @@ void MtmSleep(timestamp_t interval)
197200
}
198201
}
199202

203+
/**
204+
* Return ascending unique timestamp which is used as CSN
205+
*/
200206
csn_tMtmAssignCSN()
201207
{
202208
csn_tcsn=MtmGetCurrentTime();
@@ -208,6 +214,9 @@ csn_t MtmAssignCSN()
208214
returncsn;
209215
}
210216

217+
/**
218+
* "Adjust" system clock if we receive message from future
219+
*/
211220
csn_tMtmSyncClock(csn_tglobal_csn)
212221
{
213222
csn_tlocal_csn;
@@ -471,14 +480,23 @@ MtmXactCallback(XactEvent event, void *arg)
471480
}
472481
}
473482

483+
/*
484+
* Check if this is "normal" user trnsaction which shoudl be distributed to other nodes
485+
*/
486+
staticbool
487+
MtmIsUserTransaction()
488+
{
489+
returnIsNormalProcessingMode()&&dtm->status==MTM_ONLINE&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
490+
}
491+
474492
staticvoid
475493
MtmBeginTransaction(MtmCurrentTrans*x)
476494
{
477495
if (x->snapshot==INVALID_CSN) {
478496
MtmLock(LW_EXCLUSIVE);
479497
x->xid=GetCurrentTransactionIdIfAny();
480498
x->isReplicated= false;
481-
x->isDistributed=IsNormalProcessingMode()&&dtm->status==MTM_ONLINE&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
499+
x->isDistributed=MtmIsUserTransaction();
482500
x->containsDML= false;
483501
x->snapshot=MtmAssignCSN();
484502
x->gtid.xid=InvalidTransactionId;
@@ -489,7 +507,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
489507
}
490508

491509

492-
/* This function is called at transaction start with multimaster ock set */
510+
/*
511+
* If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
512+
* WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
513+
* This function is called at transaction start with multimaster lock set
514+
*/
493515
staticvoid
494516
MtmCheckClusterLock()
495517
{
@@ -507,6 +529,7 @@ MtmCheckClusterLock()
507529
break;
508530
}else {
509531
/* recovered replica catched up with master */
532+
elog(WARNING,"WAL-sender %d complete receovery",i);
510533
dtm->walSenderLockerMask &= ~((nodemask_t)1 <<i);
511534
}
512535
}
@@ -524,6 +547,7 @@ MtmCheckClusterLock()
524547
}else {
525548
/* All lockers are synchronized their logs */
526549
/* Remove lock and mark them as receovered */
550+
elog(WARNING,"Complete recovery of %d nodes (node mask %lx)",dtm->nLockers,dtm->nodeLockerMask);
527551
Assert(dtm->walSenderLockerMask==0);
528552
Assert((dtm->nodeLockerMask&dtm->disabledNodeMask)==dtm->nodeLockerMask);
529553
dtm->disabledNodeMask &= ~dtm->nodeLockerMask;
@@ -552,6 +576,10 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
552576
x->xid=GetCurrentTransactionId();
553577

554578
MtmLock(LW_EXCLUSIVE);
579+
580+
/*
581+
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to cache-up
582+
*/
555583
MtmCheckClusterLock();
556584

557585
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
@@ -580,6 +608,10 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
580608
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n",getpid(),x->xid,ts->csn);
581609
}
582610

611+
/**
612+
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
613+
* WAL overflow
614+
*/
583615
staticvoidMtmCheckSlots()
584616
{
585617
if (MtmMaxRecoveryLag!=0&&dtm->disabledNodeMask!=0)
@@ -636,17 +668,23 @@ void MtmSendNotificationMessage(MtmTransState* ts)
636668
}
637669

638670
/*
639-
* This function is called by WAL sender when start sending new transaction
671+
* This function is called by WAL sender when start sending new transaction.
672+
* It returns true if specified node is in recovery mode. In this case we should send all transactions from WAL,
673+
* not only coordinated by self node as in normal mode.
640674
*/
641675
boolMtmIsRecoveredNode(intnodeId)
642676
{
643677
if (BIT_CHECK(dtm->disabledNodeMask,nodeId-1)) {
644-
Assert(MyWalSnd!=NULL);
678+
Assert(MyWalSnd!=NULL);/* This function is called by WAL-sender, so it should not be NULL */
645679
if (!BIT_CHECK(dtm->nodeLockerMask,nodeId-1)
646680
&&MyWalSnd->sentPtr+MtmMinRecoveryLag>GetXLogInsertRecPtr())
647681
{
648-
/* Wal sender almost catched up */
649-
/* Lock cluster preventing new transaction to start until wal is completely replayed */
682+
/*
683+
* Wal sender almost catched up.
684+
* Lock cluster preventing new transaction to start until wal is completely replayed.
685+
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
686+
* Is there some better way to establish mapping between nodes ad WAL-seconder?
687+
*/
650688
MtmLock(LW_EXCLUSIVE);
651689
dtm->nodeLockerMask |= (nodemask_t)1 << (nodeId-1);
652690
dtm->walSenderLockerMask |= (nodemask_t)1 << (MyWalSnd-WalSndCtl->walsnds);
@@ -793,8 +831,8 @@ _PG_init(void)
793831
DefineCustomIntVariable(
794832
"multimaster.max_recovery_lag",
795833
"Maximal lag of replication slot of failed node after which this slot is dropped to avoid transaction log overflow",
796-
"Dropping slog makes it not possible to recover node using logical replication mechanism, it willeb ncessary to completely copy content of some other nodes "
797-
"usimg basebackup or similar tool",
834+
"Dropping slog makes it not possible to recover node using logical replication mechanism, it willbe ncessary to completely copy content of some other nodes "
835+
"usimg basebackup or similar tool. Zero value of parameter disable droipping slot.",
798836
&MtmMaxRecoveryLag,
799837
100000000,
800838
0,
@@ -990,6 +1028,7 @@ _PG_fini(void)
9901028
staticvoidMtmSwitchFromRecoveryToNormalMode()
9911029
{
9921030
dtm->status=MTM_ONLINE;
1031+
elog(WARNING,"Switch to normal mode");
9931032
/* ??? Something else to do here? */
9941033
}
9951034

@@ -1008,8 +1047,10 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
10081047
}
10091048

10101049
if (!TransactionIdIsValid(gtid->xid)) {
1050+
/* In case of recovery InvalidTransactionId is passed */
10111051
Assert(dtm->status==MTM_RECOVERY);
10121052
}elseif (dtm->status==MTM_RECOVERY) {
1053+
/* When recovery is completed we get normal transaction ID and switch to normal mode */
10131054
MtmSwitchFromRecoveryToNormalMode();
10141055
}
10151056
dtmTx.gtid=*gtid;
@@ -1026,6 +1067,7 @@ void MtmReceiverStarted(int nodeId)
10261067
if (!BIT_CHECK(dtm->pglogicalNodeMask,nodeId-1)) {
10271068
dtm->pglogicalNodeMask |= (int64)1 << (nodeId-1);
10281069
if (++dtm->nReceivers==dtm->nNodes-1) {
1070+
elog(WARNING,"All receivers are started, switch to normal mode");
10291071
Assert(dtm->status==MTM_CONNECTED);
10301072
dtm->status=MTM_ONLINE;
10311073
}
@@ -1048,17 +1090,25 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
10481090
returnsnapshot;
10491091
}
10501092

1093+
/*
1094+
* Determine when and how we should open replication slot.
1095+
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
1096+
* Slots at other nodes should be removed
1097+
*/
10511098
MtmSlotModeMtmReceiverSlotMode(intnodeId)
10521099
{
10531100
while (dtm->status!=MTM_CONNECTED&&dtm->status!=MTM_ONLINE) {
10541101
if (dtm->status==MTM_RECOVERY) {
10551102
if (dtm->recoverySlot==0||dtm->recoverySlot==nodeId) {
1103+
/* Choose for recovery first available slot */
10561104
dtm->recoverySlot=nodeId;
10571105
returnSLOT_OPEN_EXISTED;
10581106
}
10591107
}
1108+
/* delay opening of other slots until recovery is completed */
10601109
MtmSleep(STATUS_POLL_DELAY);
10611110
}
1111+
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
10621112
returndtm->recoverySlot ?SLOT_CREATE_NEW :SLOT_OPEN_ALWAYS;
10631113
}
10641114

‎pglogical_receiver.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ pglogical_receiver_main(Datum main_arg)
223223
/* Connect to a database */
224224
BackgroundWorkerInitializeConnection(MtmDatabaseName,NULL);
225225

226+
/*
227+
* Determine when and how we should open replication slot.
228+
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
229+
* Slots at other nodes should be removed
230+
*/
226231
mode=MtmReceiverSlotMode(args->receiver_node);
227232

228233
/* Establish connection to remote server */
@@ -375,6 +380,8 @@ pglogical_receiver_main(Datum main_arg)
375380
* If the server requested an immediate reply, send one.
376381
* If sync mode is sent reply in all cases to ensure that
377382
* server knows how far replay has been done.
383+
* In recovery mode also always send reply to provide master with more precise information
384+
* about recovery progress
378385
*/
379386
if (replyRequested||receiver_sync_mode||ds->status==MTM_RECOVERY)
380387
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp