Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5f3f8e2

Browse files
knizhnikkelvich
authored andcommitted
MMTS bug fixes
1 parentca55e19 commit5f3f8e2

File tree

3 files changed

+43
-28
lines changed

3 files changed

+43
-28
lines changed

‎arbiter.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
2-
*multimaster.c
2+
*arbiter.c
33
*
4-
*Multimaster based on logical replication
4+
*Coordinate global transaction commit
55
*
66
*/
77

@@ -99,15 +99,15 @@ static void DtmTransReceiver(Datum arg);
9999
staticBackgroundWorkerDtmSender= {
100100
"mm-sender",
101101
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,/* do not need connection to the database */
102-
BgWorkerStart_PostmasterStart,
102+
BgWorkerStart_ConsistentState,
103103
1,/* restrart in one second (is it possible to restort immediately?) */
104104
DtmTransSender
105105
};
106106

107107
staticBackgroundWorkerDtmRecevier= {
108108
"mm-receiver",
109109
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,/* do not need connection to the database */
110-
BgWorkerStart_PostmasterStart,
110+
BgWorkerStart_ConsistentState,
111111
1,/* restrart in one second (is it possible to restort immediately?) */
112112
DtmTransReceiver
113113
};
@@ -216,7 +216,7 @@ static int connectSocket(char const* host, int port)
216216

217217
staticvoidopenConnections()
218218
{
219-
intnNodes=ds->nNodes;
219+
intnNodes=MMNodes;
220220
inti;
221221
char*connStr=pstrdup(MMConnStrs);
222222

@@ -228,9 +228,14 @@ static void openConnections()
228228
if (host==NULL) {
229229
elog(ERROR,"Invalid connection string: '%s'",MMConnStrs);
230230
}
231-
for (end=host+5;*end!=' '&&*end!=','&&end!='\0';end++);
232-
*end='\0';
233-
connStr=end+1;
231+
host+=5;
232+
for (end=host;*end!=' '&&*end!=','&&*end!='\0';end++);
233+
if (*end!='\0') {
234+
*end='\0';
235+
connStr=end+1;
236+
}else {
237+
connStr=end;
238+
}
234239
sockets[i]=i+1!=MMNodeId ?connectSocket(host,MMArbiterPort+i) :-1;
235240
}
236241
}
@@ -241,7 +246,7 @@ static void acceptConnections()
241246
inti;
242247
intsd;
243248
inton=1;
244-
intnNodes=ds->nNodes-1;
249+
intnNodes=MMNodes-1;
245250

246251
sockets= (int*)palloc(sizeof(int)*nNodes);
247252

@@ -359,7 +364,6 @@ static void DtmTransReceiver(Datum arg)
359364
{
360365
intnNodes=MMNodes-1;
361366
inti,j,rc;
362-
intrxBufPos=0;
363367
DtmBuffer*rxBuffer= (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes);
364368
HTAB*xid2state;
365369

@@ -408,7 +412,7 @@ static void DtmTransReceiver(Datum arg)
408412
#endif
409413
{
410414
intnResponses;
411-
rxBuffer[i].used+=readSocket(sockets[i], (char*)rxBuffer[i].data+rxBuffer[i].used,BUFFER_SIZE-rxBufPos);
415+
rxBuffer[i].used+=readSocket(sockets[i], (char*)rxBuffer[i].data+rxBuffer[i].used,BUFFER_SIZE-rxBuffer[i].used);
412416
nResponses=rxBuffer[i].used/sizeof(DtmCommitMessage);
413417

414418
LWLockAcquire(ds->hashLock,LW_SHARED);
@@ -426,6 +430,8 @@ static void DtmTransReceiver(Datum arg)
426430
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
427431
}
428432
}
433+
LWLockRelease(ds->hashLock);
434+
429435
rxBuffer[i].used-=nResponses*sizeof(DtmCommitMessage);
430436
if (rxBuffer[i].used!=0) {
431437
memmove(rxBuffer[i].data, (char*)rxBuffer[i].data+nResponses*sizeof(DtmCommitMessage),rxBuffer[i].used);

‎multimaster.c

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ static void DtmPrepareTransaction(DtmCurrentTrans* x)
453453
ts->status=TRANSACTION_STATUS_UNKNOWN;
454454
ts->csn=dtm_get_csn();
455455
ts->procno=MyProc->pgprocno;
456-
ts->nVotes=1;/*I voted myself */
456+
ts->nVotes=1;/*My own voice */
457457
for (i=0;i<MMNodes;i++) {
458458
ts->xids[i]=InvalidTransactionId;
459459
}
@@ -479,7 +479,8 @@ static XidStatus
479479
DtmCommitTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids)
480480
{
481481
DtmTransState*ts;
482-
csn_tcsn;
482+
csn_tlocalCSN;
483+
csn_tglobalCSN;
483484
inti;
484485
XidStatusstatus;
485486

@@ -489,44 +490,45 @@ DtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
489490

490491
/* now transaction is in doubt state */
491492
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
492-
csn=dtm_get_csn();
493-
if (csn>ts->csn) {
494-
ts->csn=csn;
495-
}
493+
localCSN=dtm_get_csn();
494+
ts->csn=localCSN;
496495
DtmTransactionListAppend(ts);
497496
DtmAddSubtransactions(ts,subxids,nsubxids);
498497

499498
MMVoteForTransaction(ts);/* wait until transaction at all nodes are prepared */
500-
csn=ts->csn;
501-
if (csn!=INVALID_CSN) {
502-
dtm_sync(csn);
499+
globalCSN=ts->csn;
500+
Assert(globalCSN >=localCSN);
501+
502+
if (globalCSN!=INVALID_CSN) {
503+
dtm_sync(globalCSN);
503504
status=TRANSACTION_STATUS_COMMITTED;
504505
}else {
506+
ts->csn=globalCSN=localCSN;
505507
status=TRANSACTION_STATUS_ABORTED;
506508
}
507509
ts->status=status;
508510
for (i=0;i<nsubxids;i++) {
509511
ts=ts->next;
510512
ts->status=status;
511-
ts->csn=csn;
513+
ts->csn=globalCSN;
512514
}
513515
LWLockRelease(dtm->hashLock);
514516
returnstatus;
515517
}
516518

517519
staticvoid
518-
DtmAbortTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids)
520+
DtmFinishTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus)
519521
{
520522
inti;
521523
DtmTransState*ts;
522524

523525
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
524526
ts=hash_search(xid2state,&xid,HASH_FIND,NULL);
525527
Assert(ts!=NULL);/* should be created by DtmPrepareTransaction */
526-
ts->status=TRANSACTION_STATUS_ABORTED;
528+
ts->status=status;
527529
for (i=0;i<nsubxids;i++) {
528530
ts=ts->next;
529-
ts->status=TRANSACTION_STATUS_ABORTED;
531+
ts->status=status;
530532
}
531533
LWLockRelease(dtm->hashLock);
532534
}
@@ -539,9 +541,10 @@ DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
539541
DTM_INFO("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
540542
if (dtmTx.isDistributed)
541543
{
544+
Assert(xid==dtmTx.xid);
542545
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML)
543546
{
544-
DtmAbortTransaction(xid,nsubxids,subxids);
547+
DtmFinishTransaction(xid,nsubxids,subxids,status);
545548
DTM_INFO("Abort transaction %d\n",xid);
546549
}
547550
else
@@ -990,12 +993,18 @@ MMPoolConstructor(void)
990993
staticvoid
991994
SendCommitMessage(DtmTransState*ts)
992995
{
996+
DtmTransState*votingList;
997+
993998
SpinLockAcquire(&dtm->votingSpinlock);
994-
ts->nextVoting=dtm->votingTransactions;
999+
votingList=dtm->votingTransactions;
1000+
ts->nextVoting=votingList;
9951001
dtm->votingTransactions=ts;
9961002
SpinLockRelease(&dtm->votingSpinlock);
9971003

998-
PGSemaphoreUnlock(&dtm->votingSemaphore);
1004+
if (votingList==NULL) {
1005+
/* singal semaphreo only once for the whole list */
1006+
PGSemaphoreUnlock(&dtm->votingSemaphore);
1007+
}
9991008
}
10001009

10011010
staticvoid
@@ -1011,7 +1020,7 @@ MMVoteForTransaction(DtmTransState* ts)
10111020
/* ... and then send notifications to replicas */
10121021
SendCommitMessage(ts);
10131022
}else {
1014-
/* I am replica: first notifymaster... */
1023+
/* I am replica: first notifycoordinator... */
10151024
ts->nVotes=dtm->nNodes-1;/* I just need one confirmation from coordinator */
10161025
SendCommitMessage(ts);
10171026
/* ... and wait response from it */

‎tests/dtmbench

9.18 KB
Binary file not shown.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp