Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1161aab

Browse files
knizhnikkelvich
authored andcommitted
Support read-committed isolation level
1 parentc1785c0 commit1161aab

File tree

5 files changed

+49
-5
lines changed

5 files changed

+49
-5
lines changed

‎multimaster.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,22 @@ csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, nodemask_
504504
returnsnapshot;
505505
}
506506

507+
voidMtmSetSnapshot(csn_tglobalSnapshot)
508+
{
509+
MtmLock(LW_EXCLUSIVE);
510+
MtmSyncClock(globalSnapshot);
511+
MtmTx.snapshot=globalSnapshot;
512+
MtmUnlock();
513+
}
507514

515+
508516
SnapshotMtmGetSnapshot(Snapshotsnapshot)
509517
{
510518
snapshot=PgGetSnapshotData(snapshot);
519+
if (XactIsoLevel==XACT_READ_COMMITTED&&MtmTx.snapshot!=INVALID_CSN&&TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
520+
MtmTx.snapshot=MtmGetCurrentTime();
521+
LogLogicalMessage("S", (char*)&MtmTx.snapshot,sizeof(MtmTx.snapshot), true);
522+
}
511523
RecentGlobalDataXmin=RecentGlobalXmin=Mtm->oldestXid;
512524
returnsnapshot;
513525
}
@@ -540,7 +552,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
540552

541553
Assert(xid!=InvalidTransactionId);
542554

543-
if (!MtmUseDtm) {
555+
if (!MtmUseDtm||TransactionIdPrecedes(xid,Mtm->oldestXid)) {
544556
returnPgXidInMVCCSnapshot(xid,snapshot);
545557
}
546558
MtmLock(LW_SHARED);
@@ -681,6 +693,10 @@ MtmAdjustOldestXid(TransactionId xid)
681693
hash_search(MtmGid2State,&prev->gid,HASH_REMOVE,NULL);
682694
}
683695
}
696+
if (ts!=NULL) {
697+
MTM_LOG2("Adjust(%lld) stop at snashot %lld, xid %lld, pinned=%d, oldestSnaphsot=%lld\n",
698+
(long64)xid,ts->csn, (long64)ts->xid,ts->isPinned,oldestSnapshot);
699+
}
684700
}
685701

686702
if (MtmUseDtm&& !MtmVolksWagenMode)
@@ -2827,11 +2843,13 @@ static bool ConfigIsSane(void)
28272843
{
28282844
boolok= true;
28292845

2846+
#if0
28302847
if (DefaultXactIsoLevel!=XACT_REPEATABLE_READ)
28312848
{
28322849
MTM_ELOG(WARNING,"multimaster requires default_transaction_isolation = 'repeatable read'");
28332850
ok= false;
28342851
}
2852+
#endif
28352853

28362854
if (MtmMaxNodes<1)
28372855
{
@@ -5123,11 +5141,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51235141
standard_ProcessUtility(parsetree,queryString,context,
51245142
params,dest,completionTag);
51255143
}
5126-
5144+
#if0
51275145
if (!MtmVolksWagenMode&&MtmTx.isDistributed&&XactIsoLevel!=XACT_REPEATABLE_READ) {
51285146
MTM_ELOG(ERROR,"Isolation level %s is not supported by multimaster",isoLevelStr[XactIsoLevel]);
51295147
}
5130-
5148+
#endif
51315149
if (MyXactAccessedTempRel)
51325150
{
51335151
MTM_LOG1("Xact accessed temp table, stopping replication");

‎multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,5 +424,6 @@ extern bool MtmTransIsActive(void);
424424
externMtmTransState*MtmGetActiveTransaction(MtmL2List*list);
425425
externvoidMtmReleaseLocks(void);
426426
externvoidMtmInitMessage(MtmArbiterMessage*msg,MtmMessageCodecode);
427+
externvoidMtmSetSnapshot(csn_tsnapshot);
427428

428429
#endif

‎pglogical_apply.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,15 @@ process_remote_message(StringInfo s)
483483
standalone= true;
484484
break;
485485
}
486-
487-
}
486+
case'S':
487+
{
488+
Assert(messageSize==sizeof(csn_t));
489+
MtmSetSnapshot(*(csn_t*)messageBody);
490+
break;
491+
}
492+
default:
493+
Assert(false);
494+
}
488495
returnstandalone;
489496
}
490497

‎pglogical_proto.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ pglogical_write_message(StringInfo out,
180180
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
181181
}
182182
break;
183+
case'S':
184+
if (MtmIsFilteredTxn) {
185+
return;
186+
}
187+
break;
183188
case'D':
184189
if (MtmIsFilteredTxn) {
185190
MTM_LOG2("%d: pglogical_write_message filtered",MyProcPid);

‎tests/dtmbench.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct config
6969
int updatePercent;
7070
vector<string> connections;
7171
bool scatter;
72+
bool avoidDeadlocks;
7273

7374
config() {
7475
nReaders =1;
@@ -77,6 +78,7 @@ struct config
7778
nAccounts =100000;
7879
updatePercent =100;
7980
scatter =false;
81+
avoidDeadlocks =false;
8082
}
8183
};
8284

@@ -157,6 +159,12 @@ void* writer(void* arg)
157159
if (cfg.scatter) {
158160
srcAcc = srcAcc/cfg.nWriters*cfg.nWriters + t.id;
159161
dstAcc = dstAcc/cfg.nWriters*cfg.nWriters + t.id;
162+
}elseif (cfg.avoidDeadlocks) {
163+
if (dstAcc < srcAcc) {
164+
int tmp = srcAcc;
165+
srcAcc = dstAcc;
166+
dstAcc = tmp;
167+
}
160168
}
161169
try {
162170
if (random() %100 < cfg.updatePercent) {
@@ -240,6 +248,9 @@ int main (int argc, char* argv[])
240248
case'i':
241249
initialize =true;
242250
continue;
251+
case'd':
252+
cfg.avoidDeadlocks =true;
253+
continue;
243254
}
244255
}
245256
printf("Options:\n"
@@ -249,6 +260,8 @@ int main (int argc, char* argv[])
249260
"\t-n N\tnumber of iterations (1000)\n"
250261
"\t-p N\tupdate percent (100)\n"
251262
"\t-c STR\tdatabase connection string\n"
263+
"\t-s\tscattern avoid deadlocks\n"
264+
"\t-d\tavoid deadlocks\n"
252265
"\t-i\tinitialize database\n");
253266
return1;
254267
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp