Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0f36ea2

Browse files
knizhnikkelvich
authored andcommitted
Support remote functions
1 parent238d9ee commit0f36ea2

File tree

4 files changed

+100
-57
lines changed

4 files changed

+100
-57
lines changed

‎multimaster.c‎

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#include"parser/analyze.h"
6969
#include"parser/parse_relation.h"
7070
#include"parser/parse_type.h"
71+
#include"parser/parse_func.h"
7172
#include"catalog/pg_class.h"
7273
#include"catalog/pg_type.h"
7374
#include"tcop/pquery.h"
@@ -158,6 +159,7 @@ static void MtmInitializeSequence(int64* start, int64* step);
158159
staticvoid*MtmCreateSavepointContext(void);
159160
staticvoidMtmRestoreSavepointContext(void*ctx);
160161
staticvoidMtmReleaseSavepointContext(void*ctx);
162+
staticvoidMtmSetRemoteFunction(charconst*list,void*extra);
161163

162164
staticvoidMtmCheckClusterLock(void);
163165
staticvoidMtmCheckSlots(void);
@@ -184,6 +186,7 @@ MtmConnectionInfo* MtmConnections;
184186

185187
HTAB*MtmXid2State;
186188
HTAB*MtmGid2State;
189+
staticHTAB*MtmRemoteFunctions;
187190
staticHTAB*MtmLocalTables;
188191

189192
staticboolMtmIsRecoverySession;
@@ -258,6 +261,7 @@ bool MtmMajorNode;
258261
TransactionIdMtmUtilityProcessedInXid;
259262

260263
staticchar*MtmConnStrs;
264+
staticchar*MtmRemoteFunctionsList;
261265
staticchar*MtmClusterName;
262266
staticintMtmQueueSize;
263267
staticintMtmWorkers;
@@ -2229,7 +2233,7 @@ MtmCreateLocalTableMap(void)
22292233
"MtmLocalTables",
22302234
MULTIMASTER_MAX_LOCAL_TABLES,MULTIMASTER_MAX_LOCAL_TABLES,
22312235
&info,
2232-
HASH_ELEM
2236+
HASH_ELEM |HASH_BLOBS
22332237
);
22342238
returnhtab;
22352239
}
@@ -2423,6 +2427,48 @@ MtmShmemStartup(void)
24232427
MtmInitialize();
24242428
}
24252429

2430+
staticvoidMtmSetRemoteFunction(charconst*list,void*extra)
2431+
{
2432+
if (MtmRemoteFunctions) {
2433+
hash_destroy(MtmRemoteFunctions);
2434+
MtmRemoteFunctions=NULL;
2435+
}
2436+
}
2437+
2438+
staticvoidMtmInitializeRemoteFunctionsMap()
2439+
{
2440+
HASHCTLinfo;
2441+
char*p,*q;
2442+
intn_funcs=1;
2443+
FuncCandidateListclist;
2444+
2445+
for (p=MtmRemoteFunctionsList; (q=strchr(p,','))!=NULL;p=q+1,n_funcs++);
2446+
2447+
Assert(MtmRemoteFunctions==NULL);
2448+
2449+
memset(&info,0,sizeof(info));
2450+
info.entrysize=info.keysize=sizeof(Oid);
2451+
info.hcxt=TopMemoryContext;
2452+
MtmRemoteFunctions=hash_create("MtmRemoteFunctions",n_funcs,&info,
2453+
HASH_ELEM |HASH_BLOBS |HASH_CONTEXT);
2454+
2455+
p=pstrdup(MtmRemoteFunctionsList);
2456+
do {
2457+
q=strchr(p,',');
2458+
if (q!=NULL) {
2459+
*q++='\0';
2460+
}
2461+
clist=FuncnameGetCandidates(stringToQualifiedNameList(p),-1,NIL, false, false, true);
2462+
if (clist==NULL) {
2463+
MTM_ELOG(ERROR,"Failed to lookup function %s",p);
2464+
}elseif (clist->next!=NULL) {
2465+
MTM_ELOG(ERROR,"Ambigious function %s",p);
2466+
}
2467+
hash_search(MtmRemoteFunctions,&clist->oid,HASH_ENTER,NULL);
2468+
p=q;
2469+
}while (p!=NULL);
2470+
}
2471+
24262472
/*
24272473
* Parse node connection string.
24282474
* This function is called at cluster startup and while adding new cluster node
@@ -3052,6 +3098,19 @@ _PG_init(void)
30523098
NULL/* GucShowHook show_hook */
30533099
);
30543100

3101+
DefineCustomStringVariable(
3102+
"multimaster.remote_functions",
3103+
"List of fnuction names which should be executed remotely at all multimaster nodes instead of executing them at master and replicating result of their work",
3104+
NULL,
3105+
&MtmRemoteFunctionsList,
3106+
"lo_create,lo_unlink",
3107+
PGC_USERSET,/* context */
3108+
0,/* flags */
3109+
NULL,/* GucStringCheckHook check_hook */
3110+
MtmSetRemoteFunction,/* GucStringAssignHook assign_hook */
3111+
NULL/* GucShowHook show_hook */
3112+
);
3113+
30553114
DefineCustomStringVariable(
30563115
"multimaster.cluster_name",
30573116
"Name of the cluster",
@@ -3541,7 +3600,7 @@ lsn_t MtmGetFlushPosition(int nodeId)
35413600
* Keep track of progress of WAL writer.
35423601
* We need to notify WAL senders at other nodes which logical records
35433602
* are flushed to the disk and so can survive failure. In asynchronous commit mode
3544-
* WAL is flushed by WAL writer. Currentflish position can be obtained by GetFlushRecPtr().
3603+
* WAL is flushed by WAL writer. Currentflush position can be obtained by GetFlushRecPtr().
35453604
* So on applying new logical record we insert it in the MtmLsnMapping and compare
35463605
* their poistions in local WAL log with current flush position.
35473606
* The records which are flushed to the disk by WAL writer are removed from the list
@@ -4656,7 +4715,7 @@ char* MtmGucSerialize(void)
46564715
appendStringInfoString(serialized_gucs," TO ");
46574716

46584717
/* quite a crutch */
4659-
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0')
4718+
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0'||strchr(cur_entry->value,',')!=NULL)
46604719
{
46614720
appendStringInfoString(serialized_gucs,"'");
46624721
appendStringInfoString(serialized_gucs,cur_entry->value);
@@ -4686,10 +4745,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
46864745
if (transactional)
46874746
{
46884747
char*gucCtx=MtmGucSerialize();
4689-
if (*gucCtx)
4690-
queryString=psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s",gucCtx,queryString);
4691-
else
4692-
queryString=psprintf("RESET SESSION AUTHORIZATION; reset all; %s",queryString);
4748+
queryString=psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s",gucCtx,queryString);
46934749

46944750
/* Transactional DDL */
46954751
MTM_LOG3("Sending DDL: %s",queryString);
@@ -5058,29 +5114,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50585114
staticvoid
50595115
MtmExecutorStart(QueryDesc*queryDesc,inteflags)
50605116
{
5061-
boolddl_generating_call= false;
5062-
ListCell*tlist;
5063-
5064-
foreach(tlist,queryDesc->plannedstmt->planTree->targetlist)
5117+
if (!MtmTx.isReplicated&&ActivePortal)
50655118
{
5066-
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
5119+
ListCell*tlist;
50675120

5068-
if (tle->resname&&strcmp(tle->resname,"lo_create")==0)
5121+
if (!MtmRemoteFunctions)
50695122
{
5070-
ddl_generating_call= true;
5071-
break;
5123+
MtmInitializeRemoteFunctionsMap();
50725124
}
50735125

5074-
if (tle->resname&&strcmp(tle->resname,"lo_unlink")==0)
5126+
foreach(tlist,queryDesc->plannedstmt->planTree->targetlist)
50755127
{
5076-
ddl_generating_call= true;
5077-
break;
5128+
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
5129+
if (tle->expr&&IsA(tle->expr,FuncExpr))
5130+
{
5131+
if (hash_search(MtmRemoteFunctions,&((FuncExpr*)tle->expr)->funcid,HASH_FIND,NULL))
5132+
{
5133+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5134+
break;
5135+
}
5136+
}
50785137
}
50795138
}
5080-
5081-
if (ddl_generating_call&& !MtmTx.isReplicated)
5082-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5083-
50845139
if (PreviousExecutorStartHook!=NULL)
50855140
PreviousExecutorStartHook(queryDesc,eflags);
50865141
else

‎multimaster.h‎

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@
5151
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
5252
// #endif
5353

54-
#defineMULTIMASTER_NAME "multimaster"
55-
#defineMULTIMASTER_SCHEMA_NAME "mtm"
56-
#defineMULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
57-
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
58-
#defineMULTIMASTER_MIN_PROTO_VERSION 1
59-
#defineMULTIMASTER_MAX_PROTO_VERSION 1
60-
#defineMULTIMASTER_MAX_GID_SIZE 32
61-
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE 16
62-
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
63-
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
64-
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
65-
#defineMULTIMASTER_MAX_CTL_STR_SIZE 256
66-
#defineMULTIMASTER_LOCK_BUF_INIT_SIZE 4096
67-
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
68-
#defineMULTIMASTER_ADMIN "mtm_admin"
69-
#defineMULTIMASTER_PRECOMMITTED "precommitted"
54+
#defineMULTIMASTER_NAME"multimaster"
55+
#defineMULTIMASTER_SCHEMA_NAME"mtm"
56+
#defineMULTIMASTER_LOCAL_TABLES_TABLE"local_tables"
57+
#defineMULTIMASTER_SLOT_PATTERN"mtm_slot_%d"
58+
#defineMULTIMASTER_MIN_PROTO_VERSION1
59+
#defineMULTIMASTER_MAX_PROTO_VERSION1
60+
#defineMULTIMASTER_MAX_GID_SIZE32
61+
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE16
62+
#defineMULTIMASTER_MAX_CONN_STR_SIZE128
63+
#defineMULTIMASTER_MAX_HOST_NAME_SIZE64
64+
#defineMULTIMASTER_MAX_LOCAL_TABLES256
65+
#defineMULTIMASTER_MAX_CTL_STR_SIZE256
66+
#defineMULTIMASTER_LOCK_BUF_INIT_SIZE4096
67+
#defineMULTIMASTER_BROADCAST_SERVICE"mtm_broadcast"
68+
#defineMULTIMASTER_ADMIN"mtm_admin"
69+
#defineMULTIMASTER_PRECOMMITTED"precommitted"
7070

7171
#defineMULTIMASTER_DEFAULT_ARBITER_PORT 5433
7272

‎pglogical_receiver.c‎

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -485,8 +485,8 @@ pglogical_receiver_main(Datum main_arg)
485485
{
486486
int64now=feGetCurrentTimestamp();
487487

488-
/* Leave is feedback is not sent properly */
489488
MtmUpdateLsnMapping(nodeId,walEnd);
489+
/* Leave if feedback is not sent properly */
490490
if (!sendFeedback(conn,now,nodeId)) {
491491
gotoOnError;
492492
}
@@ -635,7 +635,6 @@ pglogical_receiver_main(Datum main_arg)
635635
{
636636
int64now=feGetCurrentTimestamp();
637637

638-
/* Leave is feedback is not sent properly */
639638
MtmUpdateLsnMapping(nodeId,INVALID_LSN);
640639
sendFeedback(conn,now,nodeId);
641640
}
@@ -731,4 +730,3 @@ void MtmStartReceivers(void)
731730
}
732731
}
733732
}
734-

‎pglogical_relid_map.c‎

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,19 @@ static void
2020
pglogical_relid_map_init(void)
2121
{
2222
HASHCTLctl;
23-
inthash_flags=HASH_ELEM;
24-
2523
Assert(relid_map==NULL);
2624

2725
MemSet(&ctl,0,sizeof(ctl));
2826
ctl.keysize=sizeof(Oid);
2927
ctl.entrysize=sizeof(PGLRelidMapEntry);
30-
31-
#ifPG_VERSION_NUM >=90500
32-
hash_flags |=HASH_BLOBS;
33-
#else
34-
ctl.hash=tag_hash;
35-
hash_flags |=HASH_FUNCTION;
36-
#endif
37-
38-
relid_map=hash_create("pglogical_relid_map",PGL_INIT_RELID_MAP_SIZE,&ctl,hash_flags);
28+
relid_map=hash_create("pglogical_relid_map",PGL_INIT_RELID_MAP_SIZE,&ctl,HASH_ELEM |HASH_BLOBS);
3929

4030
Assert(relid_map!=NULL);
4131
}
4232

4333
Oidpglogical_relid_map_get(Oidrelid)
4434
{
45-
if (relid_map!=NULL) {
35+
if (relid_map!=NULL) {
4636
PGLRelidMapEntry*entry= (PGLRelidMapEntry*)hash_search(relid_map,&relid,HASH_FIND,NULL);
4737
returnentry ?entry->local_relid :InvalidOid;
4838
}
@@ -51,23 +41,23 @@ Oid pglogical_relid_map_get(Oid relid)
5141

5242
boolpglogical_relid_map_put(Oidremote_relid,Oidlocal_relid)
5343
{
54-
boolfound;
44+
boolfound;
5545
PGLRelidMapEntry*entry;
56-
if (relid_map==NULL) {
46+
if (relid_map==NULL) {
5747
pglogical_relid_map_init();
5848
}
5949
entry=hash_search(relid_map,&remote_relid,HASH_ENTER,&found);
6050
if (found) {
6151
entry->local_relid=local_relid;
62-
return false;
52+
return false;
6353
}
6454
entry->local_relid=local_relid;
6555
return true;
6656
}
6757

6858
voidpglogical_relid_map_reset(void)
6959
{
70-
if (relid_map!=NULL) {
60+
if (relid_map!=NULL) {
7161
hash_destroy(relid_map);
7262
relid_map=NULL;
7363
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp