Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd0e0458

Browse files
knizhnikkelvich
authored andcommitted
Continue work on MMTS
1 parentba48376 commitd0e0458

File tree

9 files changed

+58
-35
lines changed

9 files changed

+58
-35
lines changed

‎arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ static void MtmAppendBuffer(MessageCode code, MtmBuffer* txBuffer, TransactionId
334334
MtmWriteSocket(sockets[node],buf->data,buf->used*sizeof(MtmCommitMessage));
335335
buf->used=0;
336336
}
337-
DTM_TRACE("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
337+
MTM_TRACE("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
338338
messageText[code],ts->csn,node,MtmNodeId,ts->gtid.xid,ts->xid);
339339
buf->data[buf->used].code=code;
340340
buf->data[buf->used].dxid=xid;

‎decoder_raw.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
108108
Assert(lastXid!=txn->xid);
109109
lastXid=txn->xid;
110110
if (MMIsLocalTransaction(txn->xid)) {
111-
XTM_INFO("Skip local transaction %u\n",txn->xid);
111+
MTM_INFO("Skip local transaction %u\n",txn->xid);
112112
data->isLocal= true;
113113
}else {
114114
OutputPluginPrepareWrite(ctx, true);
115-
XTM_INFO("Send transaction %u to replica\n",txn->xid);
115+
MTM_INFO("Send transaction %u to replica\n",txn->xid);
116116
appendStringInfo(ctx->out,"BEGIN %u;",txn->xid);
117117
OutputPluginWrite(ctx, true);
118118
data->isLocal= false;
@@ -126,12 +126,12 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
126126
{
127127
DecoderRawData*data=ctx->output_plugin_private;
128128
if (!data->isLocal) {
129-
XTM_INFO("Send commit of transaction %u to replica\n",txn->xid);
129+
MTM_INFO("Send commit of transaction %u to replica\n",txn->xid);
130130
OutputPluginPrepareWrite(ctx, true);
131131
appendStringInfoString(ctx->out,"COMMIT;");
132132
OutputPluginWrite(ctx, true);
133133
}else {
134-
XTM_INFO("Skip commit of transaction %u\n",txn->xid);
134+
MTM_INFO("Skip commit of transaction %u\n",txn->xid);
135135
}
136136
}
137137

@@ -483,10 +483,10 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
483483

484484
data=ctx->output_plugin_private;
485485
if (data->isLocal) {
486-
XTM_INFO("Skip action %d in transaction %u\n",change->action,txn->xid);
486+
MTM_INFO("Skip action %d in transaction %u\n",change->action,txn->xid);
487487
return;
488488
}
489-
XTM_INFO("Send action %d in transaction %u to replica\n",change->action,txn->xid);
489+
MTM_INFO("Send action %d in transaction %u to replica\n",change->action,txn->xid);
490490

491491
/* Avoid leaking memory by using and resetting our own context */
492492
old=MemoryContextSwitchTo(data->context);

‎multimaster.c

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ static char const* MtmGetName(void)
194194
SnapshotMtmGetSnapshot(Snapshotsnapshot)
195195
{
196196
snapshot=PgGetSnapshotData(snapshot);
197-
RecentGlobalDataXmin=RecentGlobalXmin=MtmAdjustOldestXid(RecentGlobalDataXmin);
197+
RecentGlobalDataXmin=RecentGlobalXmin=dtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
198198
returnsnapshot;
199199
}
200200

@@ -499,10 +499,8 @@ void MtmSendNotificationMessage(MtmTransState* ts)
499499
ts->nextVoting=votingList;
500500
dtm->votingTransactions=ts;
501501
SpinLockRelease(&dtm->votingSpinlock);
502-
MTM_TRACE("Register commit message\n");
503502
if (votingList==NULL) {
504503
/* singal semaphore only once for the whole list */
505-
MTM_TRACE("Signal semaphore\n");
506504
PGSemaphoreUnlock(&dtm->votingSemaphore);
507505
}
508506
}
@@ -519,9 +517,11 @@ MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
519517
MtmTransactionListAppend(ts);
520518
MtmAddSubtransactions(ts,subxids,nsubxids);
521519

520+
MtmVoteForTransaction(ts);
521+
522522
LWLockRelease(dtm->hashLock);
523523

524-
MtmVoteForTransaction(ts);
524+
MTM_TRACE("%d: MtmCommitTransaction status=%d\n",getpid(),ts->status);
525525

526526
returnts->status==TRANSACTION_STATUS_COMMITTED;
527527
}
@@ -530,15 +530,32 @@ static void
530530
MtmFinishTransaction(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus)
531531
{
532532
MtmTransState*ts;
533+
MtmCurrentTrans*x=&dtmTx;
534+
boolfound;
533535

534536
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
535-
ts=hash_search(xid2state,&xid,HASH_FIND,NULL);
536-
if (ts!=NULL) {
537+
ts=hash_search(xid2state,&xid,HASH_ENTER,&found);
538+
if (!found) {
537539
ts->status=status;
538-
MtmAdjustSubtransactions(ts);
539-
if (dtmTx.isReplicated) {
540-
MtmSendNotificationMessage(ts);
540+
ts->csn=MtmAssignCSN();
541+
ts->procno=MyProc->pgprocno;
542+
ts->snapshot=INVALID_CSN;
543+
if (!TransactionIdIsValid(x->gtid.xid))
544+
{
545+
ts->gtid.xid=x->xid;
546+
ts->gtid.node=MtmNodeId;
547+
}else {
548+
ts->gtid=x->gtid;
541549
}
550+
MtmTransactionListAppend(ts);
551+
MtmAddSubtransactions(ts,subxids,nsubxids);
552+
}
553+
ts->status=status;
554+
MtmAdjustSubtransactions(ts);
555+
556+
if (dtmTx.isReplicated) {
557+
ts->gtid=x->gtid;
558+
MtmSendNotificationMessage(ts);
542559
}
543560
LWLockRelease(dtm->hashLock);
544561
}
@@ -548,13 +565,13 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
548565
staticvoid
549566
MtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
550567
{
551-
MTM_TRACE("%d: MtmSetTransactionStatus %u = %u\n",getpid(),xid,status);
568+
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n",getpid(),xid,dtmTx.xid,status,dtmTx.isDistributed);
552569
if (xid==dtmTx.xid&&dtmTx.isDistributed)
553570
{
554571
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML)
555572
{
556573
MtmFinishTransaction(xid,nsubxids,subxids,status);
557-
MTM_TRACE("Abort transaction %d\n",xid);
574+
MTM_TRACE("Abort transaction %d, status=%d, DML=%d\n",xid,status,dtmTx.containsDML);
558575
}
559576
else
560577
{
@@ -1006,10 +1023,14 @@ MtmVoteForTransaction(MtmTransState* ts)
10061023
MtmSendNotificationMessage(ts);/* send READY message to coordinator */
10071024
}
10081025

1009-
MTM_TRACE("Node %d waiting latch...\n",MtmNodeId);
1010-
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
1011-
ResetLatch(&MyProc->procLatch);
1012-
MTM_TRACE("Node %d receives response...\n",MtmNodeId);
1026+
MTM_TRACE("%d: Node %d waiting latch...\n",getpid(),MtmNodeId);
1027+
while (ts->status!=TRANSACTION_STATUS_COMMITTED&&ts->status!=TRANSACTION_STATUS_ABORTED) {
1028+
LWLockRelease(dtm->hashLock);
1029+
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
1030+
ResetLatch(&MyProc->procLatch);
1031+
LWLockAcquire(dtm->hashLock,LW_SHARED);
1032+
}
1033+
MTM_TRACE("%d: Node %d receives response...\n",getpid(),MtmNodeId);
10131034
}
10141035

10151036
HTAB*MtmCreateHash(void)

‎multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
#include"bytebuf.h"
55
#include"bgwpool.h"
66

7-
#defineMTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
7+
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
88
//#define MTM_TRACE(fmt, ...)
9+
#defineMTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
910
#defineMTM_TUPLE_TRACE(fmt, ...)
1011

1112
#defineBIT_SET(mask,bit) ((mask) & ((int64)1 << (bit)))

‎pglogical_apply.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include<unistd.h>
12
#include"postgres.h"
23

34
#include"funcapi.h"
@@ -333,7 +334,8 @@ process_remote_begin(StringInfo s)
333334
SetCurrentStatementStartTimestamp();
334335
StartTransactionCommand();
335336
MtmJoinTransaction(&gtid,snapshot);
336-
fprintf(stderr,"REMOTE begin node=%d xid=%d snapshot=%ld\n",gtid.node,gtid.xid,snapshot);
337+
338+
MTM_TRACE("REMOTE begin node=%d xid=%d snapshot=%ld\n",gtid.node,gtid.xid,snapshot);
337339
}
338340

339341
staticvoid
@@ -815,6 +817,7 @@ void MtmExecutor(int id, void* work, size_t size)
815817
PG_CATCH();
816818
{
817819
FlushErrorState();
820+
MTM_TRACE("%d: REMOTE abort transaction %d\n",getpid(),GetCurrentTransactionId());
818821
AbortCurrentTransaction();
819822
}
820823
PG_END_TRY();

‎pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
105105
{
106106
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
107107
csn_tcsn=MtmTransactionSnapshot(txn->xid);
108-
fprintf(stderr,"pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
108+
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n",txn->xid,csn);
109109
if (csn==INVALID_CSN) {
110110
mm->isLocal= true;
111111
}else {

‎pglogical_receiver.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ pglogical_receiver_main(Datum main_arg)
216216
BackgroundWorkerUnblockSignals();
217217

218218
/* Connect to a database */
219-
BackgroundWorkerInitializeConnection(MMDatabaseName,NULL);
219+
BackgroundWorkerInitializeConnection(MtmDatabaseName,NULL);
220220

221221
/* Establish connection to remote server */
222222
conn=PQconnectdb(args->receiver_conn_string);
@@ -260,7 +260,7 @@ pglogical_receiver_main(Datum main_arg)
260260
PQclear(res);
261261
resetPQExpBuffer(query);
262262

263-
MMReceiverStarted();
263+
MtmReceiverStarted();
264264
ByteBufferAlloc(&buf);
265265

266266
while (!got_sigterm)
@@ -392,7 +392,7 @@ pglogical_receiver_main(Datum main_arg)
392392
ByteBufferAppend(&buf,stmt,rc-hdr_len);
393393
if (stmt[0]=='C')/* commit */
394394
{
395-
MMExecute(buf.data,buf.used);
395+
MtmExecute(buf.data,buf.used);
396396
ByteBufferReset(&buf);
397397
}
398398
#else
@@ -511,7 +511,7 @@ pglogical_receiver_main(Datum main_arg)
511511
}
512512

513513

514-
intMMStartReceivers(char*conns,intnode_id)
514+
intMtmStartReceivers(char*conns,intnode_id)
515515
{
516516
inti=0;
517517
BackgroundWorkerworker;
@@ -530,17 +530,17 @@ int MMStartReceivers(char* conns, int node_id)
530530
}
531531
if (++i!=node_id) {
532532
ReceiverArgs*ctx= (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
533-
if (MMDatabaseName==NULL) {
533+
if (MtmDatabaseName==NULL) {
534534
char*dbname=strstr(conn_str,"dbname=");
535535
char*eon;
536536
intlen;
537537
Assert(dbname!=NULL);
538538
dbname+=7;
539539
eon=strchr(dbname,' ');
540540
len=eon-dbname;
541-
MMDatabaseName= (char*)malloc(len+1);
542-
memcpy(MMDatabaseName,dbname,len);
543-
MMDatabaseName[len]='\0';
541+
MtmDatabaseName= (char*)malloc(len+1);
542+
memcpy(MtmDatabaseName,dbname,len);
543+
MtmDatabaseName[len]='\0';
544544
}
545545
ctx->receiver_conn_string=psprintf("replication=database %.*s", (int)(p-conn_str),conn_str);
546546
sprintf(ctx->receiver_slot,"mm_slot_%d",node_id);

‎tests/dtmbench

-9.36 KB
Binary file not shown.

‎tests/dtmbench.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ void initializeDatabase()
184184
nontransactiontxn(conn);
185185
exec(txn,"drop extension if exists multimaster");
186186
exec(txn,"create extension multimaster");
187-
txn.commit();
188187
}
189188
printf("extension created\n");
190189

@@ -193,7 +192,6 @@ void initializeDatabase()
193192
nontransactiontxn(conn);
194193
exec(txn,"drop table if exists t");
195194
exec(txn,"create table t(u int primary key, v int)");
196-
txn.commit();
197195
}
198196
printf("table t created\n");
199197
printf("inserting stuff into t\n");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp