@@ -190,7 +190,7 @@ def catchup(self, username=None):
190190pub_lsn = self .pub .node .execute (query = "select pg_current_wal_lsn()" ,
191191dbname = None ,
192192username = None )[0 ][0 ]# yapf: disable
193- # create dummy xact
193+ # create dummy xact, as LR replicates only on commit.
194194self .pub .node .execute (query = "select txid_current()" ,dbname = None ,username = None )
195195query = """
196196 select '{}'::pg_lsn - replay_lsn <= 0
@@ -203,5 +203,17 @@ def catchup(self, username=None):
203203dbname = self .pub .dbname ,
204204username = username or self .pub .username ,
205205max_attempts = LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS )
206+
207+ # Now, wait until there are no tablesync workers: probably
208+ # replay_lsn above was sent with changes of new tables just skipped;
209+ # they will be eaten by tablesync workers.
210+ query = """
211+ select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'
212+ """
213+ self .node .poll_query_until (
214+ query = query ,
215+ dbname = self .pub .dbname ,
216+ username = username or self .pub .username ,
217+ max_attempts = LOGICAL_REPL_MAX_CATCHUP_ATTEMPTS )
206218except Exception as e :
207219raise_from (CatchUpException ("Failed to catch up" ,query ),e )