Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd4bcc33

Browse files
knizhnikkelvich
authored andcommitted
Add local tables hash
1 parent4cb7f37 commitd4bcc33

File tree

4 files changed

+183
-6
lines changed

4 files changed

+183
-6
lines changed

‎multimaster--1.0.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,11 @@ CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATEFUNCTIONmtm.make_table_local(relation regclass) RETURNS void
40+
AS'MODULE_PATHNAME','mtm_make_table_local'
41+
LANGUAGE C;
42+
3943
CREATETABLEIF NOT EXISTSmtm.ddl_log (issuedtimestamp with time zonenot null, querytext);
44+
45+
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schematext, rel_nametext,primary key pk(rel_schema, rel_name));
46+

‎multimaster.c

Lines changed: 163 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include"storage/pmsignal.h"
4646
#include"storage/proc.h"
4747
#include"utils/syscache.h"
48+
#include"utils/lsyscache.h"
4849
#include"replication/walsender.h"
4950
#include"replication/walsender_private.h"
5051
#include"replication/slot.h"
@@ -53,6 +54,7 @@
5354
#include"nodes/makefuncs.h"
5455
#include"access/htup_details.h"
5556
#include"catalog/indexing.h"
57+
#include"catalog/namespace.h"
5658
#include"pglogical_output/hooks.h"
5759

5860
#include"multimaster.h"
@@ -104,6 +106,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
104106
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
105107
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
106108
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
109+
PG_FUNCTION_INFO_V1(mtm_make_table_local);
107110

108111
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
109112
staticvoidMtmInitialize(void);
@@ -134,6 +137,7 @@ MtmState* Mtm;
134137

135138
HTAB*MtmXid2State;
136139
staticHTAB*MtmGid2State;
140+
staticHTAB*MtmLocalTables;
137141

138142
staticMtmCurrentTransMtmTx;
139143

@@ -175,11 +179,12 @@ bool MtmUseRaftable;
175179
MtmConnectionInfo*MtmConnections;
176180

177181
staticchar*MtmConnStrs;
178-
staticintMtmQueueSize;
179-
staticintMtmWorkers;
180-
staticintMtmVacuumDelay;
181-
staticintMtmMinRecoveryLag;
182-
staticintMtmMaxRecoveryLag;
182+
staticintMtmQueueSize;
183+
staticintMtmWorkers;
184+
staticintMtmVacuumDelay;
185+
staticintMtmMinRecoveryLag;
186+
staticintMtmMaxRecoveryLag;
187+
staticboolMtmIgnoreTablesWithoutPk;
183188

184189
staticExecutorFinish_hook_typePreviousExecutorFinishHook;
185190
staticProcessUtility_hook_typePreviousProcessUtilityHook;
@@ -1279,6 +1284,71 @@ MtmCreateGidMap(void)
12791284
returnhtab;
12801285
}
12811286

1287+
staticHTAB*
1288+
MtmCreateLocalTableMap(void)
1289+
{
1290+
HASHCTLinfo;
1291+
HTAB*htab;
1292+
memset(&info,0,sizeof(info));
1293+
info.keysize=sizeof(Oid);
1294+
htab=ShmemInitHash(
1295+
"MtmLocalTables",
1296+
MULTIMASTER_MAX_LOCAL_TABLES,MULTIMASTER_MAX_LOCAL_TABLES,
1297+
&info,
1298+
0
1299+
);
1300+
returnhtab;
1301+
}
1302+
1303+
staticvoidMtmMakeRelationLocal(Oidrelid)
1304+
{
1305+
if (OidIsValid(relid)) {
1306+
MtmLock(LW_EXCLUSIVE);
1307+
hash_search(MtmLocalTables,&relid,HASH_ENTER,NULL);
1308+
MtmUnlock();
1309+
}
1310+
}
1311+
1312+
1313+
voidMtmMakeTableLocal(char*schema,char*name)
1314+
{
1315+
RangeVar*rv=makeRangeVar(schema,name,-1);
1316+
Oidrelid=RangeVarGetRelid(rv,NoLock, true);
1317+
MtmMakeRelationLocal(relid);
1318+
}
1319+
1320+
1321+
typedefstruct {
1322+
NameDataschema;
1323+
NameDataname;
1324+
}MtmLocalTablesTuple;
1325+
1326+
staticvoidMtmLoadLocalTables(void)
1327+
{
1328+
RangeVar*rv;
1329+
Relationrel;
1330+
SysScanDescscan;
1331+
HeapTupletuple;
1332+
1333+
Assert(IsTransactionState());
1334+
1335+
rv=makeRangeVar(MULTIMASTER_SCHEMA_NAME,MULTIMASTER_LOCAL_TABLES_TABLE,-1);
1336+
rel=heap_openrv_extended(rv,RowExclusiveLock, true);
1337+
if (rel!=NULL) {
1338+
scan=systable_beginscan(rel,0, true,NULL,0,NULL);
1339+
1340+
while (HeapTupleIsValid(tuple=systable_getnext(scan)))
1341+
{
1342+
MtmLocalTablesTuple*t= (MtmLocalTablesTuple*)GETSTRUCT(tuple);
1343+
MtmMakeTableLocal(NameStr(t->schema),NameStr(t->name));
1344+
}
1345+
1346+
systable_endscan(scan);
1347+
heap_close(rel,RowExclusiveLock);
1348+
}
1349+
}
1350+
1351+
12821352
staticvoidMtmInitialize()
12831353
{
12841354
boolfound;
@@ -1308,6 +1378,7 @@ static void MtmInitialize()
13081378
Mtm->nReceivers=0;
13091379
Mtm->timeShift=0;
13101380
Mtm->transCount=0;
1381+
Mtm->localTablesHashLoaded= false;
13111382
for (i=0;i<MtmNodes;i++) {
13121383
Mtm->nodes[i].oldestSnapshot=0;
13131384
Mtm->nodes[i].transDelay=0;
@@ -1323,6 +1394,7 @@ static void MtmInitialize()
13231394
}
13241395
MtmXid2State=MtmCreateXidMap();
13251396
MtmGid2State=MtmCreateGidMap();
1397+
MtmLocalTables=MtmCreateLocalTableMap();
13261398
MtmDoReplication= true;
13271399
TM=&MtmTM;
13281400
LWLockRelease(AddinShmemInitLock);
@@ -1475,6 +1547,19 @@ _PG_init(void)
14751547
NULL
14761548
);
14771549

1550+
DefineCustomBoolVariable(
1551+
"multimaster.ignore_tables_without_pk",
1552+
"Do not replicate tables withpout primary key",
1553+
NULL,
1554+
&MtmIgnoreTablesWithoutPk,
1555+
false,
1556+
PGC_BACKEND,
1557+
0,
1558+
NULL,
1559+
NULL,
1560+
NULL
1561+
);
1562+
14781563
DefineCustomIntVariable(
14791564
"multimaster.workers",
14801565
"Number of multimaster executor workers per node",
@@ -1804,11 +1889,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18041889
returnres;
18051890
}
18061891

1892+
staticbool
1893+
MtmReplicationRowFilterHook(structPGLogicalRowFilterArgs*args)
1894+
{
1895+
boolisDistributed;
1896+
MtmLock(LW_SHARED);
1897+
if (!Mtm->localTablesHashLoaded) {
1898+
MtmUnlock();
1899+
MtmLock(LW_EXCLUSIVE);
1900+
if (!Mtm->localTablesHashLoaded) {
1901+
MtmLoadLocalTables();
1902+
Mtm->localTablesHashLoaded= true;
1903+
}
1904+
}
1905+
isDistributed=hash_search(MtmLocalTables,&RelationGetRelid(args->changed_rel),HASH_FIND,NULL)==NULL;
1906+
MtmUnlock();
1907+
returnisDistributed;
1908+
}
1909+
18071910
voidMtmSetupReplicationHooks(structPGLogicalHooks*hooks)
18081911
{
18091912
hooks->startup_hook=MtmReplicationStartupHook;
18101913
hooks->shutdown_hook=MtmReplicationShutdownHook;
18111914
hooks->txn_filter_hook=MtmReplicationTxnFilterHook;
1915+
hooks->row_filter_hook=MtmReplicationRowFilterHook;
18121916
}
18131917

18141918

@@ -1935,6 +2039,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
19352039
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc,values,nulls)));
19362040
}
19372041

2042+
2043+
Datummtm_make_table_local(PG_FUNCTION_ARGS)
2044+
{
2045+
Oidreloid=PG_GETARG_OID(1);
2046+
RangeVar*rv;
2047+
Relationrel;
2048+
TupleDesctupDesc;
2049+
HeapTupletup;
2050+
Datumvalues[Natts_mtm_local_tables];
2051+
boolnulls[Natts_mtm_local_tables];
2052+
2053+
MtmMakeRelationLocal(reloid);
2054+
2055+
rv=makeRangeVar(MULTIMASTER_SCHEMA_NAME,MULTIMASTER_LOCAL_TABLES_TABLE,-1);
2056+
rel=heap_openrv(rv,RowExclusiveLock);
2057+
if (rel!=NULL) {
2058+
char*tableName=get_rel_name(reloid);
2059+
Oidschemaid=get_rel_namespace(reloid);
2060+
char*schemaName=get_namespace_name(schemaid);
2061+
2062+
tupDesc=RelationGetDescr(rel);
2063+
2064+
/* Form a tuple. */
2065+
memset(nulls, false,sizeof(nulls));
2066+
2067+
values[Anum_mtm_local_tables_rel_schema-1]=CStringGetTextDatum(schemaName);
2068+
values[Anum_mtm_local_tables_rel_name-1]=CStringGetTextDatum(tableName);
2069+
2070+
tup=heap_form_tuple(tupDesc,values,nulls);
2071+
2072+
/* Insert the tuple to the catalog. */
2073+
simple_heap_insert(rel,tup);
2074+
2075+
/* Update the indexes. */
2076+
CatalogUpdateIndexes(rel,tup);
2077+
2078+
/* Cleanup. */
2079+
heap_freetuple(tup);
2080+
heap_close(rel,RowExclusiveLock);
2081+
2082+
MtmTx.containsDML= true;
2083+
}
2084+
return false;
2085+
}
2086+
2087+
19382088
/*
19392089
* -------------------------------------------
19402090
* Broadcast utulity statements
@@ -2243,10 +2393,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22432393
if (estate->es_processed!=0&& (operation==CMD_INSERT||operation==CMD_UPDATE||operation==CMD_DELETE)) {
22442394
inti;
22452395
for (i=0;i<estate->es_num_result_relations;i++) {
2246-
if (RelationNeedsWAL(estate->es_result_relations[i].ri_RelationDesc)) {
2396+
Relationrel=estate->es_result_relations[i].ri_RelationDesc;
2397+
if (RelationNeedsWAL(rel)) {
22472398
MtmTx.containsDML= true;
22482399
break;
22492400
}
2401+
if (MtmIgnoreTablesWithoutPk) {
2402+
if (!rel->rd_indexvalid) {
2403+
RelationGetIndexList(rel);
2404+
}
2405+
MtmMakeRelationLocal(rel->rd_replidindex);
2406+
}
22502407
}
22512408
}
22522409
if (MtmTx.isDistributed&&MtmTx.containsDML&& !IsTransactionBlock()) {

‎multimaster.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
#defineMULTIMASTER_NAME "multimaster"
2020
#defineMULTIMASTER_SCHEMA_NAME "mtm"
2121
#defineMULTIMASTER_DDL_TABLE "ddl_log"
22+
#defineMULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
2223
#defineMULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
2324
#defineMULTIMASTER_MIN_PROTO_VERSION 1
2425
#defineMULTIMASTER_MAX_PROTO_VERSION 1
2526
#defineMULTIMASTER_MAX_GID_SIZE 32
2627
#defineMULTIMASTER_MAX_SLOT_NAME_SIZE 16
2728
#defineMULTIMASTER_MAX_CONN_STR_SIZE 128
2829
#defineMULTIMASTER_MAX_HOST_NAME_SIZE 64
30+
#defineMULTIMASTER_MAX_LOCAL_TABLES 256
2931
#defineMULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
3032
#defineMULTIMASTER_ADMIN "mtm_admin"
3133

@@ -35,6 +37,10 @@
3537
#defineAnum_mtm_ddl_log_issued1
3638
#defineAnum_mtm_ddl_log_query2
3739

40+
#defineNatts_mtm_local_tables 2
41+
#defineAnum_mtm_local_tables_rel_schema 1
42+
#defineAnum_mtm_local_tables_rel_name 2
43+
3844
typedefuint64csn_t;/* commit serial number */
3945
#defineINVALID_CSN ((csn_t)-1)
4046

@@ -135,6 +141,7 @@ typedef struct
135141
nodemask_tnodeLockerMask;/* Mask of node IDs which WAL-senders are locking the cluster */
136142
nodemask_treconnectMask;/* Mask of nodes connection to which has to be reestablished by sender */
137143

144+
boollocalTablesHashLoaded;/* Whether data from local_tables table is loaded in shared memory hash table */
138145
intnNodes;/* Number of active nodes */
139146
intnReceivers;/* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
140147
intnLockers;/* Number of lockers */
@@ -208,4 +215,6 @@ extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
208215
externvoidMtmCheckQuorum(void);
209216
externboolMtmRecoveryCaughtUp(intnodeId,XLogRecPtrslotLSN);
210217
externvoidMtmRecoveryCompleted(void);
218+
externvoidMtmMakeTableLocal(char*schema,char*name);
219+
211220
#endif

‎pglogical_apply.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,10 @@ process_remote_insert(StringInfo s, Relation rel)
677677
if (rc!=SPI_OK_UTILITY) {
678678
elog(ERROR,"Failed to execute utility statement %s",ddl);
679679
}
680+
}elseif (strcmp(relname,MULTIMASTER_LOCAL_TABLES_TABLE)==0) {
681+
char*schema=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
682+
char*name=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
683+
MtmMakeTableLocal(schema,name);
680684
}
681685

682686
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp