Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1d9e382

Browse files
knizhnikkelvich
authored andcommitted
Fix handling of connectivity mask
1 parent1d2cfd1 commit1d9e382

File tree

6 files changed

+82
-46
lines changed

6 files changed

+82
-46
lines changed

‎arbiter.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,11 @@ static void MtmSendHeartbeat()
369369
if (!MtmSendToNode(i,&msg,sizeof(msg))) {
370370
elog(LOG,"Arbiter failed to send heartbeat to node %d",i+1);
371371
}else {
372+
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
373+
if (BIT_CHECK(Mtm->connectivityMask,i)) {
374+
MtmDisconnect(i);
375+
//MtmOnNodeConnect(i+1);
376+
}
372377
MTM_LOG4("Send heartbeat to node %d with timestamp %ld",i+1,now);
373378
}
374379
}else {
@@ -560,7 +565,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
560565
result= false;
561566
break;
562567
}
563-
MTM_LOG3("Arbiter reestablish connection with node %d",node+1);
568+
MTM_LOG1("Arbiter reestablish connection with node %d",node+1);
564569
}else {
565570
result= true;
566571
break;
@@ -718,7 +723,7 @@ static void MtmSender(Datum arg)
718723
* Use shared lock to improve locality,
719724
* because all other process modifying this list are using exclusive lock
720725
*/
721-
MtmLock(LW_SHARED);
726+
SpinLockAcquire(&Mtm->queueSpinlock);
722727

723728
for (curr=Mtm->sendQueue;curr!=NULL;curr=next) {
724729
next=curr->next;
@@ -728,7 +733,7 @@ static void MtmSender(Datum arg)
728733
}
729734
Mtm->sendQueue=NULL;
730735

731-
MtmUnlock();
736+
SpinLockRelease(&Mtm->queueSpinlock);
732737

733738
for (i=0;i<Mtm->nAllNodes;i++) {
734739
if (txBuffer[i].used!=0) {
@@ -892,6 +897,11 @@ static void MtmReceiver(Datum arg)
892897
intnode=msg->node;
893898

894899
Assert(node>0&&node <=nNodes&&node!=MtmNodeId);
900+
901+
if (Mtm->nodes[node-1].connectivityMask!=msg->connectivityMask) {
902+
elog(LOG,"Node %d changes it connectivity mask from %llx to %llx",node, (long long)Mtm->nodes[node-1].connectivityMask, (long long)msg->connectivityMask);
903+
}
904+
895905
Mtm->nodes[node-1].oldestSnapshot=msg->oldestSnapshot;
896906
Mtm->nodes[node-1].disabledNodeMask=msg->disabledNodeMask;
897907
Mtm->nodes[node-1].connectivityMask=msg->connectivityMask;

‎multimaster.c

Lines changed: 52 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ typedef struct {
8989
pgid_tgid;/* global transaction identifier (used by 2pc) */
9090
}MtmCurrentTrans;
9191

92-
/* #define USE_SPINLOCK 1 */
93-
9492
typedefenum
9593
{
9694
MTM_STATE_LOCK_ID
@@ -245,6 +243,7 @@ static int MtmMaxRecoveryLag;
245243
staticintMtmGcPeriod;
246244
staticboolMtmIgnoreTablesWithoutPk;
247245
staticintMtmLockCount;
246+
staticintMtmSenderStarted;
248247

249248
staticExecutorStart_hook_typePreviousExecutorStartHook;
250249
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
@@ -273,16 +272,12 @@ void MtmLock(LWLockMode mode)
273272
return;
274273
}
275274
}
276-
#ifdefUSE_SPINLOCK
277-
SpinLockAcquire(&Mtm->spinlock);
278-
#else
279275
start=MtmGetSystemTime();
280276
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID],mode);
281277
stop=MtmGetSystemTime();
282278
if (stop>start+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
283279
MTM_LOG1("%d: obtaining %s lock takes %ld microseconds",MyProcPid, (mode==LW_EXCLUSIVE ?"exclusive" :"shared"),stop-start);
284280
}
285-
#endif
286281
Mtm->lastLockHolder=MyProcPid;
287282
}
288283

@@ -291,11 +286,7 @@ void MtmUnlock(void)
291286
if (MtmLockCount!=0&&--MtmLockCount!=0) {
292287
return;
293288
}
294-
#ifdefUSE_SPINLOCK
295-
SpinLockRelease(&Mtm->spinlock);
296-
#else
297289
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
298-
#endif
299290
Mtm->lastLockHolder=0;
300291
}
301292

@@ -1231,7 +1222,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12311222
if (commit) {
12321223
if (!(ts->status==TRANSACTION_STATUS_UNKNOWN
12331224
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY)))
1234-
{Assert(false);
1225+
{
12351226
elog(ERROR,"Attempt to commit %s transaction %d (%s)",
12361227
MtmTxnStatusMnem[ts->status],ts->xid,ts->gid);
12371228
}
@@ -1304,20 +1295,24 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
13041295

13051296
voidMtmSendMessage(MtmArbiterMessage*msg)
13061297
{
1307-
MtmMessageQueue*mq=Mtm->freeQueue;
1308-
MtmMessageQueue*sendQueue=Mtm->sendQueue;
1309-
if (mq==NULL) {
1310-
mq= (MtmMessageQueue*)ShmemAlloc(sizeof(MtmMessageQueue));
1311-
}else {
1312-
Mtm->freeQueue=mq->next;
1313-
}
1314-
mq->msg=*msg;
1315-
mq->next=sendQueue;
1316-
Mtm->sendQueue=mq;
1317-
if (sendQueue==NULL) {
1318-
/* singal semaphore only once for the whole list */
1319-
PGSemaphoreUnlock(&Mtm->sendSemaphore);
1298+
SpinLockAcquire(&Mtm->queueSpinlock);
1299+
{
1300+
MtmMessageQueue*mq=Mtm->freeQueue;
1301+
MtmMessageQueue*sendQueue=Mtm->sendQueue;
1302+
if (mq==NULL) {
1303+
mq= (MtmMessageQueue*)ShmemAlloc(sizeof(MtmMessageQueue));
1304+
}else {
1305+
Mtm->freeQueue=mq->next;
1306+
}
1307+
mq->msg=*msg;
1308+
mq->next=sendQueue;
1309+
Mtm->sendQueue=mq;
1310+
if (sendQueue==NULL) {
1311+
/* singal semaphore only once for the whole list */
1312+
PGSemaphoreUnlock(&Mtm->sendSemaphore);
1313+
}
13201314
}
1315+
SpinLockRelease(&Mtm->queueSpinlock);
13211316
}
13221317

13231318
voidMtmSend2PCMessage(MtmTransState*ts,MtmMessageCodecmd)
@@ -1667,8 +1662,8 @@ void MtmRecoveryCompleted(void)
16671662
Mtm->nodes[i].lastHeartbeat=0;/* defuse watchdog until first heartbeat is received */
16681663
}
16691664
/* Mode will be changed to online once all logical receiver are connected */
1670-
elog(LOG,"Recovery completed with %d active receivers from %d",Mtm->nReceivers,Mtm->nLiveNodes-1);
1671-
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1 ?MTM_ONLINE :MTM_CONNECTED);
1665+
elog(LOG,"Recovery completed with %d active receiversand %d started sendersfrom %d",Mtm->nReceivers,Mtm->nSenders,Mtm->nLiveNodes-1);
1666+
MtmSwitchClusterMode(Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1?MTM_ONLINE :MTM_CONNECTED);
16721667
MtmUnlock();
16731668
}
16741669

@@ -2010,7 +2005,7 @@ void MtmOnNodeDisconnect(int nodeId)
20102005
MtmLock(LW_EXCLUSIVE);
20112006
BIT_SET(Mtm->connectivityMask,nodeId-1);
20122007
BIT_SET(Mtm->reconnectMask,nodeId-1);
2013-
MTM_LOG1("Disconnect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
2008+
elog(LOG,"Disconnect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
20142009
MtmUnlock();
20152010

20162011
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
@@ -2020,6 +2015,7 @@ void MtmOnNodeDisconnect(int nodeId)
20202015
voidMtmOnNodeConnect(intnodeId)
20212016
{
20222017
MtmLock(LW_EXCLUSIVE);
2018+
elog(LOG,"Connect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
20232019
BIT_CLEAR(Mtm->connectivityMask,nodeId-1);
20242020
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
20252021
MtmUnlock();
@@ -2198,6 +2194,7 @@ static void MtmInitialize()
21982194
Mtm->transListHead=NULL;
21992195
Mtm->transListTail=&Mtm->transListHead;
22002196
Mtm->nReceivers=0;
2197+
Mtm->nSenders=0;
22012198
Mtm->timeShift=0;
22022199
Mtm->transCount=0;
22032200
Mtm->gcCount=0;
@@ -2229,7 +2226,7 @@ static void MtmInitialize()
22292226
Mtm->nodes[MtmNodeId-1].restartLSN= (XLogRecPtr)PG_UINT64_MAX;
22302227
PGSemaphoreCreate(&Mtm->sendSemaphore);
22312228
PGSemaphoreReset(&Mtm->sendSemaphore);
2232-
SpinLockInit(&Mtm->spinlock);
2229+
SpinLockInit(&Mtm->queueSpinlock);
22332230
BgwPoolInit(&Mtm->pool,MtmExecutor,MtmDatabaseName,MtmDatabaseUser,MtmQueueSize,MtmWorkers);
22342231
RegisterXactCallback(MtmXactCallback,NULL);
22352232
MtmTx.snapshot=INVALID_CSN;
@@ -2906,11 +2903,9 @@ void MtmReceiverStarted(int nodeId)
29062903
MtmEnableNode(nodeId);
29072904
MtmCheckQuorum();
29082905
}
2909-
elog(LOG,"Start %d receivers from %d cluster status %s",Mtm->nReceivers+1,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
2910-
if (++Mtm->nReceivers==Mtm->nLiveNodes-1) {
2911-
if (Mtm->status==MTM_CONNECTED) {
2912-
MtmSwitchClusterMode(MTM_ONLINE);
2913-
}
2906+
elog(LOG,"Start %d receivers and %d senders from %d cluster status %s",Mtm->nReceivers+1,Mtm->nSenders,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
2907+
if (++Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
2908+
MtmSwitchClusterMode(MTM_ONLINE);
29142909
}
29152910
}
29162911
MtmUnlock();
@@ -2946,18 +2941,23 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
29462941

29472942
voidMtmFinishPreparedTransaction(MtmTransState*ts,boolcommit)
29482943
{
2944+
boolinsideTransaction=IsTransactionState();
29492945
Assert(ts->votingCompleted);
2950-
Assert(!IsTransactionState());
29512946
MtmResetTransaction();
2952-
StartTransactionCommand();
2953-
2954-
MtmBeginSession(MtmNodeId);
2947+
2948+
if (!insideTransaction) {
2949+
StartTransactionCommand();
2950+
}
2951+
//MtmBeginSession(MtmNodeId);
29552952
MtmSetCurrentTransactionCSN(ts->csn);
29562953
MtmSetCurrentTransactionGID(ts->gid);
29572954
FinishPreparedTransaction(ts->gid,commit);
2958-
CommitTransactionCommand();
2959-
MtmEndSession(MtmNodeId, true);
2960-
Assert(ts->status==commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED);
2955+
2956+
if (!insideTransaction) {
2957+
CommitTransactionCommand();
2958+
//MtmEndSession(MtmNodeId, true);
2959+
Assert(ts->status==commit ?TRANSACTION_STATUS_COMMITTED :TRANSACTION_STATUS_ABORTED);
2960+
}
29612961
}
29622962

29632963
/*
@@ -2997,6 +2997,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
29982998
Mtm->recoverySlot=nodeId;
29992999
Mtm->nReceivers=0;
3000+
Mtm->nSenders=0;
30003001
Mtm->recoveryCount+=1;
30013002
Mtm->pglogicalNodeMask=0;
30023003
MtmUnlock();
@@ -3015,6 +3016,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
30153016
}else {
30163017
MTM_LOG1("%d: Continue replication from node %d",MyProcPid,nodeId);
30173018
}
3019+
BIT_SET(Mtm->reconnectMask,nodeId-1);/* arbiter should try to reestblish connection with this node */
30183020
MtmUnlock();
30193021
returnmode;
30203022
}
@@ -3144,6 +3146,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
31443146
}else {
31453147
MTM_LOG1("Node %d start logical replication to node %d in normal mode",MtmNodeId,MtmReplicationNodeId);
31463148
}
3149+
elog(LOG,"Start %d senders and %d receivers from %d cluster status %s",Mtm->nSenders+1,Mtm->nReceivers,Mtm->nLiveNodes-1,MtmNodeStatusMnem[Mtm->status]);
3150+
MtmSenderStarted=1;
3151+
if (++Mtm->nSenders==Mtm->nLiveNodes-1&&Mtm->nReceivers==Mtm->nLiveNodes-1&&Mtm->status==MTM_CONNECTED) {
3152+
MtmSwitchClusterMode(MTM_ONLINE);
3153+
}
3154+
BIT_SET(Mtm->reconnectMask,MtmReplicationNodeId-1);/* arbiter should try to reestblish connection with this node */
31473155
MtmUnlock();
31483156
on_shmem_exit(MtmOnProcExit,0);
31493157
}
@@ -3192,6 +3200,9 @@ static void
31923200
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
31933201
{
31943202
if (MtmReplicationNodeId >=0) {
3203+
MtmLock(LW_EXCLUSIVE);
3204+
Mtm->nSenders-=MtmSenderStarted;
3205+
MtmUnlock();
31953206
MTM_LOG1("Logical replication to node %d is stopped",MtmReplicationNodeId);
31963207
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
31973208
MtmReplicationNodeId=-1;/* defuse on_proc_exit hook */
@@ -3290,7 +3301,7 @@ bool MtmFilterTransaction(char* record, int size)
32903301
}
32913302

32923303
if (duplicate) {
3293-
MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
3304+
MTM_LOG2("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
32943305
gid,replication_node,flags,Mtm->nodes[origin_node-1].restartLSN,origin_node,MtmReplicationNodeId,end_lsn,origin_lsn);
32953306
}else {
32963307
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",

‎multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ typedef struct
257257
{
258258
MtmNodeStatusstatus;/* Status of this node */
259259
intrecoverySlot;/* NodeId of recovery slot or 0 if none */
260-
volatileslock_tspinlock;/* spinlock used to protectaccess to hash table */
260+
volatileslock_tqueueSpinlock;/* spinlock used to protectsender queue */
261261
PGSemaphoreDatasendSemaphore;/* semaphore used to notify mtm-sender about new responses to coordinator */
262262
LWLockPadded*locks;/* multimaster lock tranche */
263263
TransactionIdoldestXid;/* XID of oldest transaction visible by any active transaction (local or global) */
@@ -273,7 +273,8 @@ typedef struct
273273
intinject2PCError;/* Simulate error during 2PC commit at this node */
274274
intnLiveNodes;/* Number of active nodes */
275275
intnAllNodes;/* Total numbber of nodes */
276-
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
276+
intnReceivers;/* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277+
intnSenders;/* Number of started WAL senders (used to determine moment when recovery) */
277278
intnLockers;/* Number of lockers */
278279
intnActiveTransactions;/* Nunmber of active 2PC transactions */
279280
intnConfigChanges;/* Number of cluster configuration changes */

‎pglogical_apply.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,9 @@ process_remote_commit(StringInfo in)
708708
default:
709709
Assert(false);
710710
}
711+
if (Mtm->status==MTM_RECOVERY) {
712+
MTM_LOG1("Recover transaction %s flags=%d",gid,flags);
713+
}
711714
MtmUpdateLsnMapping(MtmReplicationNodeId,end_lsn);
712715
if (flags&PGLOGICAL_CAUGHT_UP) {
713716
MtmRecoveryCompleted();

‎pglogical_proto.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
210210
Assert(MtmTransactionRecords==0);
211211
return;
212212
}
213+
if (isRecovery) {
214+
MTM_LOG1("PGLOGICAL_SEND recover transaction: event=%d, gid=%s, xid=%d, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",
215+
flags,txn->gid,txn->xid,commit_lsn,txn->end_lsn,GetXLogInsertRecPtr());
216+
}
213217
if (flags==PGLOGICAL_ABORT_PREPARED) {
214218
MTM_LOG1("Send ABORT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
215219
txn->xid,txn->gid,txn->end_lsn,MtmReplicationNodeId,isRecovery,txn->origin_id,csn);

‎tests2/lib/bank_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
166166
# In case of continuous excetions we can loop here without returning
167167
# back to event loop and block it
168168
yieldfromasyncio.sleep(0.01)
169+
exceptBaseExceptionase:
170+
agg.finish_tx(str(e).strip())
171+
print('Catch exception ',e)
172+
# Give evloop some free time.
173+
# In case of continuous excetions we can loop here without returning
174+
# back to event loop and block it
175+
yieldfromasyncio.sleep(0.01)
169176

170177
print("We've count to infinity!")
171178

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp