Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitcd6b6d2

Browse files
knizhnikkelvich
authored andcommitted
Update MMTS
1 parentbf78f0b commitcd6b6d2

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

‎arbiter.c

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ typedef struct
6161

6262
typedefstruct
6363
{
64-
DtmCOmmitMessagebuf[TX_BUFFER_SIZE];
64+
DtmCommitMessagedata[TX_BUFFER_SIZE];
6565
intused;
6666
}DtmTxBuffer;
6767

6868
staticint*sockets;
69-
staticDtmCommitMessage**txBuffers;
69+
staticDtmTxBuffer*txBuffers;
7070

7171
staticBackgroundWorkerDtmSender= {
7272
"mm-sender",
@@ -216,18 +216,40 @@ static void acceptConnections()
216216

217217
staticvoidDtmTransSender(Datumarg)
218218
{
219-
txBuffer= (DtmCommitMessage*)
219+
intnNodes=dtm->nNodes;
220+
inti;
221+
DtmCommitMessage*txBuffer= (DtmCommitMessage*)palloc(sizeof(DtmTxBuffer)*(nNodes));
222+
220223
openConnections();
221224

225+
for (i=0;i<nNodes;i++) {
226+
txBuffer[i].used=0;
227+
}
228+
222229
while (true) {
223-
DtmTransState*ts;
230+
DtmTransState*ts;
224231
PGSemaphoreLock(&dtm->semphore);
225232

226-
LWLockAcquire(&dtm->hashLock,LW_EXCLUSIVE);
233+
SpinLockAcquire(&dtm->spinlock);
234+
ts=dtm->pendingTransactions;
235+
dtm->pendingTransactions=NULL;
236+
SpinLockRelease(&dtm->spinlock);
237+
227238
for (ts=dtm->pendingTransactions;ts!=NULL;ts=ts->nextPending) {
228239
intnode=ts->gtid.node;
229240
Assert(node!=MMNodeId);
230-
sockets
241+
node-=1;
242+
if (txBuffer[node].used==TX_BUFFER_SIZE) {
243+
WriteSocket(sockets[node],txBuffer[node].data,txBuffer[node].used*sizeof(DtmCommitRequest));
244+
txBuffer[node].used=0;
245+
}
246+
txBuffer[node].data[txBuffer[node].used].xid=ts->xid;
247+
txBuffer[node].data[txBuffer[node].used].csn=ts->csn;
248+
txBuffer[node].used+=1;
249+
}
250+
dtm->pendingTransactions=NULL;
251+
252+
}
231253
}
232254

233255
staticvoidDtmTransReceiver(Datumarg)

‎multimaster.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -983,14 +983,17 @@ void MMVoteForTransaction(DtmTransState* ts)
983983
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
984984
ResetLatch(&MyProc->procLatch);
985985
}
986-
LWLockAcquire(&dtm->hashLock<LW_EXCLUSIVE);
987986
}else {
988-
/* I am replica: firrst notify master... */
987+
/* I am replica: first notify master... */
988+
SpinLockAcquire(&dtm->spinlock);
989989
ts->nextPending=dtm->pendingTransactions;
990990
dtm->pendingTransactions=ts;
991+
SpinLockRelease(&dtm->spinlock);
992+
991993
PGSemaphoreUnlock(&dtm->semapahore);
992994
/* ... and wait reposnse from it */
993995
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
994996
ResetLatch(&MyProc->procLatch);
995997
}
998+
LWLockAcquire(&dtm->hashLock<LW_EXCLUSIVE);
996999
}

‎multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ typedef struct DtmTransState
3232

3333
typedefstruct
3434
{
35+
volatileslock_tspinlock;
36+
PGSemaphoreDatasemaphore;
3537
LWLockIdhashLock;
3638
TransactionIdminXid;/* XID of oldest transaction visible by any active transaction (local or global) */
3739
int64disabledNodeMask;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp