Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit713f7c4

Browse files
committed
Fix after trigger execution in logical replication
From: Petr Jelinek <petr.jelinek@2ndquadrant.com>Tested-by: Thom Brown <thom@linux.com>
1 parent1e8a850 commit713f7c4

File tree

2 files changed

+128
-0
lines changed

2 files changed

+128
-0
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
173173
if (resultRelInfo->ri_TrigDesc)
174174
estate->es_trig_tuple_slot=ExecInitExtraTupleSlot(estate);
175175

176+
/* Prepare to catch AFTER triggers. */
177+
AfterTriggerBeginQuery();
178+
176179
returnestate;
177180
}
178181

@@ -533,6 +536,10 @@ apply_handle_insert(StringInfo s)
533536
/* Cleanup. */
534537
ExecCloseIndices(estate->es_result_relation_info);
535538
PopActiveSnapshot();
539+
540+
/* Handle queued AFTER triggers. */
541+
AfterTriggerEndQuery(estate);
542+
536543
ExecResetTupleTable(estate->es_tupleTable, false);
537544
FreeExecutorState(estate);
538545

@@ -673,6 +680,10 @@ apply_handle_update(StringInfo s)
673680
/* Cleanup. */
674681
ExecCloseIndices(estate->es_result_relation_info);
675682
PopActiveSnapshot();
683+
684+
/* Handle queued AFTER triggers. */
685+
AfterTriggerEndQuery(estate);
686+
676687
EvalPlanQualEnd(&epqstate);
677688
ExecResetTupleTable(estate->es_tupleTable, false);
678689
FreeExecutorState(estate);
@@ -760,6 +771,10 @@ apply_handle_delete(StringInfo s)
760771
/* Cleanup. */
761772
ExecCloseIndices(estate->es_result_relation_info);
762773
PopActiveSnapshot();
774+
775+
/* Handle queued AFTER triggers. */
776+
AfterTriggerEndQuery(estate);
777+
763778
EvalPlanQualEnd(&epqstate);
764779
ExecResetTupleTable(estate->es_tupleTable, false);
765780
FreeExecutorState(estate);
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Basic logical replication test
2+
use strict;
3+
use warnings;
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 4;
7+
8+
# Initialize publisher node
9+
my$node_publisher = get_new_node('publisher');
10+
$node_publisher->init(allows_streaming=>'logical');
11+
$node_publisher->start;
12+
13+
# Create subscriber node
14+
my$node_subscriber = get_new_node('subscriber');
15+
$node_subscriber->init(allows_streaming=>'logical');
16+
$node_subscriber->start;
17+
18+
# Setup structure on publisher
19+
$node_publisher->safe_psql('postgres',
20+
"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
21+
$node_publisher->safe_psql('postgres',
22+
"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
23+
24+
# Setup structure on subscriber
25+
$node_subscriber->safe_psql('postgres',
26+
"CREATE TABLE tab_fk (bid int PRIMARY KEY);");
27+
$node_subscriber->safe_psql('postgres',
28+
"CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid));");
29+
30+
# Setup logical replication
31+
my$publisher_connstr =$node_publisher->connstr .' dbname=postgres';
32+
$node_publisher->safe_psql('postgres',
33+
"CREATE PUBLICATION tap_pub FOR ALL TABLES;");
34+
35+
my$appname ='tap_sub';
36+
$node_subscriber->safe_psql('postgres',
37+
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub;");
38+
39+
# Wait for subscriber to finish initialization
40+
my$caughtup_query =
41+
"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';";
42+
$node_publisher->poll_query_until('postgres',$caughtup_query)
43+
ordie"Timed out while waiting for subscriber to catch up";
44+
45+
$node_publisher->safe_psql('postgres',
46+
"INSERT INTO tab_fk (bid) VALUES (1);");
47+
$node_publisher->safe_psql('postgres',
48+
"INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);");
49+
50+
$node_publisher->poll_query_until('postgres',$caughtup_query)
51+
ordie"Timed out while waiting for subscriber to catch up";
52+
53+
# Check data on subscriber
54+
my$result =
55+
$node_subscriber->safe_psql('postgres',"SELECT count(*), min(bid), max(bid) FROM tab_fk;");
56+
is($result,qq(1|1|1),'check replicated tab_fk inserts on subscriber');
57+
58+
$result =
59+
$node_subscriber->safe_psql('postgres',"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
60+
is($result,qq(1|1|1),'check replicated tab_fk_ref inserts on subscriber');
61+
62+
# Drop the fk on publisher
63+
$node_publisher->safe_psql('postgres',
64+
"DROP TABLE tab_fk CASCADE;");
65+
66+
# Insert data
67+
$node_publisher->safe_psql('postgres',
68+
"INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);");
69+
70+
$node_publisher->poll_query_until('postgres',$caughtup_query)
71+
ordie"Timed out while waiting for subscriber to catch up";
72+
73+
# FK is not enforced on subscriber
74+
$result =
75+
$node_subscriber->safe_psql('postgres',"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
76+
is($result,qq(2|1|2),'check FK ignored on subscriber');
77+
78+
# Add replica trigger
79+
$node_subscriber->safe_psql('postgres',qq{
80+
CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS\$\$
81+
BEGIN
82+
IF (TG_OP = 'INSERT') THEN
83+
IF (NEW.id < 10) THEN
84+
RETURN NEW;
85+
ELSE
86+
RETURN NULL;
87+
END IF;
88+
ELSE
89+
RAISE WARNING 'Unknown action';
90+
RETURN NULL;
91+
END IF;
92+
END;
93+
\$\$ LANGUAGE plpgsql;
94+
CREATE TRIGGER filter_basic_dml_trg
95+
BEFORE INSERT ON tab_fk_ref
96+
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
97+
ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
98+
});
99+
100+
# Insert data
101+
$node_publisher->safe_psql('postgres',
102+
"INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);");
103+
104+
$node_publisher->poll_query_until('postgres',$caughtup_query)
105+
ordie"Timed out while waiting for subscriber to catch up";
106+
107+
# The row should be skipped on subscriber
108+
$result =
109+
$node_subscriber->safe_psql('postgres',"SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
110+
is($result,qq(2|1|2),'check replica trigger applied on subscriber');
111+
112+
$node_subscriber->stop('fast');
113+
$node_publisher->stop('fast');

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp