Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit33e709e

Browse files
committed
Update multimaster
1 parent96f798c commit33e709e

File tree

13 files changed

+766
-179
lines changed

13 files changed

+766
-179
lines changed

‎contrib/multimaster/decoder_raw.c‎

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,20 @@ decoder_raw_shutdown(LogicalDecodingContext *ctx)
9999
}
100100

101101
/* BEGIN callback */
102+
staticTransactionIdlastXid;
103+
102104
staticvoid
103105
decoder_raw_begin_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn)
104106
{
105107
DecoderRawData*data=ctx->output_plugin_private;
106-
108+
Assert(lastXid!=txn->xid);
109+
lastXid=txn->xid;
107110
if (MMIsLocalTransaction(txn->xid)) {
111+
XTM_INFO("Skip local transaction %u\n",txn->xid);
108112
data->isLocal= true;
109113
}else {
110114
OutputPluginPrepareWrite(ctx, true);
111-
elog(WARNING,"Sendtransation to%u to replica",txn->xid);
115+
XTM_INFO("Sendtransaction%u to replica\n",txn->xid);
112116
appendStringInfo(ctx->out,"BEGIN %u;",txn->xid);
113117
OutputPluginWrite(ctx, true);
114118
data->isLocal= false;
@@ -122,10 +126,12 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
122126
{
123127
DecoderRawData*data=ctx->output_plugin_private;
124128
if (!data->isLocal) {
125-
elog(WARNING,"Send commit of %u to replica",txn->xid);
129+
XTM_INFO("Send commit oftransaction%u to replica\n",txn->xid);
126130
OutputPluginPrepareWrite(ctx, true);
127131
appendStringInfoString(ctx->out,"COMMIT;");
128132
OutputPluginWrite(ctx, true);
133+
}else {
134+
XTM_INFO("Skip commit of transaction %u\n",txn->xid);
129135
}
130136
}
131137

@@ -291,7 +297,7 @@ print_where_clause(StringInfo s,
291297
intkey;
292298

293299
/* Use all the values associated with the index */
294-
indexRel=index_open(relation->rd_replidindex,ShareLock);
300+
indexRel=index_open(relation->rd_replidindex,AccessShareLock);
295301
for (key=0;key<indexRel->rd_index->indnatts;key++)
296302
{
297303
intrelattr=indexRel->rd_index->indkey.values[key];
@@ -477,9 +483,10 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
477483

478484
data=ctx->output_plugin_private;
479485
if (data->isLocal) {
486+
XTM_INFO("Skip action %d in transaction %u\n",change->action,txn->xid);
480487
return;
481488
}
482-
elog(WARNING,"Send action %d in transaction%u to replica",change->action,txn->xid);
489+
XTM_INFO("Send action %d in transaction %u to replica\n",change->action,txn->xid);
483490

484491
/* Avoid leaking memory by using and resetting our own context */
485492
old=MemoryContextSwitchTo(data->context);

‎contrib/multimaster/dtmd/src/main.c‎

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -357,13 +357,13 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
357357
// Check the arguments
358358
xid_txid=argv[1];
359359
boolwait=argv[2];
360-
360+
#if0
361361
CHECK(
362362
CLIENT_XID(client)==xid,
363363
client,
364364
"VOTE: voting for a transaction not participated in"
365365
);
366-
366+
#endif
367367
Transaction*t=find_transaction(xid);
368368
if (t==NULL) {
369369
shout(
@@ -392,6 +392,10 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
392392
client,
393393
"VOTE: transaction failed to abort O_o"
394394
);
395+
shout(
396+
"[%d] VOTE: abort xid %u\n",
397+
CLIENT_ID(client),xid
398+
);
395399

396400
notify_listeners(t,NEGATIVE);
397401
free_transaction(t);
@@ -427,45 +431,43 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
427431
}
428432

429433
staticvoidonsnapshot(client_tclient,intargc,xid_t*argv) {
434+
Snapshotsnapshot_now;
430435
CHECK(
431436
argc==2,
432437
client,
433438
"SNAPSHOT: wrong number of arguments"
434439
);
435440

436441
xid_txid=argv[1];
437-
442+
Snapshot*snap;
438443
Transaction*t=find_transaction(xid);
439444
if (t==NULL) {
440445
shout(
441-
"[%d] SNAPSHOT: xid %u not found\n",
446+
"[%d] SNAPSHOT: xid %u not found: use curent snapshot\n",
442447
CLIENT_ID(client),xid
443448
);
444-
client_message_shortcut(client,RES_FAILED);
445-
return;
446-
}
447-
448-
if (CLIENT_XID(client)==INVALID_XID) {
449-
CLIENT_SNAPSENT(client)=0;
450-
CLIENT_XID(client)=t->xid;
451-
}
452-
453-
CHECK(
454-
CLIENT_XID(client)==t->xid,
455-
client,
456-
"SNAPSHOT: getting snapshot for a transaction not participated in"
457-
);
458-
459-
assert(CLIENT_SNAPSENT(client) <=t->snapshots_count);// who sent an inexistent snapshot?!
460-
461-
if (CLIENT_SNAPSENT(client)==t->snapshots_count) {
462-
// a fresh snapshot is needed
463-
gen_snapshot(transaction_next_snapshot(t));
464-
}
465-
466-
Snapshot*snap=transaction_snapshot(t,CLIENT_SNAPSENT(client)++);
467-
snap->times_sent+=1;// FIXME: does times_sent get used anywhere? see also 4765234987
468-
449+
gen_snapshot(&snapshot_now);
450+
snap=&snapshot_now;
451+
}else {
452+
if (CLIENT_XID(client)==INVALID_XID) {
453+
CLIENT_SNAPSENT(client)=0;
454+
CLIENT_XID(client)=t->xid;
455+
}
456+
CHECK(
457+
CLIENT_XID(client)==t->xid,
458+
client,
459+
"SNAPSHOT: getting snapshot for a transaction not participated in"
460+
);
461+
assert(CLIENT_SNAPSENT(client) <=t->snapshots_count);// who sent an inexistent snapshot?!
462+
463+
if (CLIENT_SNAPSENT(client)==t->snapshots_count) {
464+
// a fresh snapshot is needed
465+
gen_snapshot(transaction_next_snapshot(t));
466+
}
467+
468+
snap=transaction_snapshot(t,CLIENT_SNAPSENT(client)++);
469+
snap->times_sent+=1;// FIXME: does times_sent get used anywhere? see also 4765234987
470+
}
469471
xid_tok=RES_OK;
470472
client_message_start(client); {
471473
client_message_append(client,sizeof(xid_t),&ok);

‎contrib/multimaster/dtmd/src/transaction.c‎

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ typedef struct list_node_t {
1111

1212
inttransaction_status(Transaction*t) {
1313
assert(t->votes_for+t->votes_against <=t->size);
14-
14+
#if0/* report ABORTED status immediately: do not wait for all responses */
1515
if (t->votes_for+t->votes_against<t->size) {
1616
returnDOUBT;
1717
}
18-
19-
if (t->votes_against) {
20-
returnNEGATIVE;
21-
}else {
22-
returnPOSITIVE;
23-
}
18+
#endif
19+
returnt->votes_against
20+
?NEGATIVE
21+
: (t->votes_for==t->size)
22+
?POSITIVE
23+
:DOUBT;
2424
}
2525

2626
voidtransaction_clear(Transaction*t) {

‎contrib/multimaster/init.sql‎

Lines changed: 0 additions & 2 deletions
This file was deleted.

‎contrib/multimaster/multimaster.c‎

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ static char* DtmHost;
148148
staticintDtmPort;
149149
staticintDtmBufferSize;
150150

151-
staticExecutorRun_hook_typePreviousExecutorRunHook=NULL;
152-
staticvoidMMExecutorRun(QueryDesc*queryDesc,ScanDirectiondirection,longcount);
151+
staticExecutorFinish_hook_typePreviousExecutorFinishHook=NULL;
152+
staticvoidMMExecutorFinish(QueryDesc*queryDesc);
153153
staticboolMMIsDistributedTrans;
154154

155155
staticBackgroundWorkerDtmWorker= {
@@ -160,9 +160,6 @@ static BackgroundWorker DtmWorker = {
160160
DtmBackgroundWorker
161161
};
162162

163-
#defineXTM_TRACE(fmt, ...)
164-
#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
165-
//#define XTM_INFO(fmt, ...)
166163

167164
staticvoidDumpSnapshot(Snapshots,char*name)
168165
{
@@ -235,7 +232,10 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
235232
TransactionIdxid;
236233
Snapshotsrc=&DtmSnapshot;
237234

238-
Assert(TransactionIdIsValid(src->xmin)&&TransactionIdIsValid(src->xmax));
235+
if (!(TransactionIdIsValid(src->xmin)&&TransactionIdIsValid(src->xmax))) {
236+
PgGetSnapshotData(dst);
237+
return;
238+
}
239239

240240
GetLocalSnapshot:
241241
/*
@@ -667,8 +667,11 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
667667
hash_search(xid_in_doubt,&DtmNextXid,HASH_ENTER,NULL);
668668
LWLockRelease(dtm->hashLock);
669669
if (DtmGlobalSetTransStatus(xid,status, true)!=status) {
670-
END_CRIT_SECTION();
670+
DtmNextXid=InvalidTransactionId;
671+
DtmLastSnapshot=NULL;
672+
MMIsDistributedTrans= false;
671673
MarkAsAborted();
674+
END_CRIT_SECTION();
672675
elog(ERROR,"Transaction commit rejected by XTM");
673676
}
674677
XTM_INFO("Commit transaction %d\n",xid);
@@ -763,7 +766,10 @@ DtmXactCallback(XactEvent event, void *arg)
763766
caseXACT_EVENT_PRE_COMMIT:
764767
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
765768
if (!MMIsDistributedTrans&&TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
769+
XTM_INFO("%d: Will ignore transaction %u\n",getpid(),GetCurrentTransactionIdIfAny());
766770
MMMarkTransAsLocal(GetCurrentTransactionIdIfAny());
771+
}else {
772+
XTM_INFO("%d: Transaction %u will be replicated\n",getpid(),GetCurrentTransactionIdIfAny());
767773
}
768774
break;
769775
caseXACT_EVENT_COMMIT:
@@ -794,9 +800,9 @@ DtmXactCallback(XactEvent event, void *arg)
794800
}
795801
DtmNextXid=InvalidTransactionId;
796802
DtmLastSnapshot=NULL;
797-
MMIsDistributedTrans= false;
798-
break;
799-
}
803+
}
804+
MMIsDistributedTrans= false;
805+
break;
800806
default:
801807
break;
802808
}
@@ -933,8 +939,8 @@ _PG_init(void)
933939
prev_shmem_startup_hook=shmem_startup_hook;
934940
shmem_startup_hook=DtmShmemStartup;
935941

936-
PreviousExecutorRunHook=ExecutorRun_hook;
937-
ExecutorRun_hook=MMExecutorRun;
942+
PreviousExecutorFinishHook=ExecutorFinish_hook;
943+
ExecutorFinish_hook=MMExecutorFinish;
938944
}
939945

940946
/*
@@ -944,7 +950,7 @@ void
944950
_PG_fini(void)
945951
{
946952
shmem_startup_hook=prev_shmem_startup_hook;
947-
ExecutorRun_hook=PreviousExecutorRunHook;
953+
ExecutorFinish_hook=PreviousExecutorFinishHook;
948954
}
949955

950956

@@ -1021,6 +1027,7 @@ bool MMIsLocalTransaction(TransactionId xid)
10211027
lt=hash_search(local_trans,&xid,HASH_FIND,NULL);
10221028
if (lt!=NULL) {
10231029
result= true;
1030+
Assert(lt->count>0);
10241031
if (--lt->count==0) {
10251032
hash_search(local_trans,&xid,HASH_REMOVE,NULL);
10261033
}
@@ -1154,19 +1161,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
11541161
}
11551162

11561163
staticvoid
1157-
MMExecutorRun(QueryDesc*queryDesc,ScanDirectiondirection,longcount)
1164+
MMExecutorFinish(QueryDesc*queryDesc)
11581165
{
11591166
if (MMDoReplication) {
11601167
CmdTypeoperation=queryDesc->operation;
1161-
MMIsDistributedTrans |=operation==CMD_INSERT||operation==CMD_UPDATE||operation==CMD_DELETE;
1168+
EState*estate=queryDesc->estate;
1169+
if (estate->es_processed!=0) {
1170+
MMIsDistributedTrans |=operation==CMD_INSERT||operation==CMD_UPDATE||operation==CMD_DELETE;
1171+
}
11621172
}
1163-
if (PreviousExecutorRunHook!=NULL)
1173+
if (PreviousExecutorFinishHook!=NULL)
11641174
{
1165-
PreviousExecutorRunHook(queryDesc,direction,count);
1175+
PreviousExecutorFinishHook(queryDesc);
11661176
}
11671177
else
11681178
{
1169-
standard_ExecutorRun(queryDesc,direction,count);
1179+
standard_ExecutorFinish(queryDesc);
11701180
}
11711181
}
11721182

‎contrib/multimaster/multimaster.h‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
#ifndef__MULTIMASTER_H__
22
#define__MULTIMASTER_H__
33

4+
#defineXTM_TRACE(fmt, ...)
5+
#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
6+
//#define XTM_INFO(fmt, ...)
7+
48
externintMMStartReceivers(char*nodes,intnode_id);
59
externvoidMMBeginTransaction(void);
610
externvoidMMJoinTransaction(TransactionIdxid);

‎contrib/multimaster/receiver_raw.c‎

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ receiver_raw_main(Datum main_arg)
210210
PGconn*conn;
211211
PGresult*res;
212212
boolinsideTrans= false;
213-
213+
boolrollbackTransaction= false;
214+
214215
/* Register functions for SIGTERM/SIGHUP management */
215216
pqsignal(SIGHUP,receiver_raw_sighup);
216217
pqsignal(SIGTERM,receiver_raw_sigterm);
@@ -414,37 +415,47 @@ receiver_raw_main(Datum main_arg)
414415
intrc=sscanf(stmt+6,"%u",&xid);
415416
Assert(rc==1);
416417
Assert(!insideTrans);
417-
elog(WARNING,"Receiver begin transaction %u",xid);
418418
SetCurrentStatementStartTimestamp();
419+
MMJoinTransaction(xid);
419420
StartTransactionCommand();
420421
SPI_connect();
421422
PushActiveSnapshot(GetTransactionSnapshot());
422-
MMJoinTransaction(xid);
423423
insideTrans= true;
424+
rollbackTransaction= false;
424425
}elseif (strncmp(stmt,"COMMIT;",7)==0) {
425-
elog(WARNING,"Receiver commit transaction");
426426
Assert(insideTrans);
427427
insideTrans= false;
428428
SPI_finish();
429429
PopActiveSnapshot();
430-
CommitTransactionCommand();
431-
}else {
430+
if (rollbackTransaction) {
431+
AbortCurrentTransaction();
432+
}else {
433+
CommitTransactionCommand();
434+
}
435+
}elseif (!rollbackTransaction) {
432436
Assert(insideTrans);
433437
/* Execute query */
434-
rc=SPI_execute(stmt, false,0);
435-
436-
if (rc==SPI_OK_INSERT)
437-
ereport(LOG, (errmsg("%s: INSERT received correctly: %s",
438-
worker_name,stmt)));
439-
elseif (rc==SPI_OK_UPDATE)
440-
ereport(LOG, (errmsg("%s: UPDATE received correctly: %s",
441-
worker_name,stmt)));
442-
elseif (rc==SPI_OK_DELETE)
443-
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
444-
worker_name,stmt)));
445-
else
446-
ereport(LOG, (errmsg("%s: Error when applying change: %s",
447-
worker_name,stmt)));
438+
PG_TRY();
439+
{
440+
rc=SPI_execute(stmt, false,0);
441+
if (rc==SPI_OK_INSERT)
442+
ereport(LOG, (errmsg("%s: INSERT received correctly: %s",
443+
worker_name,stmt)));
444+
elseif (rc==SPI_OK_UPDATE)
445+
ereport(LOG, (errmsg("%s: UPDATE received correctly: %s",
446+
worker_name,stmt)));
447+
elseif (rc==SPI_OK_DELETE)
448+
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
449+
worker_name,stmt)));
450+
else
451+
ereport(LOG, (errmsg("%s: Error when applying change: %s",
452+
worker_name,stmt)));
453+
}
454+
PG_CATCH();
455+
{
456+
rollbackTransaction= true;
457+
}
458+
PG_END_TRY();
448459
}
449460
/* Update written position */
450461
output_written_lsn=Max(walEnd,output_written_lsn);

‎contrib/multimaster/tests/core‎

-90.7 MB
Binary file not shown.

‎contrib/multimaster/tests/dtmbench‎

-46.8 KB
Binary file not shown.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp