4545#include "storage/pmsignal.h"
4646#include "storage/proc.h"
4747#include "utils/syscache.h"
48+ #include "utils/lsyscache.h"
4849#include "replication/walsender.h"
4950#include "replication/walsender_private.h"
5051#include "replication/slot.h"
5354#include "nodes/makefuncs.h"
5455#include "access/htup_details.h"
5556#include "catalog/indexing.h"
57+ #include "catalog/namespace.h"
5658#include "pglogical_output/hooks.h"
5759
5860#include "multimaster.h"
@@ -104,6 +106,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
104106PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
105107PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
106108PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
109+ PG_FUNCTION_INFO_V1 (mtm_make_table_local );
107110
108111static Snapshot MtmGetSnapshot (Snapshot snapshot );
109112static void MtmInitialize (void );
@@ -134,6 +137,7 @@ MtmState* Mtm;
134137
135138HTAB * MtmXid2State ;
136139static HTAB * MtmGid2State ;
140+ static HTAB * MtmLocalTables ;
137141
138142static MtmCurrentTrans MtmTx ;
139143
@@ -175,11 +179,12 @@ bool MtmUseRaftable;
175179MtmConnectionInfo * MtmConnections ;
176180
177181static char * MtmConnStrs ;
178- static int MtmQueueSize ;
179- static int MtmWorkers ;
180- static int MtmVacuumDelay ;
181- static int MtmMinRecoveryLag ;
182- static int MtmMaxRecoveryLag ;
182+ static int MtmQueueSize ;
183+ static int MtmWorkers ;
184+ static int MtmVacuumDelay ;
185+ static int MtmMinRecoveryLag ;
186+ static int MtmMaxRecoveryLag ;
187+ static bool MtmIgnoreTablesWithoutPk ;
183188
184189static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
185190static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1279,6 +1284,71 @@ MtmCreateGidMap(void)
12791284return htab ;
12801285}
12811286
1287+ static HTAB *
1288+ MtmCreateLocalTableMap (void )
1289+ {
1290+ HASHCTL info ;
1291+ HTAB * htab ;
1292+ memset (& info ,0 ,sizeof (info ));
1293+ info .keysize = sizeof (Oid );
1294+ htab = ShmemInitHash (
1295+ "MtmLocalTables" ,
1296+ MULTIMASTER_MAX_LOCAL_TABLES ,MULTIMASTER_MAX_LOCAL_TABLES ,
1297+ & info ,
1298+ 0
1299+ );
1300+ return htab ;
1301+ }
1302+
1303+ static void MtmMakeRelationLocal (Oid relid )
1304+ {
1305+ if (OidIsValid (relid )) {
1306+ MtmLock (LW_EXCLUSIVE );
1307+ hash_search (MtmLocalTables ,& relid ,HASH_ENTER ,NULL );
1308+ MtmUnlock ();
1309+ }
1310+ }
1311+
1312+
1313+ void MtmMakeTableLocal (char * schema ,char * name )
1314+ {
1315+ RangeVar * rv = makeRangeVar (schema ,name ,-1 );
1316+ Oid relid = RangeVarGetRelid (rv ,NoLock , true);
1317+ MtmMakeRelationLocal (relid );
1318+ }
1319+
1320+
1321+ typedef struct {
1322+ NameData schema ;
1323+ NameData name ;
1324+ }MtmLocalTablesTuple ;
1325+
1326+ static void MtmLoadLocalTables (void )
1327+ {
1328+ RangeVar * rv ;
1329+ Relation rel ;
1330+ SysScanDesc scan ;
1331+ HeapTuple tuple ;
1332+
1333+ Assert (IsTransactionState ());
1334+
1335+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME ,MULTIMASTER_LOCAL_TABLES_TABLE ,-1 );
1336+ rel = heap_openrv_extended (rv ,RowExclusiveLock , true);
1337+ if (rel != NULL ) {
1338+ scan = systable_beginscan (rel ,0 , true,NULL ,0 ,NULL );
1339+
1340+ while (HeapTupleIsValid (tuple = systable_getnext (scan )))
1341+ {
1342+ MtmLocalTablesTuple * t = (MtmLocalTablesTuple * )GETSTRUCT (tuple );
1343+ MtmMakeTableLocal (NameStr (t -> schema ),NameStr (t -> name ));
1344+ }
1345+
1346+ systable_endscan (scan );
1347+ heap_close (rel ,RowExclusiveLock );
1348+ }
1349+ }
1350+
1351+
12821352static void MtmInitialize ()
12831353{
12841354bool found ;
@@ -1308,6 +1378,7 @@ static void MtmInitialize()
13081378Mtm -> nReceivers = 0 ;
13091379Mtm -> timeShift = 0 ;
13101380Mtm -> transCount = 0 ;
1381+ Mtm -> localTablesHashLoaded = false;
13111382for (i = 0 ;i < MtmNodes ;i ++ ) {
13121383Mtm -> nodes [i ].oldestSnapshot = 0 ;
13131384Mtm -> nodes [i ].transDelay = 0 ;
@@ -1323,6 +1394,7 @@ static void MtmInitialize()
13231394}
13241395MtmXid2State = MtmCreateXidMap ();
13251396MtmGid2State = MtmCreateGidMap ();
1397+ MtmLocalTables = MtmCreateLocalTableMap ();
13261398MtmDoReplication = true;
13271399TM = & MtmTM ;
13281400LWLockRelease (AddinShmemInitLock );
@@ -1475,6 +1547,19 @@ _PG_init(void)
14751547NULL
14761548);
14771549
1550+ DefineCustomBoolVariable (
1551+ "multimaster.ignore_tables_without_pk" ,
1552+ "Do not replicate tables withpout primary key" ,
1553+ NULL ,
1554+ & MtmIgnoreTablesWithoutPk ,
1555+ false,
1556+ PGC_BACKEND ,
1557+ 0 ,
1558+ NULL ,
1559+ NULL ,
1560+ NULL
1561+ );
1562+
14781563DefineCustomIntVariable (
14791564"multimaster.workers" ,
14801565"Number of multimaster executor workers per node" ,
@@ -1804,11 +1889,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18041889return res ;
18051890}
18061891
1892+ static bool
1893+ MtmReplicationRowFilterHook (struct PGLogicalRowFilterArgs * args )
1894+ {
1895+ bool isDistributed ;
1896+ MtmLock (LW_SHARED );
1897+ if (!Mtm -> localTablesHashLoaded ) {
1898+ MtmUnlock ();
1899+ MtmLock (LW_EXCLUSIVE );
1900+ if (!Mtm -> localTablesHashLoaded ) {
1901+ MtmLoadLocalTables ();
1902+ Mtm -> localTablesHashLoaded = true;
1903+ }
1904+ }
1905+ isDistributed = hash_search (MtmLocalTables ,& RelationGetRelid (args -> changed_rel ),HASH_FIND ,NULL )== NULL ;
1906+ MtmUnlock ();
1907+ return isDistributed ;
1908+ }
1909+
18071910void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
18081911{
18091912hooks -> startup_hook = MtmReplicationStartupHook ;
18101913hooks -> shutdown_hook = MtmReplicationShutdownHook ;
18111914hooks -> txn_filter_hook = MtmReplicationTxnFilterHook ;
1915+ hooks -> row_filter_hook = MtmReplicationRowFilterHook ;
18121916}
18131917
18141918
@@ -1935,6 +2039,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
19352039PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc ,values ,nulls )));
19362040}
19372041
2042+
2043+ Datum mtm_make_table_local (PG_FUNCTION_ARGS )
2044+ {
2045+ Oid reloid = PG_GETARG_OID (1 );
2046+ RangeVar * rv ;
2047+ Relation rel ;
2048+ TupleDesc tupDesc ;
2049+ HeapTuple tup ;
2050+ Datum values [Natts_mtm_local_tables ];
2051+ bool nulls [Natts_mtm_local_tables ];
2052+
2053+ MtmMakeRelationLocal (reloid );
2054+
2055+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME ,MULTIMASTER_LOCAL_TABLES_TABLE ,-1 );
2056+ rel = heap_openrv (rv ,RowExclusiveLock );
2057+ if (rel != NULL ) {
2058+ char * tableName = get_rel_name (reloid );
2059+ Oid schemaid = get_rel_namespace (reloid );
2060+ char * schemaName = get_namespace_name (schemaid );
2061+
2062+ tupDesc = RelationGetDescr (rel );
2063+
2064+ /* Form a tuple. */
2065+ memset (nulls , false,sizeof (nulls ));
2066+
2067+ values [Anum_mtm_local_tables_rel_schema - 1 ]= CStringGetTextDatum (schemaName );
2068+ values [Anum_mtm_local_tables_rel_name - 1 ]= CStringGetTextDatum (tableName );
2069+
2070+ tup = heap_form_tuple (tupDesc ,values ,nulls );
2071+
2072+ /* Insert the tuple to the catalog. */
2073+ simple_heap_insert (rel ,tup );
2074+
2075+ /* Update the indexes. */
2076+ CatalogUpdateIndexes (rel ,tup );
2077+
2078+ /* Cleanup. */
2079+ heap_freetuple (tup );
2080+ heap_close (rel ,RowExclusiveLock );
2081+
2082+ MtmTx .containsDML = true;
2083+ }
2084+ return false;
2085+ }
2086+
2087+
19382088/*
19392089 * -------------------------------------------
19402090 * Broadcast utulity statements
@@ -2243,10 +2393,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22432393if (estate -> es_processed != 0 && (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE )) {
22442394int i ;
22452395for (i = 0 ;i < estate -> es_num_result_relations ;i ++ ) {
2246- if (RelationNeedsWAL (estate -> es_result_relations [i ].ri_RelationDesc )) {
2396+ Relation rel = estate -> es_result_relations [i ].ri_RelationDesc ;
2397+ if (RelationNeedsWAL (rel )) {
22472398MtmTx .containsDML = true;
22482399break ;
22492400}
2401+ if (MtmIgnoreTablesWithoutPk ) {
2402+ if (!rel -> rd_indexvalid ) {
2403+ RelationGetIndexList (rel );
2404+ }
2405+ MtmMakeRelationLocal (rel -> rd_replidindex );
2406+ }
22502407}
22512408 }
22522409if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {