Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit814af3e

Browse files
knizhnikkelvich
authored andcommitted
Continue work on MMTS
1 parent5f3f8e2 commit814af3e

File tree

7 files changed

+77
-51
lines changed

7 files changed

+77
-51
lines changed

‎arbiter.c

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ static int connectSocket(char const* host, int port)
200200
}
201201
if (rc<0) {
202202
if ((errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS)||max_attempts==0) {
203-
elog(ERROR,"Sockhub failed to connect to %s:%d: %d",host,port,errno);
203+
elog(ERROR,"Arbiter failed to connect to %s:%d: %d",host,port,errno);
204204
}else {
205205
max_attempts-=1;
206206
sleep(1);
@@ -236,7 +236,7 @@ static void openConnections()
236236
}else {
237237
connStr=end;
238238
}
239-
sockets[i]=i+1!=MMNodeId ?connectSocket(host,MMArbiterPort+i) :-1;
239+
sockets[i]=i+1!=MMNodeId ?connectSocket(host,MMArbiterPort+i+1) :-1;
240240
}
241241
}
242242

@@ -260,9 +260,12 @@ static void acceptConnections()
260260
}
261261
setsockopt(sd,SOL_SOCKET,SO_REUSEADDR, (char*)&on,sizeofon);
262262

263-
if (bind(sd, (structsockaddr*)&sock_inet,nNodes-1)<0) {
263+
if (bind(sd, (structsockaddr*)&sock_inet,sizeof(sock_inet))<0) {
264264
elog(ERROR,"Failed to bind socket: %d",errno);
265265
}
266+
if (listen(sd,MMNodes-1)<0) {
267+
elog(ERROR,"Failed to listen socket: %d",errno);
268+
}
266269

267270
for (i=0;i<nNodes;i++) {
268271
intfd=accept(sd,NULL,NULL);
@@ -332,8 +335,13 @@ static void DtmTransSender(Datum arg)
332335
writeSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitMessage));
333336
txBuffer[i].used=0;
334337
}
338+
DTM_TRACE("Send notification %ld to replica %d from coordinator %d for transaction %d (local transaction %d)\n",
339+
ts->csn,i+1,MMNodeId,ts->xid,ts->xids[i]);
340+
335341
txBuffer[i].data[txBuffer[i].used].dxid=ts->xids[i];
342+
txBuffer[i].data[txBuffer[i].used].sxid=ts->xid;
336343
txBuffer[i].data[txBuffer[i].used].csn=ts->csn;
344+
txBuffer[i].data[txBuffer[i].used].node=MMNodeId;
337345
txBuffer[i].used+=1;
338346
}
339347
}
@@ -344,6 +352,8 @@ static void DtmTransSender(Datum arg)
344352
writeSocket(sockets[i],txBuffer[i].data,txBuffer[i].used*sizeof(DtmCommitMessage));
345353
txBuffer[i].used=0;
346354
}
355+
DTM_TRACE("Send notification %ld to coordinator %d from node %d for transaction %d (local transaction %d)\n",
356+
ts->csn,ts->gtid.node,MMNodeId,ts->gtid.xid,ts->xid);
347357
txBuffer[i].data[txBuffer[i].used].dxid=ts->gtid.xid;
348358
txBuffer[i].data[txBuffer[i].used].sxid=ts->xid;
349359
txBuffer[i].data[txBuffer[i].used].node=MMNodeId;
@@ -426,6 +436,9 @@ static void DtmTransReceiver(Datum arg)
426436
}
427437
Assert((unsigned)(msg->node-1) <= (unsigned)nNodes);
428438
ts->xids[msg->node-1]=msg->sxid;
439+
DTM_TRACE("Receive response %ld for transaction %d votes %d from node %d (transaction %d)\n",
440+
msg->csn,msg->dxid,ts->nVotes+1,msg->node,msg->sxid);
441+
Assert(ts->nVotes>0&&ts->nVotes<ds->nNodes);
429442
if (++ts->nVotes==ds->nNodes) {
430443
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
431444
}

‎multimaster.c

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,14 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
230230
if (ts!=NULL)
231231
{
232232
if (ts->csn>dtmTx.snapshot) {
233-
DTM_TRACE((stderr,"%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n",
234-
getpid(),xid,ts->csn,dtmTx.snapshot));
233+
DTM_TUPLE_TRACE("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld\n",
234+
getpid(),xid,ts->csn,dtmTx.snapshot);
235235
LWLockRelease(dtm->hashLock);
236236
return true;
237237
}
238238
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS)
239239
{
240-
DTM_TRACE((stderr,"%d: wait for in-doubt transaction %u in snapshot %lu\n",getpid(),xid,dtmTx.snapshot));
240+
DTM_TRACE("%d: wait for in-doubt transaction %u in snapshot %lu\n",getpid(),xid,dtmTx.snapshot);
241241
LWLockRelease(dtm->hashLock);
242242
#ifTRACE_SLEEP_TIME
243243
{
@@ -255,7 +255,7 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
255255
if (firstReportTime==0) {
256256
firstReportTime=now;
257257
}else {
258-
fprintf(stderr,"Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n",totalSleepTime,now-firstReportTime,totalSleepTime*100.0/(now-firstReportTime),maxSleepTime);
258+
DTM_TRACE("Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n",totalSleepTime,now-firstReportTime,totalSleepTime*100.0/(now-firstReportTime),maxSleepTime);
259259
}
260260
}
261261
}
@@ -268,15 +268,15 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
268268
else
269269
{
270270
boolinvisible=ts->status!=TRANSACTION_STATUS_COMMITTED;
271-
DTM_TRACE((stderr,"%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n",
272-
getpid(),xid,ts->csn,invisible ?"rollbacked" :"committed",dtmTx.snapshot));
271+
DTM_TUPLE_TRACE("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld\n",
272+
getpid(),xid,ts->csn,invisible ?"rollbacked" :"committed",dtmTx.snapshot);
273273
LWLockRelease(dtm->hashLock);
274274
returninvisible;
275275
}
276276
}
277277
else
278278
{
279-
DTM_TRACE((stderr,"%d: visibility check is skept for transaction %u in snapshot %lu\n",getpid(),xid,dtmTx.snapshot));
279+
DTM_TUPLE_TRACE("%d: visibility check is skept for transaction %u in snapshot %lu\n",getpid(),xid,dtmTx.snapshot);
280280
break;
281281
}
282282
}
@@ -342,14 +342,15 @@ DtmAdjustOldestXid(TransactionId xid)
342342
ts= (DtmTransState*)hash_search(xid2state,&xid,HASH_FIND,NULL);
343343
if (ts!=NULL) {
344344
timestamp_tcutoff_time=ts->csn-DtmVacuumDelay*USEC;
345-
345+
#if0
346346
for (ts=dtm->transListHead;ts!=NULL&&ts->csn<cutoff_time;prev=ts,ts=ts->next) {
347347
Assert(ts->status==TRANSACTION_STATUS_COMMITTED||ts->status==TRANSACTION_STATUS_ABORTED);
348348
if (prev!=NULL) {
349349
/* Remove information about too old transactions */
350350
hash_search(xid2state,&prev->xid,HASH_REMOVE,NULL);
351351
}
352352
}
353+
#endif
353354
}
354355
if (prev!=NULL) {
355356
dtm->transListHead=prev;
@@ -398,7 +399,6 @@ static void DtmInitialize()
398399
staticvoid
399400
DtmXactCallback(XactEventevent,void*arg)
400401
{
401-
//XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
402402
switch (event)
403403
{
404404
caseXACT_EVENT_START:
@@ -427,7 +427,7 @@ DtmBeginTransaction(DtmCurrentTrans* x)
427427
x->snapshot=dtm_get_csn();
428428
x->gtid.xid=InvalidTransactionId;
429429
LWLockRelease(dtm->hashLock);
430-
DTM_TRACE((stderr,"DtmLocalTransaction: transaction %u uses local snapshot %lu\n",x->xid,x->snapshot));
430+
DTM_TRACE("DtmLocalTransaction: transaction %u uses local snapshot %lu\n",x->xid,x->snapshot);
431431
}
432432
}
433433

@@ -438,6 +438,7 @@ DtmBeginTransaction(DtmCurrentTrans* x)
438438
staticvoidDtmPrepareTransaction(DtmCurrentTrans*x)
439439
{
440440
DtmTransState*ts;
441+
boolfound;
441442
inti;
442443

443444
if (!x->isDistributed) {
@@ -448,8 +449,9 @@ static void DtmPrepareTransaction(DtmCurrentTrans* x)
448449
x->xid=GetCurrentTransactionId();
449450
}
450451
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
451-
ts=hash_search(xid2state,&x->xid,HASH_ENTER,NULL);
452-
ts->snapshot=x->isReplicated ?x->snapshot :INVALID_CSN;
452+
ts=hash_search(xid2state,&x->xid,HASH_ENTER,&found);
453+
Assert(!found);
454+
ts->snapshot=x->isReplicated ?INVALID_CSN :x->snapshot;
453455
ts->status=TRANSACTION_STATUS_UNKNOWN;
454456
ts->csn=dtm_get_csn();
455457
ts->procno=MyProc->pgprocno;
@@ -475,6 +477,24 @@ DtmEndTransaction(DtmCurrentTrans* x)
475477
x->gtid.xid=InvalidTransactionId;
476478
}
477479

480+
staticvoid
481+
SendNotificationMessage(DtmTransState*ts)
482+
{
483+
DtmTransState*votingList;
484+
485+
SpinLockAcquire(&dtm->votingSpinlock);
486+
votingList=dtm->votingTransactions;
487+
ts->nextVoting=votingList;
488+
dtm->votingTransactions=ts;
489+
SpinLockRelease(&dtm->votingSpinlock);
490+
DTM_TRACE("Register commit message\n");
491+
if (votingList==NULL) {
492+
/* singal semaphore only once for the whole list */
493+
DTM_TRACE("Signal semaphore\n");
494+
PGSemaphoreUnlock(&dtm->votingSemaphore);
495+
}
496+
}
497+
478498
staticXidStatus
479499
DtmCommitTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids)
480500
{
@@ -524,12 +544,16 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
524544

525545
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
526546
ts=hash_search(xid2state,&xid,HASH_FIND,NULL);
527-
Assert(ts!=NULL);/* should be created by DtmPrepareTransaction */
528-
ts->status=status;
529-
for (i=0;i<nsubxids;i++) {
530-
ts=ts->next;
547+
if (ts!=NULL) {/* should be created by DtmPrepareTransaction */
531548
ts->status=status;
532-
}
549+
for (i=0;i<nsubxids;i++) {
550+
ts=ts->next;
551+
ts->status=status;
552+
}
553+
if (dtmTx.isReplicated) {
554+
SendNotificationMessage(ts);
555+
}
556+
}
533557
LWLockRelease(dtm->hashLock);
534558
}
535559

@@ -538,19 +562,18 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
538562
staticvoid
539563
DtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
540564
{
541-
DTM_INFO("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
542-
if (dtmTx.isDistributed)
565+
DTM_TRACE("%d: DtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
566+
if (xid==dtmTx.xid&&dtmTx.isDistributed)
543567
{
544-
Assert(xid==dtmTx.xid);
545568
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML)
546569
{
547570
DtmFinishTransaction(xid,nsubxids,subxids,status);
548-
DTM_INFO("Abort transaction %d\n",xid);
571+
DTM_TRACE("Abort transaction %d\n",xid);
549572
}
550573
else
551574
{
552575
if (DtmCommitTransaction(xid,nsubxids,subxids)==TRANSACTION_STATUS_COMMITTED) {
553-
DTM_INFO("Commit transaction %d\n",xid);
576+
DTM_TRACE("Commit transaction %d\n",xid);
554577
}else {
555578
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,TRANSACTION_STATUS_ABORTED,lsn);
556579
dtmTx.isDistributed= false;
@@ -643,7 +666,7 @@ _PG_init(void)
643666
);
644667

645668
DefineCustomIntVariable(
646-
"multimaster.arpiter_port",
669+
"multimaster.arbiter_port",
647670
"Base value for assigning arbiter ports",
648671
NULL,
649672
&MMArbiterPort,
@@ -990,42 +1013,29 @@ MMPoolConstructor(void)
9901013
return&dtm->pool;
9911014
}
9921015

993-
staticvoid
994-
SendCommitMessage(DtmTransState*ts)
995-
{
996-
DtmTransState*votingList;
997-
998-
SpinLockAcquire(&dtm->votingSpinlock);
999-
votingList=dtm->votingTransactions;
1000-
ts->nextVoting=votingList;
1001-
dtm->votingTransactions=ts;
1002-
SpinLockRelease(&dtm->votingSpinlock);
1003-
1004-
if (votingList==NULL) {
1005-
/* singal semaphreo only once for the whole list */
1006-
PGSemaphoreUnlock(&dtm->votingSemaphore);
1007-
}
1008-
}
1009-
10101016
staticvoid
10111017
MMVoteForTransaction(DtmTransState*ts)
10121018
{
10131019
LWLockRelease(dtm->hashLock);
10141020
if (ts->gtid.node==MMNodeId) {
10151021
/* I am coordinator: wait responses from all replicas for transaction replicated using logical decoding */
1022+
DTM_TRACE("Coordinator waiting latch...\n");
10161023
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
10171024
ResetLatch(&MyProc->procLatch);
1025+
DTM_TRACE("Coordinator receive %d votes\n",ts->nVotes);
10181026
Assert(ts->nVotes==dtm->nNodes);
10191027

10201028
/* ... and then send notifications to replicas */
1021-
SendCommitMessage(ts);
1029+
SendNotificationMessage(ts);
10221030
}else {
10231031
/* I am replica: first notify coordinator... */
10241032
ts->nVotes=dtm->nNodes-1;/* I just need one confirmation from coordinator */
1025-
SendCommitMessage(ts);
1033+
SendNotificationMessage(ts);
10261034
/* ... and wait response from it */
1035+
DTM_TRACE("Node %d waiting latch...\n",MMNodeId);
10271036
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
10281037
ResetLatch(&MyProc->procLatch);
1038+
DTM_TRACE("Node %d receive response...\n",MMNodeId);
10291039
}
10301040
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
10311041
}
@@ -1034,6 +1044,7 @@ HTAB* MMCreateHash(void)
10341044
{
10351045
HASHCTLinfo;
10361046
HTAB*htab;
1047+
Assert(MMNodes>0);
10371048
memset(&info,0,sizeof(info));
10381049
info.keysize=sizeof(TransactionId);
10391050
info.entrysize=sizeof(DtmTransState)+ (MMNodes-1)*sizeof(TransactionId);

‎multimaster.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
#include"bytebuf.h"
55
#include"bgwpool.h"
66

7-
#defineDTM_TRACE(fmt, ...)
8-
/*#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__) */
9-
#defineDTM_INFO(fmt, ...)
7+
#defineDTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8+
//#defineDTM_TRACE(fmt, ...)
9+
#defineDTM_TUPLE_TRACE(fmt, ...)
1010

1111
#defineBIT_SET(mask,bit) ((mask) & ((int64)1 << (bit)))
1212

‎pglogical_apply.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,10 @@ process_remote_begin(StringInfo s)
330330
gtid.node=pq_getmsgint(s,4);
331331
gtid.xid=pq_getmsgint(s,4);
332332
snapshot=pq_getmsgint64(s);
333-
MMJoinTransaction(&gtid,snapshot);
334333
SetCurrentStatementStartTimestamp();
335334
StartTransactionCommand();
335+
MMJoinTransaction(&gtid,snapshot);
336+
fprintf(stderr,"REMOTE begin node=%d xid=%d snapshot=%ld\n",gtid.node,gtid.xid,snapshot);
336337
}
337338

338339
staticvoid

‎pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
105105
{
106106
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
107107
csn_tcsn=MMTransactionSnapshot(txn->xid);
108+
fprintf(stderr,"pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
108109
if (csn==INVALID_CSN) {
109110
mm->isLocal= true;
110111
}else {

‎tests/dtmbench

-26.3 KB
Binary file not shown.

‎tests/dtmbench.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void* writer(void* arg)
179179
voidinitializeDatabase()
180180
{
181181
connectionconn(cfg.connections[0]);
182-
182+
#if0
183183
printf("creating extension\n");
184184
{
185185
nontransaction txn(conn);
@@ -197,7 +197,7 @@ void initializeDatabase()
197197
txn.commit();
198198
}
199199
printf("table t created\n");
200-
200+
#endif
201201
printf("inserting stuff into t\n");
202202
{
203203
worktxn(conn);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp