Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit25fe3cc

Browse files
committed
Update multimaster
1 parent33e709e commit25fe3cc

File tree

5 files changed

+37
-27
lines changed

5 files changed

+37
-27
lines changed

‎contrib/multimaster/dtmd/src/main.c‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ static void ondisconnect(client_t client) {
143143
}
144144
}else {
145145
shout(
146-
"[%d] DISCONNECT: transaction %u not found O_o\n",
146+
"[%d] DISCONNECT: transaction %uof disconnected client isnot found\n",
147147
CLIENT_ID(client),CLIENT_XID(client)
148148
);
149149
}
@@ -392,11 +392,12 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
392392
client,
393393
"VOTE: transaction failed to abort O_o"
394394
);
395+
#if0
395396
shout(
396397
"[%d] VOTE: abort xid %u\n",
397398
CLIENT_ID(client),xid
398399
);
399-
400+
#endif
400401
notify_listeners(t,NEGATIVE);
401402
free_transaction(t);
402403
client_message_shortcut(client,RES_TRANSACTION_ABORTED);

‎contrib/multimaster/multimaster.c‎

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
118118
staticHTAB*xid_in_doubt;
119119
staticHTAB*local_trans;
120120
staticDtmState*dtm;
121-
staticSnapshotCurrentTransactionSnapshot;
122121

123122
staticTransactionIdDtmNextXid;
124123
staticSnapshotDataDtmSnapshot= {HeapTupleSatisfiesMVCC };
@@ -609,8 +608,9 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
609608
{
610609
if (TransactionIdIsValid(DtmNextXid)&&snapshot!=&CatalogSnapshotData)
611610
{
612-
if (!DtmHasGlobalSnapshot&& (snapshot!=DtmLastSnapshot||DtmCurcid!=snapshot->curcid))
611+
if (!DtmHasGlobalSnapshot&& (snapshot!=DtmLastSnapshot||DtmCurcid!=snapshot->curcid)) {
613612
DtmGlobalGetSnapshot(DtmNextXid,&DtmSnapshot,&dtm->minXid);
613+
}
614614
DtmCurcid=snapshot->curcid;
615615
DtmLastSnapshot=snapshot;
616616
DtmMergeWithGlobalSnapshot(snapshot);
@@ -628,7 +628,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
628628
snapshot=PgGetSnapshotData(snapshot);
629629
}
630630
DtmUpdateRecentXmin(snapshot);
631-
CurrentTransactionSnapshot=snapshot;
632631
returnsnapshot;
633632
}
634633

@@ -651,7 +650,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
651650
{
652651
if (TransactionIdIsValid(DtmNextXid))
653652
{
654-
CurrentTransactionSnapshot=NULL;
655653
if (status==TRANSACTION_STATUS_ABORTED|| !MMIsDistributedTrans)
656654
{
657655
PgTransactionIdSetTreeStatus(xid,nsubxids,subxids,status,lsn);
@@ -662,7 +660,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
662660
else
663661
{
664662
XTM_INFO("Begin commit transaction %d\n",xid);
665-
/* Mark transaction ason-doubt in xid_in_doubt hash table */
663+
/* Mark transaction asin-doubt in xid_in_doubt hash table */
666664
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
667665
hash_search(xid_in_doubt,&DtmNextXid,HASH_ENTER,NULL);
668666
LWLockRelease(dtm->hashLock);
@@ -673,20 +671,22 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
673671
MarkAsAborted();
674672
END_CRIT_SECTION();
675673
elog(ERROR,"Transaction commit rejected by XTM");
674+
}else {
675+
XTM_INFO("Commit transaction %d\n",xid);
676676
}
677-
XTM_INFO("Commit transaction %d\n",xid);
678677
}
679678
}
680679
else
681680
{
682681
XTM_INFO("Set transaction %u status in local CLOG" ,xid);
683682
}
684683
}
685-
else
684+
elseif (status!=TRANSACTION_STATUS_ABORTED)
686685
{
687686
XidStatusgs;
688687
gs=DtmGlobalGetTransStatus(xid, false);
689688
if (gs!=TRANSACTION_STATUS_UNKNOWN) {
689+
Assert(gs!=TRANSACTION_STATUS_IN_PROGRESS);
690690
status=gs;
691691
}
692692
}
@@ -753,23 +753,21 @@ static void DtmInitialize()
753753
staticvoid
754754
DtmXactCallback(XactEventevent,void*arg)
755755
{
756-
XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n",getpid(),event,DtmNextXid);
756+
//XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
757757
switch (event)
758758
{
759759
caseXACT_EVENT_START:
760-
XTM_INFO("%d: normal=%d, initialized=%d, replication=%d, bgw=%d, vacuum=%d\n",
761-
getpid(),IsNormalProcessingMode(),dtm->initialized,MMDoReplication,IsBackgroundWorker,IsAutoVacuumWorkerProcess());
760+
//XTM_INFO("%d: normal=%d, initialized=%d, replication=%d, bgw=%d, vacuum=%d\n",
761+
// getpid(), IsNormalProcessingMode(), dtm->initialized, MMDoReplication, IsBackgroundWorker, IsAutoVacuumWorkerProcess());
762762
if (IsNormalProcessingMode()&&dtm->initialized&&MMDoReplication&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess()) {
763763
MMBeginTransaction();
764764
}
765765
break;
766766
caseXACT_EVENT_PRE_COMMIT:
767767
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
768-
if (!MMIsDistributedTrans&&TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
769-
XTM_INFO("%d: Will ignore transaction %u\n",getpid(),GetCurrentTransactionIdIfAny());
770-
MMMarkTransAsLocal(GetCurrentTransactionIdIfAny());
771-
}else {
772-
XTM_INFO("%d: Transaction %u will be replicated\n",getpid(),GetCurrentTransactionIdIfAny());
768+
if (!MMIsDistributedTrans&&TransactionIdIsValid(DtmNextXid)) {
769+
XTM_INFO("%d: Will ignore transaction %u\n",getpid(),DtmNextXid);
770+
MMMarkTransAsLocal(DtmNextXid);
773771
}
774772
break;
775773
caseXACT_EVENT_COMMIT:

‎contrib/multimaster/receiver_raw.c‎

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,11 @@ receiver_raw_main(Datum main_arg)
417417
Assert(!insideTrans);
418418
SetCurrentStatementStartTimestamp();
419419
MMJoinTransaction(xid);
420+
420421
StartTransactionCommand();
422+
BeginTransactionBlock();
423+
CommitTransactionCommand();
424+
421425
SPI_connect();
422426
PushActiveSnapshot(GetTransactionSnapshot());
423427
insideTrans= true;
@@ -427,11 +431,19 @@ receiver_raw_main(Datum main_arg)
427431
insideTrans= false;
428432
SPI_finish();
429433
PopActiveSnapshot();
434+
StartTransactionCommand();
430435
if (rollbackTransaction) {
431-
AbortCurrentTransaction();
432-
}else {
436+
UserAbortTransactionBlock();
437+
}
438+
PG_TRY();
439+
{
433440
CommitTransactionCommand();
434441
}
442+
PG_CATCH();
443+
{
444+
elog(WARNING,"%s: Current transaction is aborted at receiver",worker_name);
445+
}
446+
PG_END_TRY();
435447
}elseif (!rollbackTransaction) {
436448
Assert(insideTrans);
437449
/* Execute query */
@@ -448,11 +460,12 @@ receiver_raw_main(Datum main_arg)
448460
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
449461
worker_name,stmt)));
450462
else
451-
ereport(LOG, (errmsg("%s: Error when applying change: %s",
463+
ereport(WARNING, (errmsg("%s: Error when applying change: %s",
452464
worker_name,stmt)));
453465
}
454466
PG_CATCH();
455467
{
468+
elog(WARNING,"%s: %s failed at receiver",worker_name,stmt);
456469
rollbackTransaction= true;
457470
}
458471
PG_END_TRY();
@@ -595,8 +608,4 @@ int MMStartReceivers(char* conns, int node_id)
595608
worker.bgw_main_arg= (Datum)ctx;
596609
RegisterBackgroundWorker(&worker);
597610
}
598-
conn_str=p+1;
599-
}
600-
601-
returni;
602-
}
611+
con

‎contrib/multimaster/tests/dtmbench‎

208 Bytes
Binary file not shown.

‎contrib/multimaster/tests/dtmbench.cpp‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ int main (int argc, char* argv[])
210210

211211
if (initialize) {
212212
initializeDatabase();
213-
printf("%daccount inserted\n", cfg.nAccounts);
213+
printf("%daccounts inserted\n", cfg.nAccounts);
214214
}
215215

216216
time_t start =getCurrentTime();
@@ -246,12 +246,14 @@ int main (int argc, char* argv[])
246246

247247
printf(
248248
"{\"update_tps\":%f,\"read_tps\":%f,"
249-
"\"readers\":%d,\"writers\":%d,"
249+
"\"readers\":%d,\"writers\":%d,\"aborts\":%ld,\"abort_percent\": %d,"
250250
"\"accounts\":%d,\"iterations\":%d,\"hosts\":%ld}\n",
251251
(double)(nWrites*USEC)/elapsed,
252252
(double)(nReads*USEC)/elapsed,
253253
cfg.nReaders,
254254
cfg.nWriters,
255+
nAborts,
256+
(int)(nAborts*100/cfg.nWriters),
255257
cfg.nAccounts,
256258
cfg.nIterations,
257259
cfg.connections.size()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp