Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit66b84fa

Browse files
committed
Receive invalidation messages correctly in tablesync worker
We didn't accept any invalidation messages until the whole sync processhad finished (because it flattens all the remote transactions in thesingle one). So the sync worker didn't learn about subscriptionchanges/drop until it has finished. This could lead to "orphaned" syncworkers.Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
1 parent3c9bc21 commit66b84fa

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

‎src/backend/replication/logical/worker.c‎

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
118118

119119
staticvoidstore_flush_position(XLogRecPtrremote_lsn);
120120

121-
staticvoidreread_subscription(void);
121+
staticvoidmaybe_reread_subscription(void);
122122

123123
/* Flags set by signal handlers */
124124
staticvolatilesig_atomic_tgot_SIGHUP= false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
165165

166166
StartTransactionCommand();
167167

168-
if (!MySubscriptionValid)
169-
reread_subscription();
168+
maybe_reread_subscription();
170169

171170
MemoryContextSwitchTo(ApplyMessageContext);
172171
return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
463462

464463
store_flush_position(commit_data.end_lsn);
465464
}
465+
else
466+
{
467+
/* Process any invalidation messages that might have accumulated. */
468+
AcceptInvalidationMessages();
469+
maybe_reread_subscription();
470+
}
466471

467472
in_remote_transaction= false;
468473

@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11191124
* now.
11201125
*/
11211126
AcceptInvalidationMessages();
1122-
if (!MySubscriptionValid)
1123-
reread_subscription();
1127+
maybe_reread_subscription();
11241128

11251129
/* Process any table synchronization changes. */
11261130
process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
13021306
last_flushpos=flushpos;
13031307
}
13041308

1305-
13061309
/*
1307-
* Reread subscription infoand exit on change.
1310+
* Reread subscription infoif needed. Most changes will be exit.
13081311
*/
13091312
staticvoid
1310-
reread_subscription(void)
1313+
maybe_reread_subscription(void)
13111314
{
13121315
MemoryContextoldctx;
13131316
Subscription*newsub;
13141317
boolstarted_tx= false;
13151318

1319+
/* When cache state is valid there is nothing to do here. */
1320+
if (MySubscriptionValid)
1321+
return;
1322+
13161323
/* This function might be called inside or outside of transaction. */
13171324
if (!IsTransactionState())
13181325
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp