Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f0b478b

Browse files
knizhnikkelvich
authored and committed
reduce in-doubt stage in MMTS
1 parent 814af3e commit f0b478b

File tree

5 files changed

+68
-40
lines changed

5 files changed

+68
-40
lines changed

‎arbiter.c

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,20 @@
7676
#defineMAX_ROUTES 16
7777
#defineBUFFER_SIZE 1024
7878

79+
/*
 * Code carried in every DtmCommitMessage to tell the receiver what kind of
 * message it is.
 *
 * NOTE(review): call sites in this file refer to CMD_PREPARE and CMD_ABORT,
 * which this enum does not define -- confirm which prefix is intended.
 */
typedef enum
{
	MSG_PREPARE = 0,
	MSG_COMMIT  = 1,
	MSG_ABORT   = 2
} MessageCode;
85+
86+
7987
typedefstruct
8088
{
89+
MessageCodecode;/* Message code: MSG_PREPARE, MSG_COMMIT, MSG_ABORT
90+
int node; /* Sender node ID */
8191
TransactionIddxid;/* Transaction ID at destination node */
8292
TransactionIdsxid;/* Transaction IO at sender node */
83-
intnode;/* Sender node ID */
8493
csn_tcsn;/* local CSN in case of sending data from replica to master, global CSN master->replica */
8594
}DtmCommitMessage;
8695

@@ -100,15 +109,15 @@ static BackgroundWorker DtmSender = {
100109
"mm-sender",
101110
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,/* do not need connection to the database */
102111
BgWorkerStart_ConsistentState,
103-
1,/*restrart in one second (is it possible torestort immediately?) */
112+
1,/*restart in one second (is it possible torestart immediately?) */
104113
DtmTransSender
105114
};
106115

107116
/*
 * Background worker descriptor for the "mm-receiver" worker.
 * Fields are initialized positionally; the last entry is the worker's
 * entry point, DtmTransReceiver.
 */
static BackgroundWorker DtmRecevier = {
	"mm-receiver",
	/*
	 * NOTE(review): the original comment here said "do not need connection
	 * to the database", yet BGWORKER_BACKEND_DATABASE_CONNECTION is set --
	 * confirm which is intended.
	 */
	BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
	BgWorkerStart_ConsistentState,
	1, /* restart in one second (is it possible to restart immediately?) */
	DtmTransReceiver
};
114123

@@ -300,6 +309,25 @@ static int readSocket(int sd, void* buf, int buf_size)
300309
returnrc;
301310
}
302311

312+
/*
 * Tell whether the local node is the coordinator of the given transaction.
 *
 * A transaction is coordinated by the node recorded in its global
 * transaction ID (ts->gtid.node); the local node is identified by MMNodeId.
 * This is the same test the sender loop performed inline before this helper
 * was introduced.
 *
 * ts: transaction state to inspect; must not be NULL.
 * Returns true when ts->gtid.node equals MMNodeId.
 */
static bool IsCoordinator(DtmTransState* ts)
{
	return ts->gtid.node == MMNodeId;
}
316+
317+
staticvoidDtmAppendBuffer(MessageCodecode,DtmBuffer*txBuffer,TransactionIdxid,intnode,DtmTransState*ts)
318+
{
319+
DtmBuffer*buf=&txBuffer[node];
320+
if (buf->used==BUFFER_SIZE) {
321+
writeSocket(sockets[node],buf->data,buf->used*sizeof(DtmCommitMessage));
322+
buf->used=0;
323+
}
324+
buf->data[buf->used].code=code;
325+
buf->data[buf->used].dxid=xid;
326+
buf->data[buf->used].sxid=ts->xid;
327+
buf->data[buf->used].csn=ts->status==TRANSACTION_STATUS_ABORTED ?INVALID_CSN :ts->csn;
328+
buf->data[buf->used].node=MMNodeId;
329+
buf->used+=1;
330+
}
303331

304332
staticvoidDtmTransSender(Datumarg)
305333
{
@@ -327,38 +355,18 @@ static void DtmTransSender(Datum arg)
327355
SpinLockRelease(&ds->votingSpinlock);
328356

329357
for (;ts!=NULL;ts=ts->nextVoting) {
330-
if (ts->gtid.node==MMNodeId) {
331-
/* Coordinator is broadcastingconfirmations to replicas */
358+
if (IsCoordinator(ts)) {
359+
/* Coordinator is broadcastingPREPARE message to replicas */
332360
for (i=0;i<nNodes;i++) {
333361
if (TransactionIdIsValid(ts->xids[i])) {
334-
if (txBuffer[i].used==BUFFER_SIZE) {
335-
writeSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitMessage));
336-
txBuffer[i].used=0;
337-
}
338-
DTM_TRACE("Send notification %ld to replica %d from coordinator %d for transaction %d (local transaction %d)\n",
339-
ts->csn,i+1,MMNodeId,ts->xid,ts->xids[i]);
340-
341-
txBuffer[i].data[txBuffer[i].used].dxid=ts->xids[i];
342-
txBuffer[i].data[txBuffer[i].used].sxid=ts->xid;
343-
txBuffer[i].data[txBuffer[i].used].csn=ts->csn;
344-
txBuffer[i].data[txBuffer[i].used].node=MMNodeId;
345-
txBuffer[i].used+=1;
362+
DtmAppendBuffer(CMD_PREPARE,txBuffer,ts->xids[i],i,ts);
346363
}
347364
}
348365
}else {
349-
/* Replica is notifying master */
350-
i=ts->gtid.node-1;
351-
if (txBuffer[i].used==BUFFER_SIZE) {
352-
writeSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitMessage));
353-
txBuffer[i].used=0;
354-
}
366+
/* Replica is notifying master that it is ready to PREPARE */
355367
DTM_TRACE("Send notification %ld to coordinator %d from node %d for transaction %d (local transaction %d)\n",
356368
ts->csn,ts->gtid.node,MMNodeId,ts->gtid.xid,ts->xid);
357-
txBuffer[i].data[txBuffer[i].used].dxid=ts->gtid.xid;
358-
txBuffer[i].data[txBuffer[i].used].sxid=ts->xid;
359-
txBuffer[i].data[txBuffer[i].used].node=MMNodeId;
360-
txBuffer[i].data[txBuffer[i].used].csn=ts->csn;
361-
txBuffer[i].used+=1;
369+
DtmAppendBuffer(CMD_PREPARE,txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
362370
}
363371
}
364372
for (i=0;i<nNodes;i++) {
@@ -431,9 +439,33 @@ static void DtmTransReceiver(Datum arg)
431439
DtmCommitMessage*msg=&rxBuffer[i].data[j];
432440
DtmTransState*ts= (DtmTransState*)hash_search(xid2state,&msg->dxid,HASH_FIND,NULL);
433441
Assert(ts!=NULL);
434-
if (msg->csn>ts->csn) {
435-
ts->csn=msg->csn;
436-
}
442+
switch (msg->code) {
443+
caseCMD_PREPARE:
444+
if (IsCoordinator(ts)) {
445+
switch (msg->command) {
446+
caseCMD_PREPARE:
447+
448+
if (ts->state==TRANSACTION_STATUS_IN_PROGRESS:
449+
/* transaction is in-prepared stage (in-doubt): calculate max CSN */
450+
if (msg->csn>ts->csn) {
451+
ts->csn=msg->csn;
452+
}
453+
Assert(ts->nVotes<dtm->nNodes);
454+
if (++ts->nVotes==dtm->nNodes) {/* receive responses from all nodes */
455+
ts->status=TRANSACTION_STATUS_COMMIT;
456+
457+
if (ts->state==TRANSACTION_STATUS_UNKNOWN) {
458+
/* All nodes are ready to prepare: switch transaction to in-doubt state */
459+
ts->csn=dtm_get_csn();
460+
ts->status=TRANSACTION_STATUS_IN_PROGRESS;
461+
/* and broadcast PREPARE message */
462+
MMSendNotificationMessage(ts);
463+
}elseif (ts->state==CMD_ABORT) {
464+
ts->status=TRANSACTION_STATUS_ABORTED;
465+
466+
}else {
467+
Assert(ts->state==TRANSACTION_STATUS_IN_PROGRESS);
468+
437469
Assert((unsigned)(msg->node-1) <= (unsigned)nNodes);
438470
ts->xids[msg->node-1]=msg->sxid;
439471
DTM_TRACE("Receive response %ld for transaction %d votes %d from node %d (transaction %d)\n",

‎multimaster.c

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -342,15 +342,13 @@ DtmAdjustOldestXid(TransactionId xid)
342342
ts= (DtmTransState*)hash_search(xid2state,&xid,HASH_FIND,NULL);
343343
if (ts!=NULL) {
344344
timestamp_tcutoff_time=ts->csn-DtmVacuumDelay*USEC;
345-
#if0
346345
for (ts=dtm->transListHead;ts!=NULL&&ts->csn<cutoff_time;prev=ts,ts=ts->next) {
347346
Assert(ts->status==TRANSACTION_STATUS_COMMITTED||ts->status==TRANSACTION_STATUS_ABORTED);
348347
if (prev!=NULL) {
349348
/* Remove information about too old transactions */
350349
hash_search(xid2state,&prev->xid,HASH_REMOVE,NULL);
351350
}
352351
}
353-
#endif
354352
}
355353
if (prev!=NULL) {
356354
dtm->transListHead=prev;
@@ -477,8 +475,7 @@ DtmEndTransaction(DtmCurrentTrans* x)
477475
x->gtid.xid=InvalidTransactionId;
478476
}
479477

480-
staticvoid
481-
SendNotificationMessage(DtmTransState*ts)
478+
voidMMSendNotificationMessage(DtmTransState*ts)
482479
{
483480
DtmTransState*votingList;
484481

@@ -551,7 +548,7 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
551548
ts->status=status;
552549
}
553550
if (dtmTx.isReplicated) {
554-
SendNotificationMessage(ts);
551+
MMSendNotificationMessage(ts);
555552
}
556553
}
557554
LWLockRelease(dtm->hashLock);
@@ -1026,11 +1023,11 @@ MMVoteForTransaction(DtmTransState* ts)
10261023
Assert(ts->nVotes==dtm->nNodes);
10271024

10281025
/* ... and then send notifications to replicas */
1029-
SendNotificationMessage(ts);
1026+
MMSendNotificationMessage(ts);
10301027
}else {
10311028
/* I am replica: first notify coordinator... */
10321029
ts->nVotes=dtm->nNodes-1;/* I just need one confirmation from coordinator */
1033-
SendNotificationMessage(ts);
1030+
MMSendNotificationMessage(ts);
10341031
/* ... and wait response from it */
10351032
DTM_TRACE("Node %d waiting latch...\n",MMNodeId);
10361033
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ extern void MMReceiverStarted(void);
7373
externvoidMMExecute(void*work,intsize);
7474
externvoidMMExecutor(intid,void*work,size_tsize);
7575
externHTAB*MMCreateHash(void);
76+
externvoidMMSendNotificationMessage(DtmTransState*ts);
7677
externDtmState*MMGetState(void);
7778

7879
#endif

‎tests/dtmbench

26.3 KB
Binary file not shown.

‎tests/dtmbench.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ void* writer(void* arg)
179179
voidinitializeDatabase()
180180
{
181181
connectionconn(cfg.connections[0]);
182-
#if0
183182
printf("creating extension\n");
184183
{
185184
nontransactiontxn(conn);
@@ -197,7 +196,6 @@ void initializeDatabase()
197196
txn.commit();
198197
}
199198
printf("table t created\n");
200-
#endif
201199
printf("inserting stuff into t\n");
202200
{
203201
worktxn(conn);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp