Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitab28fea

Browse files
committed
Handle heap rewrites better in logical replication
A FOR ALL TABLES publication naturally considers all base tables to be acandidate for replication. This includes transient heaps that arecreated during a table rewrite during DDL. This causes failures on thesubscriber side because it will not have a table like pg_temp_16386 toreceive data (and if it did, it would be the wrong table).The prevent this problem, we filter out any tables that match thisnaming pattern and match an actual table from FOR ALL TABLESpublications. This is only a heuristic, meaning that user tables thatmatch that naming could accidentally be omitted. A more robust solutionmight require an explicit marking of such tables in pg_class somehow.Reported-by: yxq <yxq@o2.pl>Bug: #14785Reviewed-by: Andres Freund <andres@anarazel.de>Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
1 parent22c5e73 commitab28fea

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

‎src/backend/replication/pgoutput/pgoutput.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include"utils/inval.h"
2323
#include"utils/int8.h"
24+
#include"utils/lsyscache.h"
2425
#include"utils/memutils.h"
2526
#include"utils/syscache.h"
2627
#include"utils/varlena.h"
@@ -509,6 +510,31 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
509510
{
510511
Publication*pub=lfirst(lc);
511512

513+
/*
514+
* Skip tables that look like they are from a heap rewrite (see
515+
* make_new_heap()). We need to skip them because the subscriber
516+
* won't have a table by that name to receive the data. That
517+
* means we won't ship the new data in, say, an added column with
518+
* a DEFAULT, but if the user applies the same DDL manually on the
519+
* subscriber, then this will work out for them.
520+
*
521+
* We only need to consider the alltables case, because such a
522+
* transient heap won't be an explicit member of a publication.
523+
*/
524+
if (pub->alltables)
525+
{
526+
char*relname=get_rel_name(relid);
527+
unsignedintu;
528+
intn;
529+
530+
if (sscanf(relname,"pg_temp_%u%n",&u,&n)==1&&
531+
relname[n]=='\0')
532+
{
533+
if (get_rel_relkind(u)==RELKIND_RELATION)
534+
break;
535+
}
536+
}
537+
512538
if (pub->alltables||list_member_oid(pubids,pub->oid))
513539
{
514540
entry->pubactions.pubinsert |=pub->pubactions.pubinsert;
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Test logical replication behavior with heap rewrites
2+
use strict;
3+
use warnings;
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 2;
7+
8+
subwait_for_caught_up
9+
{
10+
my ($node,$appname) =@_;
11+
12+
$node->poll_query_until('postgres',
13+
"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
14+
)ordie"Timed out while waiting for subscriber to catch up";
15+
}
16+
17+
my$node_publisher = get_new_node('publisher');
18+
$node_publisher->init(allows_streaming=>'logical');
19+
$node_publisher->start;
20+
21+
my$node_subscriber = get_new_node('subscriber');
22+
$node_subscriber->init(allows_streaming=>'logical');
23+
$node_subscriber->start;
24+
25+
my$ddl ="CREATE TABLE test1 (a int, b text);";
26+
$node_publisher->safe_psql('postgres',$ddl);
27+
$node_subscriber->safe_psql('postgres',$ddl);
28+
29+
my$publisher_connstr =$node_publisher->connstr .' dbname=postgres';
30+
my$appname ='encoding_test';
31+
32+
$node_publisher->safe_psql('postgres',
33+
"CREATE PUBLICATION mypub FOR ALL TABLES;");
34+
$node_subscriber->safe_psql('postgres',
35+
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
36+
);
37+
38+
wait_for_caught_up($node_publisher,$appname);
39+
40+
# Wait for initial sync to finish as well
41+
my$synced_query =
42+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
43+
$node_subscriber->poll_query_until('postgres',$synced_query)
44+
ordie"Timed out while waiting for subscriber to synchronize data";
45+
46+
$node_publisher->safe_psql('postgres',q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
47+
48+
wait_for_caught_up($node_publisher,$appname);
49+
50+
is($node_subscriber->safe_psql('postgres',q{SELECT a, b FROM test1}),
51+
qq(1|one
52+
2|two),
53+
'initial data replicated to subscriber');
54+
55+
# DDL that causes a heap rewrite
56+
my$ddl2 ="ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;";
57+
$node_subscriber->safe_psql('postgres',$ddl2);
58+
$node_publisher->safe_psql('postgres',$ddl2);
59+
60+
wait_for_caught_up($node_publisher,$appname);
61+
62+
$node_publisher->safe_psql('postgres',q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);});
63+
64+
wait_for_caught_up($node_publisher,$appname);
65+
66+
is($node_subscriber->safe_psql('postgres',q{SELECT a, b, c FROM test1}),
67+
qq(1|one|0
68+
2|two|0
69+
3|three|33),
70+
'data replicated to subscriber');
71+
72+
$node_subscriber->stop;
73+
$node_publisher->stop;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp