Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitff1651b

Browse files
committed
Eliminate recursion
1 parent4e84ffc commitff1651b

File tree

4 files changed

+131
-113
lines changed

4 files changed

+131
-113
lines changed

‎contrib/multimaster/decoder_raw.c‎

Lines changed: 20 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ extern void_PG_output_plugin_init(OutputPluginCallbacks *cb);
4141
typedefstruct
4242
{
4343
MemoryContextcontext;
44-
boolinclude_transaction;
44+
boolisExternal;
4545
}DecoderRawData;
4646

4747
staticvoiddecoder_raw_startup(LogicalDecodingContext*ctx,
@@ -79,67 +79,16 @@ decoder_raw_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
7979
ListCell*option;
8080
DecoderRawData*data;
8181

82-
data=palloc(sizeof(DecoderRawData));
82+
data=(DecoderRawData*)palloc(sizeof(DecoderRawData));
8383
data->context=AllocSetContextCreate(ctx->context,
8484
"Raw decoder context",
8585
ALLOCSET_DEFAULT_MINSIZE,
8686
ALLOCSET_DEFAULT_INITSIZE,
8787
ALLOCSET_DEFAULT_MAXSIZE);
88-
data->include_transaction= false;
89-
88+
data->isExternal= false;
9089
ctx->output_plugin_private=data;
9190

92-
/* Default output format */
9391
opt->output_type=OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
94-
95-
foreach(option,ctx->output_plugin_options)
96-
{
97-
DefElem*elem=lfirst(option);
98-
99-
Assert(elem->arg==NULL||IsA(elem->arg,String));
100-
101-
if (strcmp(elem->defname,"include_transaction")==0)
102-
{
103-
/* if option does not provide a value, it means its value is true */
104-
if (elem->arg==NULL)
105-
data->include_transaction= true;
106-
elseif (!parse_bool(strVal(elem->arg),&data->include_transaction))
107-
ereport(ERROR,
108-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109-
errmsg("could not parse value \"%s\" for parameter \"%s\"",
110-
strVal(elem->arg),elem->defname)));
111-
}
112-
elseif (strcmp(elem->defname,"output_format")==0)
113-
{
114-
char*format=NULL;
115-
116-
if (elem->arg==NULL)
117-
ereport(ERROR,
118-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119-
errmsg("No value specified for parameter \"%s\"",
120-
elem->defname)));
121-
122-
format=strVal(elem->arg);
123-
124-
if (strcmp(format,"textual")==0)
125-
opt->output_type=OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
126-
elseif (strcmp(format,"binary")==0)
127-
opt->output_type=OUTPUT_PLUGIN_BINARY_OUTPUT;
128-
else
129-
ereport(ERROR,
130-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
131-
errmsg("Incorrect value \"%s\" for parameter \"%s\"",
132-
format,elem->defname)));
133-
}
134-
else
135-
{
136-
ereport(ERROR,
137-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
138-
errmsg("option \"%s\" = \"%s\" is unknown",
139-
elem->defname,
140-
elem->arg ?strVal(elem->arg) :"(null)")));
141-
}
142-
}
14392
}
14493

14594
/* cleanup this plugin's resources */
@@ -155,16 +104,16 @@ decoder_raw_shutdown(LogicalDecodingContext *ctx)
155104
/* BEGIN callback */
156105
staticvoid
157106
decoder_raw_begin_txn(LogicalDecodingContext*ctx,ReorderBufferTXN*txn)
158-
{
107+
{
159108
DecoderRawData*data=ctx->output_plugin_private;
160-
161-
/* Write to the plugin onlyifthere is */
162-
if (data->include_transaction)
163-
{
164-
OutputPluginPrepareWrite(ctx, true);
165-
appendStringInfoString(ctx->out,"BEGIN;");
166-
OutputPluginWrite(ctx, true);
167-
}
109+
110+
if(MultimasterIsExternalTransaction(txn->xid)) {
111+
data->isExternal= true;
112+
}else {
113+
OutputPluginPrepareWrite(ctx, true);
114+
appendStringInfoString(ctx->out,"BEGIN %u;",txn->xid);
115+
OutputPluginWrite(ctx, true);
116+
}
168117
}
169118

170119
/* COMMIT callback */
@@ -173,14 +122,11 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
173122
XLogRecPtrcommit_lsn)
174123
{
175124
DecoderRawData*data=ctx->output_plugin_private;
176-
177-
/* Write to the plugin only if there is */
178-
if (data->include_transaction)
179-
{
180-
OutputPluginPrepareWrite(ctx, true);
181-
appendStringInfoString(ctx->out,"COMMIT;");
182-
OutputPluginWrite(ctx, true);
183-
}
125+
if (!data->isExternal) {
126+
OutputPluginPrepareWrite(ctx, true);
127+
appendStringInfoString(ctx->out,"COMMIT;");
128+
OutputPluginWrite(ctx, true);
129+
}
184130
}
185131

186132
/*
@@ -530,7 +476,9 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
530476
boolis_rel_non_selective;
531477

532478
data=ctx->output_plugin_private;
533-
479+
if (data->isExternal) {
480+
return;
481+
}
534482
/* Avoid leaking memory by using and resetting our own context */
535483
old=MemoryContextSwitchTo(data->context);
536484

‎contrib/multimaster/multimaster.c‎

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include"sockhub/sockhub.h"
4545

4646
#include"libdtm.h"
47+
#include"multimaster.h"
4748

4849
typedefstruct
4950
{
@@ -52,6 +53,7 @@ typedef struct
5253
TransactionIdminXid;/* XID of oldest transaction visible by any active transaction (local or global) */
5354
TransactionIdnextXid;/* next XID for local transaction */
5455
size_tnReservedXids;/* number of XIDs reserved for local transactions */
56+
intnNodes;
5557
}DtmState;
5658

5759
typedefstruct
@@ -61,19 +63,18 @@ typedef struct
6163
intused;
6264
}ByteBuffer;
6365

66+
typedefstruct
67+
{
68+
TransactionIdxid;
69+
intcount;
70+
}ExternalTransaction;
6471

6572
#defineDTM_SHMEM_SIZE (1024*1024)
6673
#defineDTM_HASH_SIZE 1003
6774

6875
void_PG_init(void);
6976
void_PG_fini(void);
7077

71-
externvoidLogicalReplicationStartReceivers(char*nodes,intnode_id);
72-
externvoidLogicalReplicationBroadcastXid(TransactonIdXid);
73-
74-
voidMultimasterBeginTransaction(void);
75-
voidMultimasterJoinTransaction(TransactionIdxid);
76-
7778
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
7879
staticvoidDtmMergeWithGlobalSnapshot(Snapshotsnapshot);
7980
staticXidStatusDtmGetTransactionStatus(TransactionIdxid,XLogRecPtr*lsn);
@@ -103,6 +104,7 @@ static void ByteBufferFree(ByteBuffer* buf);
103104

104105
staticshmem_startup_hook_typeprev_shmem_startup_hook;
105106
staticHTAB*xid_in_doubt;
107+
staticHTAB*external_trans;
106108
staticDtmState*dtm;
107109
staticSnapshotCurrentTransactionSnapshot;
108110

@@ -126,11 +128,14 @@ static TransactionManager DtmTM = {
126128

127129
staticchar*MultimasterConnStrs;
128130
staticintMultimasterNodeId;
131+
staticintMultimasterNodes;
129132

130133
staticchar*DtmHost;
131134
staticintDtmPort;
132135
staticintDtmBufferSize;
133136

137+
boolisBackgroundWorker;
138+
134139
staticBackgroundWorkerDtmWorker= {
135140
"DtmWorker",
136141
0,/* do not need connection to the database */
@@ -694,6 +699,7 @@ static void DtmInitialize()
694699
dtm->xidLock=LWLockAssign();
695700
dtm->nReservedXids=0;
696701
dtm->minXid=InvalidTransactionId;
702+
dtm->nNodes=MultimasterNodes;
697703
RegisterXactCallback(DtmXactCallback,NULL);
698704
}
699705
LWLockRelease(AddinShmemInitLock);
@@ -709,6 +715,17 @@ static void DtmInitialize()
709715
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
710716
);
711717

718+
info.keysize=sizeof(TransactionId);
719+
info.entrysize=sizeof(ExternalTransaction);
720+
info.hash=dtm_xid_hash_fn;
721+
info.match=dtm_xid_match_fn;
722+
external_trans=ShmemInitHash(
723+
"external_trans",
724+
DTM_HASH_SIZE,DTM_HASH_SIZE,
725+
&info,
726+
HASH_ELEM |HASH_FUNCTION |HASH_COMPARE
727+
);
728+
712729

713730
TM=&DtmTM;
714731
}
@@ -720,7 +737,9 @@ DtmXactCallback(XactEvent event, void *arg)
720737
switch (event)
721738
{
722739
caseXACT_EVENT_BEGIN:
723-
MultimasterBeginTransaction();
740+
if (!isBackgroundWorker) {
741+
MultimasterBeginTransaction();
742+
}
724743
break;
725744
caseXACT_EVENT_COMMIT:
726745
caseXACT_EVENT_ABORT:
@@ -865,7 +884,7 @@ _PG_init(void)
865884
NULL
866885
);
867886

868-
LogicalReplicationStartReceivers(MultimasterConnStrs,MultimasterNodeId);
887+
MultimasterNodes=LogicalReplicationStartReceivers(MultimasterConnStrs,MultimasterNodeId);
869888

870889
if (DtmBufferSize!=0)
871890
{
@@ -931,10 +950,8 @@ dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
931950

932951
voidMultimasterBeginTransaction(void)
933952
{
934-
if (TransactionIdIsValid(DtmNextXid)) {
935-
/* slave transaction */
936-
return;
937-
}
953+
if (TransactionIdIsValid(DtmNextXid))
954+
elog(ERROR,"MultimasterBeginTransaction should be called only once for global transaction");
938955
if (dtm==NULL)
939956
elog(ERROR,"DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf");
940957
DtmNextXid=DtmGlobalStartTransaction(&DtmSnapshot,&dtm->minXid);
@@ -944,12 +961,12 @@ void MultimasterBeginTransaction(void)
944961

945962
DtmHasGlobalSnapshot= true;
946963
DtmLastSnapshot=NULL;
947-
948-
LogicalReplicationBroadcastXid(DtmNextXid);
949964
}
950965

951966
voidMultimasterJoinTransaction(TransactionIdxid)
952967
{
968+
ExternalTrans*et;
969+
953970
if (TransactionIdIsValid(DtmNextXid))
954971
elog(ERROR,"dtm_begin/join_transaction should be called only once for global transaction");
955972
DtmNextXid=xid;
@@ -961,14 +978,39 @@ void MultimasterJoinTransaction(TransactionId xid)
961978

962979
DtmHasGlobalSnapshot= true;
963980
DtmLastSnapshot=NULL;
981+
982+
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
983+
et=hash_search(external_trans,&DtmNextXid,HASH_ENTER,NULL);
984+
et->count=dtm->nNodes-1;
985+
LWLockRelease(dtm->hashLock);
964986
}
965987

988+
boolMultimasterIsExternalTransaction(TransactionIdxid)
989+
{
990+
ExternalTrans*et;
991+
boolresult= false;
992+
LWLockAcquire(dtm->hashLock,LW_EXCLUSIVE);
993+
et=hash_search(external_trans,&DtmNextXid,HASH_FIND,NULL);
994+
if (et!=NULL) {
995+
result= true;
996+
if (--et->count==0) {
997+
hash_search(external_trans,&DtmNextXid,HASH_REMOVE,NULL);
998+
}
999+
}
1000+
LWLockRelease(dtm->hashLock);
1001+
returnresult;
1002+
}
1003+
1004+
1005+
9661006
voidDtmBackgroundWorker(Datumarg)
9671007
{
9681008
Shubshub;
9691009
ShubParamsparams;
9701010
charunix_sock_path[MAXPGPATH];
9711011

1012+
isBackgroundWorker= true;
1013+
9721014
snprintf(unix_sock_path,sizeof(unix_sock_path),"%s/p%d",Unix_socket_directories,DtmPort);
9731015

9741016
ShubInitParams(&params);

‎contrib/multimaster/multimaster.h‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#ifndef__MULTIMASTER_H__
2+
#define__MULTIMASTER_H__
3+
4+
externintLogicalReplicationStartReceivers(char*nodes,intnode_id);
5+
externvoidMultimasterBeginTransaction(void);
6+
externvoidMultimasterJoinTransaction(TransactionIdxid);
7+
externboolMultimasterIsExternalTransaction(TransactionIdxid);
8+
9+
externboolisBackgroundWorker;
10+
11+
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp