Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit23aa459

Browse files
committed
Add start/stop replication functions
1 parentefe7ec4 commit23aa459

File tree

6 files changed

+88
-47
lines changed

6 files changed

+88
-47
lines changed

‎contrib/multimaster/decoder_raw.c‎

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ extern void_PG_output_plugin_init(OutputPluginCallbacks *cb);
3939
typedefstruct
4040
{
4141
MemoryContextcontext;
42-
boolisExternal;
42+
boolisLocal;
4343
}DecoderRawData;
4444

4545
staticvoiddecoder_raw_startup(LogicalDecodingContext*ctx,
@@ -82,7 +82,7 @@ decoder_raw_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
8282
ALLOCSET_DEFAULT_MINSIZE,
8383
ALLOCSET_DEFAULT_INITSIZE,
8484
ALLOCSET_DEFAULT_MAXSIZE);
85-
data->isExternal= false;
85+
data->isLocal= false;
8686
ctx->output_plugin_private=data;
8787

8888
opt->output_type=OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -104,8 +104,8 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
104104
{
105105
DecoderRawData*data=ctx->output_plugin_private;
106106

107-
if (MultimasterIsExternalTransaction(txn->xid)) {
108-
data->isExternal= true;
107+
if (MMIsLocalTransaction(txn->xid)) {
108+
data->isLocal= true;
109109
}else {
110110
OutputPluginPrepareWrite(ctx, true);
111111
appendStringInfo(ctx->out,"BEGIN %u;",txn->xid);
@@ -119,7 +119,7 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
119119
XLogRecPtrcommit_lsn)
120120
{
121121
DecoderRawData*data=ctx->output_plugin_private;
122-
if (!data->isExternal) {
122+
if (!data->isLocal) {
123123
OutputPluginPrepareWrite(ctx, true);
124124
appendStringInfoString(ctx->out,"COMMIT;");
125125
OutputPluginWrite(ctx, true);
@@ -473,7 +473,7 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
473473
boolis_rel_non_selective;
474474

475475
data=ctx->output_plugin_private;
476-
if (data->isExternal) {
476+
if (data->isLocal) {
477477
return;
478478
}
479479
/* Avoid leaking memory by using and resetting our own context */
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use"CREATE EXTENSION multimaster" to load this file. \quit
3+
4+
CREATEFUNCTIONmm_start_replication() RETURNS void
5+
AS'MODULE_PATHNAME','mm_start_replication'
6+
LANGUAGE C;
7+
8+
CREATEFUNCTIONmm_stop_replication() RETURNS void
9+
AS'MODULE_PATHNAME','mm_stop_replication'
10+
LANGUAGE C;
11+

‎contrib/multimaster/multimaster.c‎

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ typedef struct
6767
{
6868
TransactionIdxid;
6969
intcount;
70-
}ExternalTransaction;
70+
}LocalTransaction;
7171

7272
#defineDTM_SHMEM_SIZE (1024*1024)
7373
#defineDTM_HASH_SIZE 1003
@@ -77,6 +77,9 @@ void _PG_fini(void);
7777

7878
PG_MODULE_MAGIC;
7979

80+
PG_FUNCTION_INFO_V1(mm_start_replication);
81+
PG_FUNCTION_INFO_V1(mm_stop_replication);
82+
8083
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
8184
staticvoidDtmMergeWithGlobalSnapshot(Snapshotsnapshot);
8285
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
@@ -103,10 +106,11 @@ static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
103106
staticvoidByteBufferAppendInt32(ByteBuffer*buf,intdata);
104107
staticvoidByteBufferFree(ByteBuffer*buf);
105108

109+
staticvoidMMMarkTransAsLocal(TransactionIdxid);
106110

107111
staticshmem_startup_hook_typeprev_shmem_startup_hook;
108112
staticHTAB*xid_in_doubt;
109-
staticHTAB*external_trans;
113+
staticHTAB*local_trans;
110114
staticDtmState*dtm;
111115
staticSnapshotCurrentTransactionSnapshot;
112116

@@ -128,9 +132,10 @@ static TransactionManager DtmTM = {
128132
DtmDetectGlobalDeadLock
129133
};
130134

131-
staticchar*MultimasterConnStrs;
132-
staticintMultimasterNodeId;
133-
staticintMultimasterNodes;
135+
staticchar*MMConnStrs;
136+
staticintMMNodeId;
137+
staticintMMNodes;
138+
staticboolMMDoReplication= true;
134139

135140
staticchar*DtmHost;
136141
staticintDtmPort;
@@ -145,8 +150,8 @@ static BackgroundWorker DtmWorker = {
145150
};
146151

147152
#defineXTM_TRACE(fmt, ...)
148-
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
149-
#defineXTM_INFO(fmt, ...)
153+
#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
154+
//#define XTM_INFO(fmt, ...)
150155

151156
staticvoidDumpSnapshot(Snapshots,char*name)
152157
{
@@ -697,7 +702,7 @@ static void DtmInitialize()
697702
dtm->xidLock=LWLockAssign();
698703
dtm->nReservedXids=0;
699704
dtm->minXid=InvalidTransactionId;
700-
dtm->nNodes=MultimasterNodes;
705+
dtm->nNodes=MMNodes;
701706
RegisterXactCallback(DtmXactCallback,NULL);
702707
}
703708
LWLockRelease(AddinShmemInitLock);
@@ -714,11 +719,11 @@ static void DtmInitialize()
714719
);
715720

716721
info.keysize=sizeof(TransactionId);
717-
info.entrysize=sizeof(ExternalTransaction);
722+
info.entrysize=sizeof(LocalTransaction);
718723
info.hash=dtm_xid_hash_fn;
719724
info.match=dtm_xid_match_fn;
720-
external_trans=ShmemInitHash(
721-
"external_trans",
725+
local_trans=ShmemInitHash(
726+
"local_trans",
722727
DTM_HASH_SIZE,DTM_HASH_SIZE,
723728
&info,
724729
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
@@ -734,14 +739,19 @@ DtmXactCallback(XactEvent event, void *arg)
734739
XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n",getpid(),event,DtmNextXid);
735740
switch (event)
736741
{
737-
caseXACT_EVENT_START:
738-
if (MyProc&&MyProc->backendId!=InvalidBackendId) {
739-
printf("getpid=%d, MyProc=%d, MyProc->backendId=%d\n",getpid(),MyProc->pid,MyProc->backendId);
740-
MultimasterBeginTransaction();
742+
caseXACT_EVENT_START:
743+
if (MyBackendId!=InvalidBackendId&&MMDoReplication) {
744+
printf("getpid=%d, backendId=%d\n",getpid(),MyBackendId);
745+
MMBeginTransaction();
746+
}
747+
break;
748+
caseXACT_EVENT_PRE_COMMIT:
749+
caseXACT_EVENT_PARALLEL_PRE_COMMIT:
750+
if (!MMDoReplication&&TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
751+
MMMarkTransAsLocal(GetCurrentTransactionIdIfAny());
741752
}
742753
break;
743-
caseXACT_EVENT_COMMIT:
744-
caseXACT_EVENT_ABORT:
754+
caseXACT_EVENT_ABORT:
745755
if (TransactionIdIsValid(DtmNextXid))
746756
{
747757
if (event==XACT_EVENT_COMMIT)
@@ -862,7 +872,7 @@ _PG_init(void)
862872
"multimaster.conn_strings",
863873
"Multimaster node connection strings separated by commas, i.e. 'replication=database dbname=postgres host=localhost port=5001,replication=database dbname=postgres host=localhost port=5002'",
864874
NULL,
865-
&MultimasterConnStrs,
875+
&MMConnStrs,
866876
"",
867877
PGC_POSTMASTER,// context
868878
0,// flags,
@@ -875,7 +885,7 @@ _PG_init(void)
875885
"multimaster.node_id",
876886
"Multimaster node ID",
877887
NULL,
878-
&MultimasterNodeId,
888+
&MMNodeId,
879889
1,
880890
1,
881891
INT_MAX,
@@ -886,7 +896,7 @@ _PG_init(void)
886896
NULL
887897
);
888898

889-
MultimasterNodes=LogicalReplicationStartReceivers(MultimasterConnStrs,MultimasterNodeId);
899+
MMNodes=MMStartReceivers(MMConnStrs,MMNodeId);
890900

891901
if (DtmBufferSize!=0)
892902
{
@@ -924,10 +934,10 @@ static void DtmShmemStartup(void)
924934
* ***************************************************************************
925935
*/
926936

927-
voidMultimasterBeginTransaction(void)
937+
voidMMBeginTransaction(void)
928938
{
929939
if (TransactionIdIsValid(DtmNextXid))
930-
elog(ERROR,"MultimasterBeginTransaction should be called only once for global transaction");
940+
elog(ERROR,"MMBeginTransaction should be called only once for global transaction");
931941
if (dtm==NULL)
932942
elog(ERROR,"DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf");
933943
DtmNextXid=DtmGlobalStartTransaction(&DtmSnapshot,&dtm->minXid);
@@ -939,10 +949,8 @@ void MultimasterBeginTransaction(void)
939949
DtmLastSnapshot=NULL;
940950
}
941951

942-
voidMultimasterJoinTransaction(TransactionIdxid)
952+
voidMMJoinTransaction(TransactionIdxid)
943953
{
944-
ExternalTransaction*et;
945-
946954
if (TransactionIdIsValid(DtmNextXid))
947955
elog(ERROR,"dtm_begin/join_transaction should be called only once for global transaction");
948956
DtmNextXid=xid;
@@ -955,28 +963,52 @@ void MultimasterJoinTransaction(TransactionId xid)
955963
DtmHasGlobalSnapshot= true;
956964
DtmLastSnapshot=NULL;
957965

966+
MMMarkTransAsLocal(DtmNextXid);
967+
}
968+
969+
970+
voidMMMarkTransAsLocal(TransactionIdxid)
971+
{
972+
LocalTransaction*lt;
973+
974+
Assert(TransactionIdIsValid(xid));
958975
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
959-
et=hash_search(external_trans,&DtmNextXid,HASH_ENTER,NULL);
960-
et->count=dtm->nNodes-1;
976+
lt=hash_search(local_trans,&xid,HASH_ENTER,NULL);
977+
lt->count=dtm->nNodes-1;
961978
LWLockRelease(dtm->hashLock);
962979
}
963980

964-
boolMultimasterIsExternalTransaction(TransactionIdxid)
981+
boolMMIsLocalTransaction(TransactionIdxid)
965982
{
966-
ExternalTransaction*et;
983+
LocalTransaction*lt;
967984
boolresult= false;
968985
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
969-
et=hash_search(external_trans,&xid,HASH_FIND,NULL);
970-
if (et!=NULL) {
986+
lt=hash_search(local_trans,&xid,HASH_FIND,NULL);
987+
if (lt!=NULL) {
971988
result= true;
972-
if (--et->count==0) {
973-
hash_search(external_trans,&xid,HASH_REMOVE,NULL);
989+
if (--lt->count==0) {
990+
hash_search(local_trans,&xid,HASH_REMOVE,NULL);
974991
}
975992
}
976993
LWLockRelease(dtm->hashLock);
977994
returnresult;
978995
}
979996

997+
Datum
998+
mm_start_replication(PG_FUNCTION_ARGS)
999+
{
1000+
MMDoReplication= true;
1001+
PG_RETURN_VOID();
1002+
}
1003+
1004+
Datum
1005+
mm_stop_replication(PG_FUNCTION_ARGS)
1006+
{
1007+
MMDoReplication= false;
1008+
PG_RETURN_VOID();
1009+
}
1010+
1011+
9801012

9811013

9821014
voidDtmBackgroundWorker(Datumarg)

‎contrib/multimaster/multimaster.h‎

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#ifndef__MULTIMASTER_H__
22
#define__MULTIMASTER_H__
33

4-
externintLogicalReplicationStartReceivers(char*nodes,intnode_id);
5-
externvoidMultimasterBeginTransaction(void);
6-
externvoidMultimasterJoinTransaction(TransactionIdxid);
7-
externboolMultimasterIsExternalTransaction(TransactionIdxid);
4+
externintMMStartReceivers(char*nodes,intnode_id);
5+
externvoidMMBeginTransaction(void);
6+
externvoidMMJoinTransaction(TransactionIdxid);
7+
externboolMMIsLocalTransaction(TransactionIdxid);
88

99
externboolisBackgroundWorker;
1010

‎contrib/multimaster/receiver_raw.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ receiver_raw_main(Datum main_arg)
402402
intrc=sscanf(stmt+6,"%u",&xid);
403403
Assert(rc==1);
404404
Assert(!insideTrans);
405-
MultimasterJoinTransaction(xid);
405+
MMJoinTransaction(xid);
406406
insideTrans= true;
407407
}elseif (strncmp(stmt,"COMMIT;",7)==0) {
408408
Assert(insideTrans);
@@ -526,7 +526,7 @@ receiver_raw_main(Datum main_arg)
526526
}
527527

528528

529-
intLogicalReplicationStartReceivers(char*conn_strs,intnode_id)
529+
intMMStartReceivers(char*conn_strs,intnode_id)
530530
{
531531
inti=0;
532532
BackgroundWorkerworker;

‎contrib/multimaster/sockhub/sockhub.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,11 @@ static void die(int sig) {
255255
voidShubLoop(Shub*shub)
256256
{
257257
intbuffer_size=shub->params->buffer_size;
258+
sigset_tsset;
258259
signal(SIGINT,die);
259260
signal(SIGQUIT,die);
260261
signal(SIGTERM,die);
261-
// signal(SIGHUP, die);
262-
sigset_tsset;
262+
/* signal(SIGHUP, die); */
263263
sigfillset(&sset);
264264
sigprocmask(SIG_UNBLOCK,&sset,NULL);
265265

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp