Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 31594e5

Browse files
knizhnik authored and kelvich committed
Update XTM documentation
1 parent b5da515 commit 31594e5

File tree

5 files changed

+141
-52
lines changed

5 files changed

+141
-52
lines changed

‎arbiter.c

Lines changed: 46 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ typedef struct
8383
TransactionId dxid; /* Transaction ID at destination node */
8484
TransactionId sxid; /* Transaction IO at sender node */
8585
csn_t csn; /* local CSN in case of sending data from replica to master, global CSN master->replica */
86+
int64 disabledNodeMask; /* bitmask of disabled nodes at the sender of message */
8687
} MtmArbiterMessage;
8788

8889
typedef struct
@@ -109,7 +110,8 @@ static char const* const messageText[] =
109110
"ABORT",
110111
"PREPARED",
111112
"COMMITTED",
112-
"ABORTED"
113+
"ABORTED",
114+
"STATUS"
113115
};
114116

115117

@@ -276,11 +278,28 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
276278
msg.dxid=HANDSHAKE_MAGIC;
277279
msg.sxid=ShmemVariableCache->nextXid;
278280
msg.csn=MtmGetCurrentTime();
281+
msg.disabledNodeMask=ds->disabledNodeMask;
279282
if (!MtmWriteSocket(sd,&msg,sizeof msg)) {
280283
elog(WARNING,"Arbiter failed to send handshake message to %s:%d: %d",host,port,errno);
281284
close(sd);
282285
goto Retry;
283286
}
287+
if (MtmReadSocket(sd,&msg,sizeof msg)!=sizeof(msg)) {
288+
elog(WARNING,"Arbiter failed to receive response for handshake message from %s:%d: %d",host,port,errno);
289+
close(sd);
290+
goto Retry;
291+
}
292+
if (msg.code!=MSG_STATUS||msg.dxid!=HANDSHAKE_MAGIC) {
293+
elog(WARNING,"Arbiter get unexpected response %d for handshake message from %s:%d: %d",msg.code,host,port,errno);
294+
close(sd);
295+
goto Retry;
296+
}
297+
298+
if (BIT_CHECK(msg.disabledNodeMask,MtmNodeId-1)) {
299+
elog(WARNING,"Node is switched to recovery mode");
300+
ds->status=MTM_RECOVERY;
301+
}
302+
ds->disabledNodeMask=msg.disabledNodeMask;
284303
return sd;
285304
}
286305
}
@@ -315,11 +334,16 @@ static void MtmOpenConnections()
315334
sockets[i]=MtmConnectSocket(host,MtmArbiterPort+i+1,MtmConnectAttempts);
316335
if (sockets[i]<0) {
317336
MtmDropNode(i+1, false);
318-
}
337+
}
319338
}else {
320339
sockets[i]=-1;
321340
}
322341
}
342+
if (ds->nNodes<MtmNodes/2+1) {/* no quorum */
343+
ds->status=MTM_OFFLINE;
344+
} else if (ds->status==MTM_INITIALIZATION) {
345+
ds->status=MTM_CONNECTED;
346+
}
323347
}
324348

325349

@@ -362,8 +386,13 @@ static void MtmAcceptOneConnection()
362386
close(fd);
363387
}else{
364388
Assert(msg.node>0&&msg.node <=MtmNodes&&msg.node!=MtmNodeId);
365-
if (BIT_SET(ds->disabledNodeMask,msg.node-1)) {
366-
elog(WARNING,"Reject attempt to reconnect from disabled node %d",msg.node);
389+
msg.code=MSG_STATUS;
390+
msg.disabledNodeMask=ds->disabledNodeMask;
391+
msg.dxid=HANDSHAKE_MAGIC;
392+
msg.sxid=ShmemVariableCache->nextXid;
393+
msg.csn=MtmGetCurrentTime();
394+
if (!MtmWriteSocket(fd,&msg,sizeof msg)) {
395+
elog(WARNING,"Arbiter failed to write response for handshake message from node %d",msg.node);
367396
close(fd);
368397
}else {
369398
elog(NOTICE,"Arbiter established connection with node %d",msg.node);
@@ -427,6 +456,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
427456
buf->data[buf->used].sxid=ts->xid;
428457
buf->data[buf->used].csn=ts->csn;
429458
buf->data[buf->used].node=MtmNodeId;
459+
buf->data[buf->used].disabledNodeMask=ds->disabledNodeMask;
430460
buf->used+=1;
431461
}
432462

@@ -659,12 +689,20 @@ static void MtmTransReceiver(Datum arg)
659689
switch (msg->code) {
660690
case MSG_PREPARE:
661691
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
662-
ts->status=TRANSACTION_STATUS_UNKNOWN;
663-
ts->csn=MtmAssignCSN();
664-
ts->cmd=MSG_PREPARED;
692+
if ((msg->disabledNodeMask& ~ds->disabledNodeMask)!=0) {
693+
/* Coordinator's disabled mask is wider than my:so reject such transaction to avoid
694+
commit on smaller subset of nodes */
695+
ts->status=TRANSACTION_STATUS_ABORTED;
696+
ts->cmd=MSG_ABORT;
697+
MtmAdjustSubtransactions(ts);
698+
MtmWakeUpBackend(ts);
699+
}else {
700+
ts->status=TRANSACTION_STATUS_UNKNOWN;
701+
ts->csn=MtmAssignCSN();
702+
ts->cmd=MSG_PREPARED;
703+
}
665704
MtmSendNotificationMessage(ts);
666705
break;
667-
break;
668706
case MSG_COMMIT:
669707
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN);
670708
Assert(ts->csn<msg->csn);

‎multimaster.c

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ typedef struct {
7070
#define USEC 1000000
7171
#define MIN_WAIT_TIMEOUT 1000
7272
#define MAX_WAIT_TIMEOUT 100000
73+
#define STATUS_POLL_DELAY USEC
7374

7475
void _PG_init(void);
7576
void _PG_fini(void);
@@ -147,7 +148,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
147148
void MtmLock(LWLockMode mode)
148149
{
149150
#ifdef USE_SPINLOCK
150-
SpinLockAcquire(&dtm->hashSpinlock);
151+
SpinLockAcquire(&dtm->spinlock);
151152
#else
152153
LWLockAcquire(dtm->hashLock,mode);
153154
#endif
@@ -156,7 +157,7 @@ void MtmLock(LWLockMode mode)
156157
void MtmUnlock(void)
157158
{
158159
#ifdef USE_SPINLOCK
159-
SpinLockRelease(&dtm->hashSpinlock);
160+
SpinLockRelease(&dtm->spinlock);
160161
#else
161162
LWLockRelease(dtm->hashLock);
162163
#endif
@@ -409,20 +410,22 @@ static void MtmInitialize()
409410
dtm= (MtmState*)ShmemInitStruct(MULTIMASTER_NAME,sizeof(MtmState),&found);
410411
if (!found)
411412
{
413+
dtm->status=MTM_INITIALIZATION;
414+
dtm->recoverySlot=0;
412415
dtm->hashLock= (LWLock*)GetNamedLWLockTranche(MULTIMASTER_NAME);
413416
dtm->csn=MtmGetCurrentTime();
414417
dtm->oldestXid=FirstNormalTransactionId;
415418
dtm->nNodes=MtmNodes;
416419
dtm->disabledNodeMask=0;
420+
dtm->pglogicalNodeMask=0;
417421
dtm->votingTransactions=NULL;
418422
dtm->transListHead=NULL;
419-
dtm->transListTail=&dtm->transListHead;
420-
pg_atomic_write_u32(&dtm->nReceivers,0);
423+
dtm->transListTail=&dtm->transListHead;
424+
dtm->nReceivers=0;
421425
dtm->timeShift=0;
422-
dtm->initialized= false;
423426
PGSemaphoreCreate(&dtm->votingSemaphore);
424427
PGSemaphoreReset(&dtm->votingSemaphore);
425-
SpinLockInit(&dtm->hashSpinlock);
428+
SpinLockInit(&dtm->spinlock);
426429
BgwPoolInit(&dtm->pool,MtmExecutor,MtmDatabaseName,MtmQueueSize);
427430
RegisterXactCallback(MtmXactCallback,NULL);
428431
dtmTx.snapshot=INVALID_CSN;
@@ -463,7 +466,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
463466
MtmLock(LW_EXCLUSIVE);
464467
x->xid=GetCurrentTransactionIdIfAny();
465468
x->isReplicated= false;
466-
x->isDistributed=IsNormalProcessingMode()&&dtm->initialized&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
469+
x->isDistributed=IsNormalProcessingMode()&&dtm->status==MTM_ONLINE&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
467470
x->containsDML= false;
468471
x->snapshot=MtmAssignCSN();
469472
x->gtid.xid=InvalidTransactionId;
@@ -575,8 +578,6 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
575578
XidStatus prevStatus=TRANSACTION_STATUS_UNKNOWN;
576579
bool found;
577580

578-
Assert(status==TRANSACTION_STATUS_ABORTED);
579-
580581
MtmLock(LW_EXCLUSIVE);
581582
ts=hash_search(xid2state,&xid,HASH_ENTER,&found);
582583
if (!found) {
@@ -590,7 +591,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
590591
ts->status=status;
591592
MtmAdjustSubtransactions(ts);
592593

593-
if (prevStatus!=TRANSACTION_STATUS_ABORTED) {
594+
if (dtm->status!=MTM_RECOVERY&&prevStatus!=TRANSACTION_STATUS_ABORTED) {
594595
ts->cmd=MSG_ABORTED;
595596
MtmSendNotificationMessage(ts);
596597
}
@@ -607,7 +608,7 @@ MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
607608
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n",getpid(),xid,dtmTx.xid,status,dtmTx.isDistributed);
608609
if (xid==dtmTx.xid&&dtmTx.isDistributed)
609610
{
610-
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML)
611+
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML||dtm->status==MTM_RECOVERY)
611612
{
612613
MtmFinishTransaction(xid,nsubxids,subxids,status);
613614
MTM_TRACE("Finish transaction %d, status=%d, DML=%d\n",xid,status,dtmTx.containsDML);
@@ -863,11 +864,17 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
863864
dtmTx.containsDML= true;
864865
}
865866

866-
void MtmReceiverStarted()
867+
void MtmReceiverStarted(int nodeId)
867868
{
868-
if (pg_atomic_fetch_add_u32(&dtm->nReceivers,1)==dtm->nNodes-2) {
869-
dtm->initialized= true;
869+
SpinLockAcquire(&dtm->spinlock);
870+
if (!BIT_CHECK(dtm->pglogicalNodeMask,nodeId-1)) {
871+
dtm->pglogicalNodeMask |= (int64)1 << (nodeId-1);
872+
if (++dtm->nReceivers==dtm->nNodes-1) {
873+
Assert(dtm->status==MTM_CONNECTED);
874+
dtm->status=MTM_ONLINE;
875+
}
870876
}
877+
SpinLockRelease(&dtm->spinlock);
871878
}
872879

873880
csn_t MtmTransactionSnapshot(TransactionId xid)
@@ -885,10 +892,23 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
885892
return snapshot;
886893
}
887894

888-
895+
MtmSlotMode MtmReceiverSlotMode(int nodeId)
896+
{
897+
while (dtm->status!=MTM_CONNECTED&&dtm->status!=MTM_ONLINE) {
898+
if (dtm->status==MTM_RECOVERY) {
899+
if (dtm->recoverySlot==0||dtm->recoverySlot==nodeId) {
900+
dtm->recoverySlot=nodeId;
901+
return SLOT_OPEN_EXISTED;
902+
}
903+
}
904+
MtmSleep(STATUS_POLL_DELAY);
905+
}
906+
return dtm->recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
907+
}
908+
889909
void MtmDropNode(int nodeId, bool dropSlot)
890910
{
891-
if (!BIT_SET(dtm->disabledNodeMask,nodeId-1))
911+
if (!BIT_CHECK(dtm->disabledNodeMask,nodeId-1))
892912
{
893913
if (nodeId <=0||nodeId>dtm->nNodes)
894914
{
@@ -969,7 +989,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
969989
p=conn_str_end;
970990
}
971991
*p='\0';
972-
if (!BIT_SET(disabledNodeMask,i))
992+
if (!BIT_CHECK(disabledNodeMask,i))
973993
{
974994
conns[i]=PQconnectdb(conn_str);
975995
if (PQstatus(conns[i])!=CONNECTION_OK)

‎multimaster.h

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#define MTM_TUPLE_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
*/
1414

15-
#define BIT_SET(mask,bit) ((mask) & ((int64)1 << (bit)))
15+
#define BIT_CHECK(mask,bit) ((mask) & ((int64)1 << (bit)))
1616

1717
#define MULTIMASTER_NAME "mtm"
1818
#define MULTIMASTER_SCHEMA_NAME "mtm"
@@ -46,9 +46,25 @@ typedef enum
4646
MSG_ABORT,
4747
MSG_PREPARED,
4848
MSG_COMMITTED,
49-
MSG_ABORTED
49+
MSG_ABORTED,
50+
MSG_STATUS
5051
} MtmMessageCode;
5152

53+
typedef enum
54+
{
55+
MTM_INITIALIZATION, /* Initial status */
56+
MTM_OFFLINE, /* Node is out of quorum */
57+
MTM_CONNECTED, /* Arbiter is established connections with other nodes */
58+
MTM_ONLINE, /* Ready to receive client's queries */
59+
MTM_RECOVERY /* Node is in recovery process */
60+
} MtmNodeStatus;
61+
62+
typedef enum
63+
{
64+
SLOT_CREATE_NEW, /* create new slot (drop existed) */
65+
SLOT_OPEN_EXISTED, /* open existed slot */
66+
SLOT_OPEN_ALWAYS, /* open existed slot or create new if noty exists */
67+
} MtmSlotMode;
5268

5369
typedef struct MtmTransState
5470
{
@@ -71,16 +87,18 @@ typedef struct MtmTransState
7187

7288
typedef struct
7389
{
74-
volatile slock_t hashSpinlock; /* spinlock used to protect access to hash table */
90+
MtmNodeStatus status; /* Status of this node */
91+
int recoverySlot; /* NodeId of recovery slot or 0 if none */
92+
volatile slock_t spinlock; /* spinlock used to protect access to hash table */
7593
PGSemaphoreData votingSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
7694
LWLockId hashLock; /* lock to synchronize access to hash table */
7795
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
78-
int64 disabledNodeMask; /* bitmask of disable nodes (so no more than 64 nodes in multimaster:) */
96+
int64 disabledNodeMask; /* bitmask of disabled nodes (so no more than 64 nodes in multimaster:) */
97+
int64 pglogicalNodeMask; /* bitmask of started pglogic receviers */
7998
int nNodes; /* number of active nodes */
80-
pg_atomic_uint32 nReceivers; /* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
81-
long timeShift; /* local time correction */
82-
bool initialized;
83-
csn_t csn; /* last obtained CSN: used to provide unique acending CSNs based on system time */
99+
int nReceivers; /* number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
100+
long timeShift; /* local time correction */
101+
csn_t csn; /* last obtained CSN: used to provide unique acending CSNs based on system time */
84102
MtmTransState*votingTransactions;/* L1-list of replicated transactions sendings notifications to coordinator.
85103
This list is used to pass information to mtm-sender BGW */
86104
MtmTransState*transListHead;/* L1 list of all finished transactions present in xid2state hash.
@@ -107,7 +125,8 @@ extern csn_t MtmTransactionSnapshot(TransactionId xid);
107125
extern csn_t MtmAssignCSN(void);
108126
extern csn_t MtmSyncClock(csn_t csn);
109127
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot);
110-
extern void MtmReceiverStarted(void);
128+
extern void MtmReceiverStarted(int nodeId);
129+
extern MtmSlotMode MtmReceiverSlotMode(int nodeId);
111130
extern void MtmExecute(void* work, int size);
112131
extern void MtmExecutor(int id, void* work, size_t size);
113132
extern HTAB* MtmCreateHash(void);

‎pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
108108
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
109109
csn_t csn=MtmTransactionSnapshot(txn->xid);
110110
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
111-
if (csn==INVALID_CSN||BIT_SET(mm->state->disabledNodeMask,mm->nodeId-1)) {
111+
if (csn==INVALID_CSN||BIT_CHECK(mm->state->disabledNodeMask,mm->nodeId-1)) {
112112
mm->isLocal= true;
113113
}else {
114114
mm->isLocal= false;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp