Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit fc8a364

Browse files
knizhnikkelvich
authored and committed
Fix wait socket function
1 parent 7d8a4d6, commit fc8a364

File tree

5 files changed

+37
-20
lines changed

5 files changed

+37
-20
lines changed

‎arbiter.c

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -220,18 +220,20 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
220220
fd_setset;
221221
intrc;
222222
timestamp_tdeadline=MtmGetSystemTime()+MSEC_TO_USEC(timeoutMsec);
223-
FD_ZERO(&set);
224-
FD_SET(sd,&set);
223+
225224
do {
226225
timestamp_tnow;
227226
MtmCheckHeartbeat();
228-
now=MtmGetSystemTime();
229-
if (now>deadline) {
227+
now=MtmGetSystemTime();
228+
if (now>deadline) {
230229
now=deadline;
231230
}
231+
FD_ZERO(&set);
232+
FD_SET(sd,&set);
232233
tv.tv_sec= (deadline-now)/USECS_PER_SEC;
233234
tv.tv_usec= (deadline-now)%USECS_PER_SEC;
234235
}while ((rc=select(sd+1,forWrite ?NULL :&set,forWrite ?&set :NULL,NULL,&tv))<0&&errno==EINTR);
236+
235237
returnrc;
236238
}
237239

@@ -384,7 +386,7 @@ static void MtmSendHeartbeat()
384386
|| !BIT_CHECK(Mtm->disabledNodeMask,i)
385387
||BIT_CHECK(Mtm->reconnectMask,i)))
386388
{
387-
if (!MtmSendToNode(i,&msg,sizeof(msg),MtmHeartbeatSendTimeout)) {
389+
if (!MtmSendToNode(i,&msg,sizeof(msg),MtmHeartbeatRecvTimeout)) {
388390
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
389391
}else {
390392
if (last_heartbeat_to_node[i]+MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2<now) {
@@ -402,7 +404,7 @@ static void MtmSendHeartbeat()
402404
MTM_LOG4("Send heartbeat to node %d with timestamp %lld",i+1,now);
403405
}
404406
}else {
405-
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s",i+1,(long long)busy_mask,MtmNodeStatusMnem[Mtm->status]);
407+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s",i+1,busy_mask,MtmNodeStatusMnem[Mtm->status]);
406408
}
407409
}
408410
}
@@ -828,7 +830,7 @@ static void MtmMonitor(Datum arg)
828830
BackgroundWorkerInitializeConnection(MtmDatabaseName,NULL);
829831

830832
while (!stop) {
831-
intrc=WaitLatch(&MyProc->procLatch,WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatSendTimeout);
833+
intrc=WaitLatch(&MyProc->procLatch,WL_TIMEOUT |WL_POSTMASTER_DEATH,MtmHeartbeatRecvTimeout);
832834
if (rc&WL_POSTMASTER_DEATH) {
833835
break;
834836
}
@@ -938,7 +940,7 @@ static void MtmReceiver(Datum arg)
938940
Assert(node>0&&node <=nNodes&&node!=MtmNodeId);
939941

940942
if (Mtm->nodes[node-1].connectivityMask!=msg->connectivityMask) {
941-
elog(LOG,"Node %d changes it connectivity mask from %llx to %llx",node,(long long)Mtm->nodes[node-1].connectivityMask,(long long)msg->connectivityMask);
943+
elog(LOG,"Node %d changes it connectivity mask from %llx to %llx",node,Mtm->nodes[node-1].connectivityMask,msg->connectivityMask);
942944
}
943945

944946
Mtm->nodes[node-1].oldestSnapshot=msg->oldestSnapshot;
@@ -1002,11 +1004,11 @@ static void MtmReceiver(Datum arg)
10021004
replorigin_session_origin=InvalidRepOriginId;
10031005
}else {
10041006
MTM_LOG1("Receive response for transaction %s -> %s, participants=%llx, voted=%llx",
1005-
msg->gid,MtmTxnStatusMnem[msg->status],(long long)ts->participantsMask,(long long)ts->votedMask);
1007+
msg->gid,MtmTxnStatusMnem[msg->status],ts->participantsMask,ts->votedMask);
10061008
}
10071009
}else {
10081010
elog(LOG,"Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
1009-
MtmTxnStatusMnem[msg->status],msg->gid,node,(long long)ts->votedMask,(long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
1011+
MtmTxnStatusMnem[msg->status],msg->gid,node,ts->votedMask,ts->participantsMask& ~Mtm->disabledNodeMask);
10101012
continue;
10111013
}
10121014
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {
@@ -1015,7 +1017,7 @@ static void MtmReceiver(Datum arg)
10151017
elog(WARNING,"Transaction %s is committed at node %d but aborted at node %d",msg->gid,MtmNodeId,node);
10161018
}else {
10171019
elog(LOG,"Receive response %s for transaction %s status %s for node %d, votedMask %llx, participantsMask %llx",
1018-
MtmTxnStatusMnem[msg->status],msg->gid,MtmTxnStatusMnem[ts->status],node,(long long)ts->votedMask,(long long)(ts->participantsMask& ~Mtm->disabledNodeMask));
1020+
MtmTxnStatusMnem[msg->status],msg->gid,MtmTxnStatusMnem[ts->status],node,ts->votedMask,ts->participantsMask& ~Mtm->disabledNodeMask);
10191021
}
10201022
}
10211023
continue;

‎multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
4343
LANGUAGE C;
4444

4545
CREATETYPEmtm.cluster_stateAS ("status"text,"disabledNodeMask"bigint,"disconnectedNodeMask"bigint,"catchUpNodeMask"bigint,"liveNodes"integer,"allNodes"integer,"nActiveQueries"integer,"nPendingQueries"integer,"queueSize"bigint,"transCount"bigint,"timeShift"bigint,"recoverySlot"integer,
46-
"xidHashSize"bigint,"gidHashSize"bigint,"oldestXid"bigint,"configChanges"integer,"stalledNodeMask"bigint,"stoppedNodeMask"bigint);
46+
"xidHashSize"bigint,"gidHashSize"bigint,"oldestXid"bigint,"configChanges"integer,"stalledNodeMask"bigint,"stoppedNodeMask"bigint,"lastStatusChange"timestamp);
4747

4848
CREATETYPEmtm.trans_stateAS ("status"text,"gid"text,"xid"bigint,"coordinator"integer,"gxid"bigint,"csn"timestamp,"snapshot"timestamp,"local"boolean,"prepared"boolean,"active"boolean,"twophase"boolean,"votingCompleted"boolean,"participants"bigint,"voted"bigint,"configChanges"integer);
4949

‎multimaster.c

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2128,7 +2128,7 @@ void MtmRefreshClusterStatus()
21282128
* connectivity graph is stabilized.
21292129
*/
21302130
oldClique=newClique;
2131-
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2);/* double timeout tocondiderworst case when heartbeatsend interval is added with refresh cluster status interval */
2131+
MtmSleep(MSEC_TO_USEC(MtmHeartbeatRecvTimeout)*2);/* double timeout toconsider theworst case when heartbeatreceive interval is added with refresh cluster status interval */
21322132
MtmBuildConnectivityMatrix(matrix);
21332133
newClique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&cliqueSize);
21342134
}while (newClique!=oldClique);
@@ -2232,7 +2232,7 @@ void MtmOnNodeDisconnect(int nodeId)
22322232
BIT_SET(SELF_CONNECTIVITY_MASK,nodeId-1);
22332233
BIT_SET(Mtm->reconnectMask,nodeId-1);
22342234
elog(LOG,"Disconnect node %d connectivity mask %llx",
2235-
nodeId,(long long)SELF_CONNECTIVITY_MASK);
2235+
nodeId,SELF_CONNECTIVITY_MASK);
22362236
MtmUnlock();
22372237
}
22382238

@@ -2242,7 +2242,7 @@ void MtmOnNodeDisconnect(int nodeId)
22422242
voidMtmOnNodeConnect(intnodeId)
22432243
{
22442244
MtmLock(LW_EXCLUSIVE);
2245-
elog(LOG,"Connect node %d connectivity mask %llx",nodeId,(long long)SELF_CONNECTIVITY_MASK);
2245+
elog(LOG,"Connect node %d connectivity mask %llx",nodeId,SELF_CONNECTIVITY_MASK);
22462246
BIT_CLEAR(SELF_CONNECTIVITY_MASK,nodeId-1);
22472247
BIT_SET(Mtm->reconnectMask,nodeId-1);/* force sender to reestablish connection and send heartbeat */
22482248
MtmUnlock();
@@ -2254,6 +2254,7 @@ void MtmOnNodeConnect(int nodeId)
22542254
voidMtmReconnectNode(intnodeId)
22552255
{
22562256
MtmLock(LW_EXCLUSIVE);
2257+
elog(LOG,"Reconnect node %d connectivity mask %llx",nodeId,SELF_CONNECTIVITY_MASK);
22572258
BIT_SET(Mtm->reconnectMask,nodeId-1);
22582259
MtmUnlock();
22592260
}
@@ -3289,7 +3290,9 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
32893290
MtmSetCurrentTransactionGID(ts->gid);
32903291
MtmTx.isActive= true;
32913292
FinishPreparedTransaction(ts->gid,commit);
3292-
3293+
if (commit) {
3294+
MTM_LOG1("Distributed transaction %s is committed",ts->gid);
3295+
}
32933296
if (!insideTransaction) {
32943297
CommitTransactionCommand();
32953298
Assert(!MtmTx.isActive);
@@ -3326,15 +3329,21 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33263329
MtmUnlock();
33273330
returnREPLMODE_EXIT;
33283331
}
3329-
/* We are not interested in receiving any deteriorated logical messages from recovered node,do recreate slot */
3332+
/* We are not interested in receiving any deteriorated logical messages from recovered node,so recreate slot */
33303333
if (BIT_CHECK(Mtm->disabledNodeMask,nodeId-1)) {
33313334
mode=REPLMODE_CREATE_NEW;
33323335
}
33333336
MTM_LOG2("%d: receiver slot mode %s",MyProcPid,MtmNodeStatusMnem[Mtm->status]);
33343337
if (Mtm->status==MTM_RECOVERY) {
33353338
mode=REPLMODE_RECOVERED;
3336-
if ((Mtm->recoverySlot==0&& (Mtm->donorNodeId==MtmNodeId||Mtm->donorNodeId==nodeId))
3337-
||Mtm->recoverySlot==nodeId)
3339+
/* Choose node for recovery if
3340+
* 1. It is not chosen yet or the same node was chosen before
3341+
* 2. It is donor node or there is no donor node
3342+
* 3. Connections with all other live nodes were established
3343+
*/
3344+
if ((Mtm->recoverySlot==0||Mtm->recoverySlot==nodeId)
3345+
&& (Mtm->donorNodeId==MtmNodeId||Mtm->donorNodeId==nodeId)
3346+
&& (SELF_CONNECTIVITY_MASK& ~Mtm->disabledNodeMask)==0)
33383347
{
33393348
/* Choose for recovery first available slot or slot of donor node (if any) */
33403349
if (Mtm->nAllNodes >=3) {
@@ -3354,6 +3363,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33543363
returnREPLMODE_RECOVERY;
33553364
}
33563365
}
3366+
MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3367+
nodeId,Mtm->recoverySlot,Mtm->donorNodeId,SELF_CONNECTIVITY_MASK,Mtm->disabledNodeMask);
33573368
MtmUnlock();
33583369
/* delay opening of other slots until recovery is completed */
33593370
MtmSleep(STATUS_POLL_DELAY);
@@ -3492,6 +3503,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
34923503
}
34933504
}
34943505
}
3506+
MTM_LOG1("Startup of logical replication to node %d",MtmReplicationNodeId);
34953507
MtmLock(LW_EXCLUSIVE);
34963508
if (BIT_CHECK(Mtm->stoppedNodeMask,MtmReplicationNodeId-1)) {
34973509
elog(WARNING,"Stopped node %d tries to initiate recovery",MtmReplicationNodeId);
@@ -4074,6 +4086,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
40744086
values[15]=Int32GetDatum(Mtm->nConfigChanges);
40754087
values[16]=Int64GetDatum(Mtm->stalledNodeMask);
40764088
values[17]=Int64GetDatum(Mtm->stoppedNodeMask);
4089+
values[18]=TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[MtmNodeId-1].lastStatusChangeTime/USECS_PER_SEC));
40774090

40784091
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));
40794092
}
@@ -4456,6 +4469,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
44564469
elog(ERROR,"Transaction %s (%llu) is aborted by DTM",x->gid, (long64)x->xid);
44574470
}else {
44584471
FinishPreparedTransaction(x->gid, true);
4472+
MTM_LOG1("Distributed transaction %s is committed",x->gid);
44594473
}
44604474
}
44614475
}

‎multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282

8383
#defineNatts_mtm_trans_state 15
8484
#defineNatts_mtm_nodes_state 16
85-
#defineNatts_mtm_cluster_state18
85+
#defineNatts_mtm_cluster_state19
8686

8787
typedefulong64csn_t;/* commit serial number */
8888
#defineINVALID_CSN ((csn_t)-1)

‎pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,7 @@ process_remote_commit(StringInfo in)
703703
MtmSetCurrentTransactionCSN(csn);
704704
MtmSetCurrentTransactionGID(gid);
705705
FinishPreparedTransaction(gid, true);
706+
MTM_LOG1("Distributed transaction %s is committed",gid);
706707
CommitTransactionCommand();
707708
Assert(!MtmTransIsActive());
708709
MtmEndSession(origin_node, true);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp