Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5e35877

Browse files
knizhnikkelvich
authored andcommitted
Correctly store LSN
1 parentc9f6ec1 commit5e35877

File tree

8 files changed

+214
-117
lines changed

8 files changed

+214
-117
lines changed

‎arbiter.c

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,7 @@ static char const* const messageText[] =
106106
"HANDSHAKE",
107107
"READY",
108108
"PREPARE",
109-
"COMMIT",
110-
"ABORT",
111109
"PREPARED",
112-
"COMMITTED",
113110
"ABORTED",
114111
"STATUS"
115112
};
@@ -456,8 +453,10 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
456453
buf->used=0;
457454
}
458455
MTM_TRACE("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
459-
ts->status==TRANSACTION_STATUS_ABORTED ?"abort" :"commit",ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
460-
buf->data[buf->used].code=ts->status==TRANSACTION_STATUS_ABORTED ?MSG_ABORTED :MSG_PREPARED;
456+
messageText[ts->cmd],ts->csn,node+1,MtmNodeId,ts->gtid.xid,ts->xid);
457+
458+
Assert(ts->cmd!=MSG_INVALID);
459+
buf->data[buf->used].code=ts->cmd;
461460
buf->data[buf->used].dxid=xid;
462461
buf->data[buf->used].sxid=ts->xid;
463462
buf->data[buf->used].csn=ts->csn;
@@ -466,6 +465,22 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
466465
buf->used+=1;
467466
}
468467

468+
staticvoidMtmBroadcastMessage(MtmBuffer*txBuffer,MtmTransState*ts)
469+
{
470+
inti;
471+
intn=1;
472+
for (i=0;i<MtmNodes;i++)
473+
{
474+
if (TransactionIdIsValid(ts->xids[i])) {
475+
Assert(i+1!=MtmNodeId);
476+
MtmAppendBuffer(txBuffer,ts->xids[i],i,ts);
477+
n+=1;
478+
}
479+
}
480+
Assert(n==ds->nNodes);
481+
}
482+
483+
469484
staticvoidMtmTransSender(Datumarg)
470485
{
471486
intnNodes=MtmNodes;
@@ -492,7 +507,11 @@ static void MtmTransSender(Datum arg)
492507
MtmLock(LW_SHARED);
493508

494509
for (ts=ds->votingTransactions;ts!=NULL;ts=ts->nextVoting) {
495-
MtmAppendBuffer(txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
510+
if (MtmIsCoordinator(ts)) {
511+
MtmBroadcastMessage(txBuffer,ts);
512+
}else {
513+
MtmAppendBuffer(txBuffer,ts->gtid.xid,ts->gtid.node-1,ts);
514+
}
496515
}
497516
ds->votingTransactions=NULL;
498517

@@ -510,6 +529,7 @@ static void MtmTransSender(Datum arg)
510529
staticvoidMtmWakeUpBackend(MtmTransState*ts)
511530
{
512531
MTM_TRACE("Wakeup backed procno=%d, pid=%d\n",ts->procno,ProcGlobal->allProcs[ts->procno].pid);
532+
ts->votingCompleted= true;
513533
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
514534
}
515535

@@ -565,6 +585,9 @@ static void MtmTransReceiver(Datum arg)
565585
#ifUSE_EPOLL
566586
n=epoll_wait(epollfd,events,nNodes,MtmKeepaliveTimeout/1000);
567587
if (n<0) {
588+
if (errno==EINTR) {
589+
continue;
590+
}
568591
elog(ERROR,"Arbiter failed to poll sockets: %d",errno);
569592
}
570593
for (j=0;j<n;j++) {
@@ -581,7 +604,9 @@ static void MtmTransReceiver(Datum arg)
581604
events=inset;
582605
tv.tv_sec=MtmKeepaliveTimeout/USEC;
583606
tv.tv_usec=MtmKeepaliveTimeout%USEC;
584-
n=select(max_fd+1,&events,NULL,NULL,&tv);
607+
do {
608+
n=select(max_fd+1,&events,NULL,NULL,&tv);
609+
}while (n<0&&errno==ENINTR);
585610
}while (n<0&&MtmRecovery());
586611

587612
if (rc<0) {
@@ -612,31 +637,62 @@ static void MtmTransReceiver(Datum arg)
612637
MtmTransState*ts= (MtmTransState*)hash_search(MtmXid2State,&msg->dxid,HASH_FIND,NULL);
613638
Assert(ts!=NULL);
614639
Assert(msg->node>0&&msg->node <=nNodes&&msg->node!=MtmNodeId);
615-
Assert (MtmIsCoordinator(ts));
616-
switch (msg->code) {
617-
caseMSG_PREPARED:
618-
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
619-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS||ts->status==TRANSACTION_STATUS_UNKNOWN);
620-
if (msg->csn>ts->csn) {
621-
ts->csn=msg->csn;
622-
MtmSyncClock(ts->csn);
623-
}
624-
if (++ts->nVotes==ds->nNodes) {
625-
MtmWakeUpBackend(ts);
640+
if (MtmIsCoordinator(ts)) {
641+
switch (msg->code) {
642+
caseMSG_READY:
643+
Assert(ts->nVotes<ds->nNodes);
644+
ds->nodeTransDelay[msg->node-1]+=MtmGetCurrentTime()-ts->csn;
645+
ts->xids[msg->node-1]=msg->sxid;
646+
if (++ts->nVotes==ds->nNodes) {
647+
/* All nodes are finished their transactions */
648+
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
649+
ts->nVotes=1;/* I voted myself */
650+
MtmSendNotificationMessage(ts,MSG_PREPARE);
651+
}else {
652+
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
653+
MtmWakeUpBackend(ts);
626654
}
627655
}
628-
break;
629-
caseMSG_ABORTED:
656+
break;
657+
caseMSG_ABORTED:
658+
Assert(ts->nVotes<ds->nNodes);
630659
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
631-
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS||ts->status==TRANSACTION_STATUS_UNKNOWN);
660+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
632661
ts->status=TRANSACTION_STATUS_ABORTED;
633662
MtmAdjustSubtransactions(ts);
663+
}
664+
if (++ts->nVotes==ds->nNodes) {
634665
MtmWakeUpBackend(ts);
635666
}
636667
break;
637-
default:
668+
caseMSG_PREPARED:
669+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
670+
Assert(ts->nVotes<ds->nNodes);
671+
if (msg->csn>ts->csn) {
672+
ts->csn=msg->csn;
673+
MtmSyncClock(ts->csn);
674+
}
675+
if (++ts->nVotes==ds->nNodes) {
676+
ts->csn=MtmAssignCSN();
677+
ts->status=TRANSACTION_STATUS_UNKNOWN;
678+
MtmWakeUpBackend(ts);
679+
}
680+
default:
681+
Assert(false);
682+
}
683+
}else {
684+
switch (msg->code) {
685+
caseMSG_PREPARE:
686+
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
687+
ts->status=TRANSACTION_STATUS_UNKNOWN;
688+
ts->csn=MtmAssignCSN();
689+
MtmAdjustSubtransactions(ts);
690+
MtmSendNotificationMessage(ts,MSG_PREPARED);
691+
break;
692+
default:
638693
Assert(false);
639-
}
694+
}
695+
}
640696
}
641697
MtmUnlock();
642698

‎bgwpool.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ static void BgwPoolMainLoop(Datum arg)
3737
SpinLockAcquire(&pool->lock);
3838
size=*(int*)&pool->queue[pool->head];
3939
Assert(size<pool->size);
40-
work=palloc(size);
40+
work=malloc(size);
4141
pool->active-=1;
4242
if (pool->head+size+4>pool->size) {
4343
memcpy(work,pool->queue,size);
@@ -55,7 +55,7 @@ static void BgwPoolMainLoop(Datum arg)
5555
}
5656
SpinLockRelease(&pool->lock);
5757
pool->executor(id,work,size);
58-
pfree(work);
58+
free(work);
5959
}
6060
}
6161

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp