@@ -92,6 +92,10 @@ typedef struct SubOpts
9292}SubOpts ;
9393
9494static List * fetch_table_list (WalReceiverConn * wrconn ,List * publications );
95+ static void check_publications_origin (WalReceiverConn * wrconn ,
96+ List * publications ,bool copydata ,
97+ char * origin ,Oid * subrel_local_oids ,
98+ int subrel_count ,char * subname );
9599static void check_duplicates_in_publist (List * publist ,Datum * datums );
96100static List * merge_publications (List * oldpublist ,List * newpublist ,bool addpub ,const char * subname );
97101static void ReportSlotConnectionError (List * rstates ,Oid subid ,char * slotname ,char * err );
@@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
680684PG_TRY ();
681685{
682686check_publications (wrconn ,publications );
687+ check_publications_origin (wrconn ,publications ,opts .copy_data ,
688+ opts .origin ,NULL ,0 ,stmt -> subname );
683689
684690/*
685691 * Set sync state based on if we were asked to do data copy or
@@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
786792ListCell * lc ;
787793int off ;
788794int remove_rel_len ;
795+ int subrel_count ;
789796Relation rel = NULL ;
790797typedef struct SubRemoveRels
791798{
@@ -815,28 +822,33 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
815822
816823/* Get local table list. */
817824subrel_states = GetSubscriptionRelations (sub -> oid , false);
825+ subrel_count = list_length (subrel_states );
818826
819827/*
820828 * Build qsorted array of local table oids for faster lookup. This can
821829 * potentially contain all tables in the database so speed of lookup
822830 * is important.
823831 */
824- subrel_local_oids = palloc (list_length ( subrel_states ) * sizeof (Oid ));
832+ subrel_local_oids = palloc (subrel_count * sizeof (Oid ));
825833off = 0 ;
826834foreach (lc ,subrel_states )
827835{
828836SubscriptionRelState * relstate = (SubscriptionRelState * )lfirst (lc );
829837
830838subrel_local_oids [off ++ ]= relstate -> relid ;
831839}
832- qsort (subrel_local_oids ,list_length ( subrel_states ) ,
840+ qsort (subrel_local_oids ,subrel_count ,
833841sizeof (Oid ),oid_cmp );
834842
843+ check_publications_origin (wrconn ,sub -> publications ,copy_data ,
844+ sub -> origin ,subrel_local_oids ,
845+ subrel_count ,sub -> name );
846+
835847/*
836848 * Rels that we want to remove from subscription and drop any slots
837849 * and origins corresponding to them.
838850 */
839- sub_remove_rels = palloc (list_length ( subrel_states ) * sizeof (SubRemoveRels ));
851+ sub_remove_rels = palloc (subrel_count * sizeof (SubRemoveRels ));
840852
841853/*
842854 * Walk over the remote tables and try to match them to locally known
@@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
862874pubrel_local_oids [off ++ ]= relid ;
863875
864876if (!bsearch (& relid ,subrel_local_oids ,
865- list_length ( subrel_states ) ,sizeof (Oid ),oid_cmp ))
877+ subrel_count ,sizeof (Oid ),oid_cmp ))
866878{
867879AddSubscriptionRelState (sub -> oid ,relid ,
868880copy_data ?SUBREL_STATE_INIT :SUBREL_STATE_READY ,
@@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
881893sizeof (Oid ),oid_cmp );
882894
883895remove_rel_len = 0 ;
884- for (off = 0 ;off < list_length ( subrel_states ) ;off ++ )
896+ for (off = 0 ;off < subrel_count ;off ++ )
885897{
886898Oid relid = subrel_local_oids [off ];
887899
@@ -1784,6 +1796,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
17841796table_close (rel ,RowExclusiveLock );
17851797}
17861798
1799+ /*
1800+ * Check and log a warning if the publisher has subscribed to the same table
1801+ * from some other publisher. This check is required only if "copy_data = true"
1802+ * and "origin = none" for CREATE SUBSCRIPTION and
1803+ * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
1804+ * having origin might have been copied.
1805+ *
1806+ * This check need not be performed on the tables that are already added
1807+ * because incremental sync for those tables will happen through WAL and the
1808+ * origin of the data can be identified from the WAL records.
1809+ *
1810+ * subrel_local_oids contains the list of relation oids that are already
1811+ * present on the subscriber.
1812+ */
1813+ static void
1814+ check_publications_origin (WalReceiverConn * wrconn ,List * publications ,
1815+ bool copydata ,char * origin ,Oid * subrel_local_oids ,
1816+ int subrel_count ,char * subname )
1817+ {
1818+ WalRcvExecResult * res ;
1819+ StringInfoData cmd ;
1820+ TupleTableSlot * slot ;
1821+ Oid tableRow [1 ]= {TEXTOID };
1822+ List * publist = NIL ;
1823+ int i ;
1824+
1825+ if (!copydata || !origin ||
1826+ (pg_strcasecmp (origin ,LOGICALREP_ORIGIN_NONE )!= 0 ))
1827+ return ;
1828+
1829+ initStringInfo (& cmd );
1830+ appendStringInfoString (& cmd ,
1831+ "SELECT DISTINCT P.pubname AS pubname\n"
1832+ "FROM pg_publication P,\n"
1833+ " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1834+ " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1835+ " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1836+ "WHERE C.oid = GPT.relid AND P.pubname IN (" );
1837+ get_publications_str (publications ,& cmd , true);
1838+ appendStringInfoString (& cmd ,")\n" );
1839+
1840+ /*
1841+ * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
1842+ * the list of relation oids that are already present on the subscriber.
1843+ * This check should be skipped for these tables.
1844+ */
1845+ for (i = 0 ;i < subrel_count ;i ++ )
1846+ {
1847+ Oid relid = subrel_local_oids [i ];
1848+ char * schemaname = get_namespace_name (get_rel_namespace (relid ));
1849+ char * tablename = get_rel_name (relid );
1850+
1851+ appendStringInfo (& cmd ,"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n" ,
1852+ schemaname ,tablename );
1853+ }
1854+
1855+ res = walrcv_exec (wrconn ,cmd .data ,1 ,tableRow );
1856+ pfree (cmd .data );
1857+
1858+ if (res -> status != WALRCV_OK_TUPLES )
1859+ ereport (ERROR ,
1860+ (errcode (ERRCODE_CONNECTION_FAILURE ),
1861+ errmsg ("could not receive list of replicated tables from the publisher: %s" ,
1862+ res -> err )));
1863+
1864+ /* Process tables. */
1865+ slot = MakeSingleTupleTableSlot (res -> tupledesc ,& TTSOpsMinimalTuple );
1866+ while (tuplestore_gettupleslot (res -> tuplestore , true, false,slot ))
1867+ {
1868+ char * pubname ;
1869+ bool isnull ;
1870+
1871+ pubname = TextDatumGetCString (slot_getattr (slot ,1 ,& isnull ));
1872+ Assert (!isnull );
1873+
1874+ ExecClearTuple (slot );
1875+ publist = list_append_unique (publist ,makeString (pubname ));
1876+ }
1877+
1878+ /*
1879+ * Log a warning if the publisher has subscribed to the same table from
1880+ * some other publisher. We cannot know the origin of data during the
1881+ * initial sync. Data origins can be found only from the WAL by looking at
1882+ * the origin id.
1883+ *
1884+ * XXX: For simplicity, we don't check whether the table has any data or
1885+ * not. If the table doesn't have any data then we don't need to
1886+ * distinguish between data having origin and data not having origin so we
1887+ * can avoid logging a warning in that case.
1888+ */
1889+ if (publist )
1890+ {
1891+ StringInfo pubnames = makeStringInfo ();
1892+
1893+ /* Prepare the list of publication(s) for warning message. */
1894+ get_publications_str (publist ,pubnames , false);
1895+ ereport (WARNING ,
1896+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1897+ errmsg ("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin" ,
1898+ subname ),
1899+ errdetail_plural ("Subscribed publication %s is subscribing to other publications." ,
1900+ "Subscribed publications %s are subscribing to other publications." ,
1901+ list_length (publist ),pubnames -> data ),
1902+ errhint ("Verify that initial data copied from the publisher tables did not come from other origins." ));
1903+ }
1904+
1905+ ExecDropSingleTupleTableSlot (slot );
1906+
1907+ walrcv_clear_result (res );
1908+ }
1909+
17871910/*
17881911 * Get the list of tables which belong to specified publications on the
17891912 * publisher connection.