Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit09cf6e4

Browse files
committed
Add local tables hash
1 parent2420bbe commit09cf6e4

File tree

4 files changed

+183
-6
lines changed

4 files changed

+183
-6
lines changed

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,11 @@ CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATEFUNCTIONmtm.make_table_local(relation regclass) RETURNS void
40+
AS'MODULE_PATHNAME','mtm_make_table_local'
41+
LANGUAGE C;
42+
3943
CREATETABLEIF NOT EXISTSmtm.ddl_log (issuedtimestamp with time zonenot null, querytext);
44+
45+
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schematext, rel_nametext,primary key pk(rel_schema, rel_name));
46+

‎contrib/mmts/multimaster.c‎

Lines changed: 163 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include"storage/pmsignal.h"
4646
#include"storage/proc.h"
4747
#include"utils/syscache.h"
48+
#include"utils/lsyscache.h"
4849
#include"replication/walsender.h"
4950
#include"replication/walsender_private.h"
5051
#include"replication/slot.h"
@@ -53,6 +54,7 @@
5354
#include"nodes/makefuncs.h"
5455
#include"access/htup_details.h"
5556
#include"catalog/indexing.h"
57+
#include"catalog/namespace.h"
5658
#include"pglogical_output/hooks.h"
5759

5860
#include"multimaster.h"
@@ -105,6 +107,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
105107
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
106108
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
107109
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
110+
PG_FUNCTION_INFO_V1(mtm_make_table_local);
108111

109112
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
110113
staticvoidMtmInitialize(void);
@@ -135,6 +138,7 @@ MtmState* Mtm;
135138

136139
HTAB*MtmXid2State;
137140
staticHTAB*MtmGid2State;
141+
staticHTAB*MtmLocalTables;
138142

139143
staticMtmCurrentTransMtmTx;
140144

@@ -176,11 +180,12 @@ bool MtmUseRaftable;
176180
MtmConnectionInfo*MtmConnections;
177181

178182
staticchar*MtmConnStrs;
179-
staticintMtmQueueSize;
180-
staticintMtmWorkers;
181-
staticintMtmVacuumDelay;
182-
staticintMtmMinRecoveryLag;
183-
staticintMtmMaxRecoveryLag;
183+
staticintMtmQueueSize;
184+
staticintMtmWorkers;
185+
staticintMtmVacuumDelay;
186+
staticintMtmMinRecoveryLag;
187+
staticintMtmMaxRecoveryLag;
188+
staticboolMtmIgnoreTablesWithoutPk;
184189

185190
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
186191
staticProcessUtility_hook_typePreviousProcessUtilityHook;
@@ -1280,6 +1285,71 @@ MtmCreateGidMap(void)
12801285
returnhtab;
12811286
}
12821287

1288+
staticHTAB*
1289+
MtmCreateLocalTableMap(void)
1290+
{
1291+
HASHCTLinfo;
1292+
HTAB*htab;
1293+
memset(&info,0,sizeof(info));
1294+
info.keysize=sizeof(Oid);
1295+
htab=ShmemInitHash(
1296+
"MtmLocalTables",
1297+
MULTIMASTER_MAX_LOCAL_TABLES,MULTIMASTER_MAX_LOCAL_TABLES,
1298+
&info,
1299+
0
1300+
);
1301+
returnhtab;
1302+
}
1303+
1304+
staticvoidMtmMakeRelationLocal(Oidrelid)
1305+
{
1306+
if (OidIsValid(relid)) {
1307+
MtmLock(LW_EXCLUSIVE);
1308+
hash_search(MtmLocalTables,&relid,HASH_ENTER,NULL);
1309+
MtmUnlock();
1310+
}
1311+
}
1312+
1313+
1314+
voidMtmMakeTableLocal(char*schema,char*name)
1315+
{
1316+
RangeVar*rv=makeRangeVar(schema,name,-1);
1317+
Oidrelid=RangeVarGetRelid(rv,NoLock, true);
1318+
MtmMakeRelationLocal(relid);
1319+
}
1320+
1321+
1322+
typedefstruct {
1323+
NameDataschema;
1324+
NameDataname;
1325+
}MtmLocalTablesTuple;
1326+
1327+
staticvoidMtmLoadLocalTables(void)
1328+
{
1329+
RangeVar*rv;
1330+
Relationrel;
1331+
SysScanDescscan;
1332+
HeapTupletuple;
1333+
1334+
Assert(IsTransactionState());
1335+
1336+
rv=makeRangeVar(MULTIMASTER_SCHEMA_NAME,MULTIMASTER_LOCAL_TABLES_TABLE,-1);
1337+
rel=heap_openrv_extended(rv,RowExclusiveLock, true);
1338+
if (rel!=NULL) {
1339+
scan=systable_beginscan(rel,0, true,NULL,0,NULL);
1340+
1341+
while (HeapTupleIsValid(tuple=systable_getnext(scan)))
1342+
{
1343+
MtmLocalTablesTuple*t= (MtmLocalTablesTuple*)GETSTRUCT(tuple);
1344+
MtmMakeTableLocal(NameStr(t->schema),NameStr(t->name));
1345+
}
1346+
1347+
systable_endscan(scan);
1348+
heap_close(rel,RowExclusiveLock);
1349+
}
1350+
}
1351+
1352+
12831353
staticvoidMtmInitialize()
12841354
{
12851355
boolfound;
@@ -1309,6 +1379,7 @@ static void MtmInitialize()
13091379
Mtm->nReceivers=0;
13101380
Mtm->timeShift=0;
13111381
Mtm->transCount=0;
1382+
Mtm->localTablesHashLoaded= false;
13121383
for (i=0;i<MtmNodes;i++) {
13131384
Mtm->nodes[i].oldestSnapshot=0;
13141385
Mtm->nodes[i].transDelay=0;
@@ -1324,6 +1395,7 @@ static void MtmInitialize()
13241395
}
13251396
MtmXid2State=MtmCreateXidMap();
13261397
MtmGid2State=MtmCreateGidMap();
1398+
MtmLocalTables=MtmCreateLocalTableMap();
13271399
MtmDoReplication= true;
13281400
TM=&MtmTM;
13291401
LWLockRelease(AddinShmemInitLock);
@@ -1476,6 +1548,19 @@ _PG_init(void)
14761548
NULL
14771549
);
14781550

1551+
DefineCustomBoolVariable(
1552+
"multimaster.ignore_tables_without_pk",
1553+
"Do not replicate tables withpout primary key",
1554+
NULL,
1555+
&MtmIgnoreTablesWithoutPk,
1556+
false,
1557+
PGC_BACKEND,
1558+
0,
1559+
NULL,
1560+
NULL,
1561+
NULL
1562+
);
1563+
14791564
DefineCustomIntVariable(
14801565
"multimaster.workers",
14811566
"Number of multimaster executor workers per node",
@@ -1805,11 +1890,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18051890
returnres;
18061891
}
18071892

1893+
staticbool
1894+
MtmReplicationRowFilterHook(structPGLogicalRowFilterArgs*args)
1895+
{
1896+
boolisDistributed;
1897+
MtmLock(LW_SHARED);
1898+
if (!Mtm->localTablesHashLoaded) {
1899+
MtmUnlock();
1900+
MtmLock(LW_EXCLUSIVE);
1901+
if (!Mtm->localTablesHashLoaded) {
1902+
MtmLoadLocalTables();
1903+
Mtm->localTablesHashLoaded= true;
1904+
}
1905+
}
1906+
isDistributed=hash_search(MtmLocalTables,&RelationGetRelid(args->changed_rel),HASH_FIND,NULL)==NULL;
1907+
MtmUnlock();
1908+
returnisDistributed;
1909+
}
1910+
18081911
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
18091912
{
18101913
hooks->startup_hook=MtmReplicationStartupHook;
18111914
hooks->shutdown_hook=MtmReplicationShutdownHook;
18121915
hooks->txn_filter_hook=MtmReplicationTxnFilterHook;
1916+
hooks->row_filter_hook=MtmReplicationRowFilterHook;
18131917
}
18141918

18151919

@@ -1936,6 +2040,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
19362040
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));
19372041
}
19382042

2043+
2044+
Datummtm_make_table_local(PG_FUNCTION_ARGS)
2045+
{
2046+
Oidreloid=PG_GETARG_OID(1);
2047+
RangeVar*rv;
2048+
Relationrel;
2049+
TupleDesctupDesc;
2050+
HeapTupletup;
2051+
Datumvalues[Natts_mtm_local_tables];
2052+
boolnulls[Natts_mtm_local_tables];
2053+
2054+
MtmMakeRelationLocal(reloid);
2055+
2056+
rv=makeRangeVar(MULTIMASTER_SCHEMA_NAME,MULTIMASTER_LOCAL_TABLES_TABLE,-1);
2057+
rel=heap_openrv(rv,RowExclusiveLock);
2058+
if (rel!=NULL) {
2059+
char*tableName=get_rel_name(reloid);
2060+
Oidschemaid=get_rel_namespace(reloid);
2061+
char*schemaName=get_namespace_name(schemaid);
2062+
2063+
tupDesc=RelationGetDescr(rel);
2064+
2065+
/* Form a tuple. */
2066+
memset(nulls, false,sizeof(nulls));
2067+
2068+
values[Anum_mtm_local_tables_rel_schema-1]=CStringGetTextDatum(schemaName);
2069+
values[Anum_mtm_local_tables_rel_name-1]=CStringGetTextDatum(tableName);
2070+
2071+
tup=heap_form_tuple(tupDesc,values,nulls);
2072+
2073+
/* Insert the tuple to the catalog. */
2074+
simple_heap_insert(rel,tup);
2075+
2076+
/* Update the indexes. */
2077+
CatalogUpdateIndexes(rel,tup);
2078+
2079+
/* Cleanup. */
2080+
heap_freetuple(tup);
2081+
heap_close(rel,RowExclusiveLock);
2082+
2083+
MtmTx.containsDML= true;
2084+
}
2085+
return false;
2086+
}
2087+
2088+
19392089
/*
19402090
* -------------------------------------------
19412091
* Broadcast utulity statements
@@ -2248,10 +2398,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22482398
if (estate->es_processed!=0&& (operation==CMD_INSERT||operation==CMD_UPDATE||operation==CMD_DELETE)) {
22492399
inti;
22502400
for (i=0;i<estate->es_num_result_relations;i++) {
2251-
if (RelationNeedsWAL(estate->es_result_relations[i].ri_RelationDesc)) {
2401+
Relationrel=estate->es_result_relations[i].ri_RelationDesc;
2402+
if (RelationNeedsWAL(rel)) {
22522403
MtmTx.containsDML= true;
22532404
break;
22542405
}
2406+
if (MtmIgnoreTablesWithoutPk) {
2407+
if (!rel->rd_indexvalid) {
2408+
RelationGetIndexList(rel);
2409+
}
2410+
MtmMakeRelationLocal(rel->rd_replidindex);
2411+
}
22552412
}
22562413
}
22572414
}

‎contrib/mmts/multimaster.h‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
#defineMULTIMASTER_NAME "multimaster"
2020
#defineMULTIMASTER_SCHEMA_NAME "mtm"
2121
#defineMULTIMASTER_DDL_TABLE "ddl_log"
22+
#defineMULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
2223
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
2324
#defineMULTIMASTER_MIN_PROTO_VERSION 1
2425
#defineMULTIMASTER_MAX_PROTO_VERSION 1
2526
#defineMULTIMASTER_MAX_GID_SIZE 32
2627
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE 16
2728
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
2829
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
30+
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
2931
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
3032
#defineMULTIMASTER_ADMIN "mtm_admin"
3133

@@ -35,6 +37,10 @@
3537
#defineAnum_mtm_ddl_log_issued1
3638
#defineAnum_mtm_ddl_log_query2
3739

40+
#defineNatts_mtm_local_tables 2
41+
#defineAnum_mtm_local_tables_rel_schema 1
42+
#defineAnum_mtm_local_tables_rel_name 2
43+
3844
typedefuint64csn_t;/* commit serial number */
3945
#defineINVALID_CSN ((csn_t)-1)
4046

@@ -135,6 +141,7 @@ typedef struct
135141
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
136142
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
137143

144+
boollocalTablesHashLoaded;/* Whether data from local_tables table is loaded in shared memory hash table */
138145
intnNodes;/* Number of active nodes */
139146
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
140147
intnLockers;/* Number of lockers */
@@ -208,4 +215,6 @@ extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
208215
externvoidMtmCheckQuorum(void);
209216
externboolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN);
210217
externvoidMtmRecoveryCompleted(void);
218+
externvoidMtmMakeTableLocal(char*schema,char*name);
219+
211220
#endif

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,10 @@ process_remote_insert(StringInfo s, Relation rel)
679679
if (rc!=SPI_OK_UTILITY) {
680680
elog(ERROR,"Failed to execute utility statement %s",ddl);
681681
}
682+
}elseif (strcmp(relname,MULTIMASTER_LOCAL_TABLES_TABLE)==0) {
683+
char*schema=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
684+
char*name=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
685+
MtmMakeTableLocal(schema,name);
682686
}
683687

684688
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp