Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6a8d759

Browse files
committed
Support remote functions
1 parent8b99228 commit6a8d759

File tree

4 files changed

+100
-57
lines changed

4 files changed

+100
-57
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#include"parser/analyze.h"
6969
#include"parser/parse_relation.h"
7070
#include"parser/parse_type.h"
71+
#include"parser/parse_func.h"
7172
#include"catalog/pg_class.h"
7273
#include"catalog/pg_type.h"
7374
#include"tcop/pquery.h"
@@ -157,6 +158,7 @@ static void MtmInitializeSequence(int64* start, int64* step);
157158
staticvoid*MtmCreateSavepointContext(void);
158159
staticvoidMtmRestoreSavepointContext(void*ctx);
159160
staticvoidMtmReleaseSavepointContext(void*ctx);
161+
staticvoidMtmSetRemoteFunction(charconst*list,void*extra);
160162

161163
staticvoidMtmCheckClusterLock(void);
162164
staticvoidMtmCheckSlots(void);
@@ -183,6 +185,7 @@ MtmConnectionInfo* MtmConnections;
183185

184186
HTAB*MtmXid2State;
185187
HTAB*MtmGid2State;
188+
staticHTAB*MtmRemoteFunctions;
186189
staticHTAB*MtmLocalTables;
187190

188191
staticboolMtmIsRecoverySession;
@@ -254,6 +257,7 @@ bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some
254257
TransactionIdMtmUtilityProcessedInXid;
255258

256259
staticchar*MtmConnStrs;
260+
staticchar*MtmRemoteFunctionsList;
257261
staticchar*MtmClusterName;
258262
staticintMtmQueueSize;
259263
staticintMtmWorkers;
@@ -2567,7 +2571,7 @@ MtmCreateLocalTableMap(void)
25672571
"MtmLocalTables",
25682572
MULTIMASTER_MAX_LOCAL_TABLES,MULTIMASTER_MAX_LOCAL_TABLES,
25692573
&info,
2570-
HASH_ELEM
2574+
HASH_ELEM |HASH_BLOBS
25712575
);
25722576
returnhtab;
25732577
}
@@ -2761,6 +2765,48 @@ MtmShmemStartup(void)
27612765
MtmInitialize();
27622766
}
27632767

2768+
staticvoidMtmSetRemoteFunction(charconst*list,void*extra)
2769+
{
2770+
if (MtmRemoteFunctions) {
2771+
hash_destroy(MtmRemoteFunctions);
2772+
MtmRemoteFunctions=NULL;
2773+
}
2774+
}
2775+
2776+
staticvoidMtmInitializeRemoteFunctionsMap()
2777+
{
2778+
HASHCTLinfo;
2779+
char*p,*q;
2780+
intn_funcs=1;
2781+
FuncCandidateListclist;
2782+
2783+
for (p=MtmRemoteFunctionsList; (q=strchr(p,','))!=NULL;p=q+1,n_funcs++);
2784+
2785+
Assert(MtmRemoteFunctions==NULL);
2786+
2787+
memset(&info,0,sizeof(info));
2788+
info.entrysize=info.keysize=sizeof(Oid);
2789+
info.hcxt=TopMemoryContext;
2790+
MtmRemoteFunctions=hash_create("MtmRemoteFunctions",n_funcs,&info,
2791+
HASH_ELEM |HASH_BLOBS |HASH_CONTEXT);
2792+
2793+
p=pstrdup(MtmRemoteFunctionsList);
2794+
do {
2795+
q=strchr(p,',');
2796+
if (q!=NULL) {
2797+
*q++='\0';
2798+
}
2799+
clist=FuncnameGetCandidates(stringToQualifiedNameList(p),-1,NIL, false, false, true);
2800+
if (clist==NULL) {
2801+
MTM_ELOG(ERROR,"Failed to lookup function %s",p);
2802+
}elseif (clist->next!=NULL) {
2803+
MTM_ELOG(ERROR,"Ambigious function %s",p);
2804+
}
2805+
hash_search(MtmRemoteFunctions,&clist->oid,HASH_ENTER,NULL);
2806+
p=q;
2807+
}while (p!=NULL);
2808+
}
2809+
27642810
/*
27652811
* Parse node connection string.
27662812
* This function is called at cluster startup and while adding new cluster node
@@ -3377,6 +3423,19 @@ _PG_init(void)
33773423
NULL/* GucShowHook show_hook */
33783424
);
33793425

3426+
DefineCustomStringVariable(
3427+
"multimaster.remote_functions",
3428+
"List of fnuction names which should be executed remotely at all multimaster nodes instead of executing them at master and replicating result of their work",
3429+
NULL,
3430+
&MtmRemoteFunctionsList,
3431+
"lo_create,lo_unlink",
3432+
PGC_USERSET,/* context */
3433+
0,/* flags */
3434+
NULL,/* GucStringCheckHook check_hook */
3435+
MtmSetRemoteFunction,/* GucStringAssignHook assign_hook */
3436+
NULL/* GucShowHook show_hook */
3437+
);
3438+
33803439
DefineCustomStringVariable(
33813440
"multimaster.cluster_name",
33823441
"Name of the cluster",
@@ -3867,7 +3926,7 @@ lsn_t MtmGetFlushPosition(int nodeId)
38673926
* Keep track of progress of WAL writer.
38683927
* We need to notify WAL senders at other nodes which logical records
38693928
* are flushed to the disk and so can survive failure. In asynchronous commit mode
3870-
* WAL is flushed by WAL writer. Currentflish position can be obtained by GetFlushRecPtr().
3929+
* WAL is flushed by WAL writer. Currentflush position can be obtained by GetFlushRecPtr().
38713930
* So on applying new logical record we insert it in the MtmLsnMapping and compare
38723931
* their poistions in local WAL log with current flush position.
38733932
* The records which are flushed to the disk by WAL writer are removed from the list
@@ -4975,7 +5034,7 @@ char* MtmGucSerialize(void)
49755034
appendStringInfoString(serialized_gucs," TO ");
49765035

49775036
/* quite a crutch */
4978-
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0')
5037+
if (strstr(cur_entry->key,"_mem")!=NULL||*(cur_entry->value)=='\0'||strchr(cur_entry->value,',')!=NULL)
49795038
{
49805039
appendStringInfoString(serialized_gucs,"'");
49815040
appendStringInfoString(serialized_gucs,cur_entry->value);
@@ -5006,10 +5065,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
50065065
if (transactional)
50075066
{
50085067
char*gucCtx=MtmGucSerialize();
5009-
if (*gucCtx)
5010-
queryString=psprintf("RESET SESSION AUTHORIZATION; reset all; %s; %s",gucCtx,queryString);
5011-
else
5012-
queryString=psprintf("RESET SESSION AUTHORIZATION; reset all; %s",queryString);
5068+
queryString=psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s",gucCtx,queryString);
50135069

50145070
/* Transactional DDL */
50155071
MTM_LOG3("Sending DDL: %s",queryString);
@@ -5377,29 +5433,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53775433
staticvoid
53785434
MtmExecutorStart(QueryDesc*queryDesc,inteflags)
53795435
{
5380-
boolddl_generating_call= false;
5381-
ListCell*tlist;
5382-
5383-
foreach(tlist,queryDesc->plannedstmt->planTree->targetlist)
5436+
if (!MtmTx.isReplicated&&ActivePortal)
53845437
{
5385-
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
5438+
ListCell*tlist;
53865439

5387-
if (tle->resname&&strcmp(tle->resname,"lo_create")==0)
5440+
if (!MtmRemoteFunctions)
53885441
{
5389-
ddl_generating_call= true;
5390-
break;
5442+
MtmInitializeRemoteFunctionsMap();
53915443
}
53925444

5393-
if (tle->resname&&strcmp(tle->resname,"lo_unlink")==0)
5445+
foreach(tlist,queryDesc->plannedstmt->planTree->targetlist)
53945446
{
5395-
ddl_generating_call= true;
5396-
break;
5447+
TargetEntry*tle= (TargetEntry*)lfirst(tlist);
5448+
if (tle->expr&&IsA(tle->expr,FuncExpr))
5449+
{
5450+
if (hash_search(MtmRemoteFunctions,&((FuncExpr*)tle->expr)->funcid,HASH_FIND,NULL))
5451+
{
5452+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5453+
break;
5454+
}
5455+
}
53975456
}
53985457
}
5399-
5400-
if (ddl_generating_call&& !MtmTx.isReplicated)
5401-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5402-
54035458
if (PreviousExecutorStartHook!=NULL)
54045459
PreviousExecutorStartHook(queryDesc,eflags);
54055460
else

‎contrib/mmts/multimaster.h

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@
5151
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
5252
#endif
5353

54-
#defineMULTIMASTER_NAME "multimaster"
55-
#defineMULTIMASTER_SCHEMA_NAME "mtm"
56-
#defineMULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
57-
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
58-
#defineMULTIMASTER_MIN_PROTO_VERSION 1
59-
#defineMULTIMASTER_MAX_PROTO_VERSION 1
60-
#defineMULTIMASTER_MAX_GID_SIZE 32
61-
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE 16
62-
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
63-
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
64-
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
65-
#defineMULTIMASTER_MAX_CTL_STR_SIZE 256
66-
#defineMULTIMASTER_LOCK_BUF_INIT_SIZE 4096
67-
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
68-
#defineMULTIMASTER_ADMIN "mtm_admin"
69-
#defineMULTIMASTER_PRECOMMITTED "precommitted"
54+
#defineMULTIMASTER_NAME"multimaster"
55+
#defineMULTIMASTER_SCHEMA_NAME"mtm"
56+
#defineMULTIMASTER_LOCAL_TABLES_TABLE"local_tables"
57+
#defineMULTIMASTER_SLOT_PATTERN"mtm_slot_%d"
58+
#defineMULTIMASTER_MIN_PROTO_VERSION1
59+
#defineMULTIMASTER_MAX_PROTO_VERSION1
60+
#defineMULTIMASTER_MAX_GID_SIZE32
61+
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE16
62+
#defineMULTIMASTER_MAX_CONN_STR_SIZE128
63+
#defineMULTIMASTER_MAX_HOST_NAME_SIZE64
64+
#defineMULTIMASTER_MAX_LOCAL_TABLES256
65+
#defineMULTIMASTER_MAX_CTL_STR_SIZE256
66+
#defineMULTIMASTER_LOCK_BUF_INIT_SIZE4096
67+
#defineMULTIMASTER_BROADCAST_SERVICE"mtm_broadcast"
68+
#defineMULTIMASTER_ADMIN"mtm_admin"
69+
#defineMULTIMASTER_PRECOMMITTED"precommitted"
7070

7171
#defineMULTIMASTER_DEFAULT_ARBITER_PORT 5433
7272

‎contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,8 @@ pglogical_receiver_main(Datum main_arg)
478478
{
479479
int64now=feGetCurrentTimestamp();
480480

481-
/* Leave is feedback is not sent properly */
482481
MtmUpdateLsnMapping(nodeId,walEnd);
482+
/* Leave if feedback is not sent properly */
483483
if (!sendFeedback(conn,now,nodeId)) {
484484
gotoOnError;
485485
}
@@ -628,7 +628,6 @@ pglogical_receiver_main(Datum main_arg)
628628
{
629629
int64now=feGetCurrentTimestamp();
630630

631-
/* Leave is feedback is not sent properly */
632631
MtmUpdateLsnMapping(nodeId,INVALID_LSN);
633632
sendFeedback(conn,now,nodeId);
634633
}
@@ -724,4 +723,3 @@ void MtmStartReceivers(void)
724723
}
725724
}
726725
}
727-

‎contrib/mmts/pglogical_relid_map.c

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,19 @@ static void
2020
pglogical_relid_map_init(void)
2121
{
2222
HASHCTLctl;
23-
inthash_flags=HASH_ELEM;
24-
2523
Assert(relid_map==NULL);
2624

2725
MemSet(&ctl,0,sizeof(ctl));
2826
ctl.keysize=sizeof(Oid);
2927
ctl.entrysize=sizeof(PGLRelidMapEntry);
30-
31-
#ifPG_VERSION_NUM >=90500
32-
hash_flags |=HASH_BLOBS;
33-
#else
34-
ctl.hash=tag_hash;
35-
hash_flags |=HASH_FUNCTION;
36-
#endif
37-
38-
relid_map=hash_create("pglogical_relid_map",PGL_INIT_RELID_MAP_SIZE,&ctl,hash_flags);
28+
relid_map=hash_create("pglogical_relid_map",PGL_INIT_RELID_MAP_SIZE,&ctl,HASH_ELEM |HASH_BLOBS);
3929

4030
Assert(relid_map!=NULL);
4131
}
4232

4333
Oidpglogical_relid_map_get(Oidrelid)
4434
{
45-
if (relid_map!=NULL) {
35+
if (relid_map!=NULL) {
4636
PGLRelidMapEntry*entry= (PGLRelidMapEntry*)hash_search(relid_map,&relid,HASH_FIND,NULL);
4737
returnentry ?entry->local_relid :InvalidOid;
4838
}
@@ -51,23 +41,23 @@ Oid pglogical_relid_map_get(Oid relid)
5141

5242
boolpglogical_relid_map_put(Oidremote_relid,Oidlocal_relid)
5343
{
54-
boolfound;
44+
boolfound;
5545
PGLRelidMapEntry*entry;
56-
if (relid_map==NULL) {
46+
if (relid_map==NULL) {
5747
pglogical_relid_map_init();
5848
}
5949
entry=hash_search(relid_map,&remote_relid,HASH_ENTER,&found);
6050
if (found) {
6151
entry->local_relid=local_relid;
62-
return false;
52+
return false;
6353
}
6454
entry->local_relid=local_relid;
6555
return true;
6656
}
6757

6858
voidpglogical_relid_map_reset(void)
6959
{
70-
if (relid_map!=NULL) {
60+
if (relid_map!=NULL) {
7161
hash_destroy(relid_map);
7262
relid_map=NULL;
7363
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp