Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit58b5ae9

Browse files
author
Amit Kapila
committed
Add additional tests to test streaming of in-progress transactions.
This covers the functionality tests for streaming in-progresssubtransactions, streaming transactions containing rollback to savepoints,and streaming transactions having DDLs.Author: Tomas Vondra, Amit Kapila and Dilip KumarReviewed-by: Dilip KumarDiscussion:https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent8870917 commit58b5ae9

File tree

4 files changed

+384
-0
lines changed

4 files changed

+384
-0
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Test streaming of large transaction containing large subtransactions
2+
use strict;
3+
use warnings;
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 2;
7+
8+
# Create publisher node
9+
my$node_publisher = get_new_node('publisher');
10+
$node_publisher->init(allows_streaming=>'logical');
11+
$node_publisher->append_conf('postgresql.conf','logical_decoding_work_mem = 64kB');
12+
$node_publisher->start;
13+
14+
# Create subscriber node
15+
my$node_subscriber = get_new_node('subscriber');
16+
$node_subscriber->init(allows_streaming=>'logical');
17+
$node_subscriber->start;
18+
19+
# Create some preexisting content on publisher
20+
$node_publisher->safe_psql('postgres',
21+
"CREATE TABLE test_tab (a int primary key, b varchar)");
22+
$node_publisher->safe_psql('postgres',
23+
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
24+
25+
# Setup structure on subscriber
26+
$node_subscriber->safe_psql('postgres',"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
27+
28+
# Setup logical replication
29+
my$publisher_connstr =$node_publisher->connstr .' dbname=postgres';
30+
$node_publisher->safe_psql('postgres',"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
31+
32+
my$appname ='tap_sub';
33+
$node_subscriber->safe_psql('postgres',
34+
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
35+
);
36+
37+
$node_publisher->wait_for_catchup($appname);
38+
39+
# Also wait for initial table sync to finish
40+
my$synced_query =
41+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
42+
$node_subscriber->poll_query_until('postgres',$synced_query)
43+
ordie"Timed out while waiting for subscriber to synchronize data";
44+
45+
my$result =
46+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c), count(d = 999) FROM test_tab");
47+
is($result,qq(2|2|2),'check initial data was copied to subscriber');
48+
49+
# Insert, update and delete enough rows to exceed 64kB limit.
50+
$node_publisher->safe_psql('postgres',q{
51+
BEGIN;
52+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i);
53+
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
54+
DELETE FROM test_tab WHERE mod(a,3) = 0;
55+
SAVEPOINT s1;
56+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
57+
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
58+
DELETE FROM test_tab WHERE mod(a,3) = 0;
59+
SAVEPOINT s2;
60+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
61+
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
62+
DELETE FROM test_tab WHERE mod(a,3) = 0;
63+
SAVEPOINT s3;
64+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
65+
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
66+
DELETE FROM test_tab WHERE mod(a,3) = 0;
67+
SAVEPOINT s4;
68+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
69+
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
70+
DELETE FROM test_tab WHERE mod(a,3) = 0;
71+
COMMIT;
72+
});
73+
74+
$node_publisher->wait_for_catchup($appname);
75+
76+
$result =
77+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c), count(d = 999) FROM test_tab");
78+
is($result,qq(1667|1667|1667),'check data was copied to subscriber in streaming mode and extra columns contain local defaults');
79+
80+
$node_subscriber->stop;
81+
$node_publisher->stop;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Test streaming of large transaction with DDL and subtransactions
2+
use strict;
3+
use warnings;
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 3;
7+
8+
# Create publisher node
9+
my$node_publisher = get_new_node('publisher');
10+
$node_publisher->init(allows_streaming=>'logical');
11+
$node_publisher->append_conf('postgresql.conf','logical_decoding_work_mem = 64kB');
12+
$node_publisher->start;
13+
14+
# Create subscriber node
15+
my$node_subscriber = get_new_node('subscriber');
16+
$node_subscriber->init(allows_streaming=>'logical');
17+
$node_subscriber->start;
18+
19+
# Create some preexisting content on publisher
20+
$node_publisher->safe_psql('postgres',
21+
"CREATE TABLE test_tab (a int primary key, b varchar)");
22+
$node_publisher->safe_psql('postgres',
23+
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
24+
25+
# Setup structure on subscriber
26+
$node_subscriber->safe_psql('postgres',"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)");
27+
28+
# Setup logical replication
29+
my$publisher_connstr =$node_publisher->connstr .' dbname=postgres';
30+
$node_publisher->safe_psql('postgres',"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
31+
32+
my$appname ='tap_sub';
33+
$node_subscriber->safe_psql('postgres',
34+
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
35+
);
36+
37+
$node_publisher->wait_for_catchup($appname);
38+
39+
# Also wait for initial table sync to finish
40+
my$synced_query =
41+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
42+
$node_subscriber->poll_query_until('postgres',$synced_query)
43+
ordie"Timed out while waiting for subscriber to synchronize data";
44+
45+
my$result =
46+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c), count(d = 999) FROM test_tab");
47+
is($result,qq(2|0|0),'check initial data was copied to subscriber');
48+
49+
# a small (non-streamed) transaction with DDL and DML
50+
$node_publisher->safe_psql('postgres',q{
51+
BEGIN;
52+
INSERT INTO test_tab VALUES (3, md5(3::text));
53+
ALTER TABLE test_tab ADD COLUMN c INT;
54+
SAVEPOINT s1;
55+
INSERT INTO test_tab VALUES (4, md5(4::text), -4);
56+
COMMIT;
57+
});
58+
59+
# large (streamed) transaction with DDL and DML
60+
$node_publisher->safe_psql('postgres',q{
61+
BEGIN;
62+
INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i);
63+
ALTER TABLE test_tab ADD COLUMN d INT;
64+
SAVEPOINT s1;
65+
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i);
66+
COMMIT;
67+
});
68+
69+
# a small (non-streamed) transaction with DDL and DML
70+
$node_publisher->safe_psql('postgres',q{
71+
BEGIN;
72+
INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);
73+
ALTER TABLE test_tab ADD COLUMN e INT;
74+
SAVEPOINT s1;
75+
INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);
76+
COMMIT;
77+
});
78+
79+
$node_publisher->wait_for_catchup($appname);
80+
81+
$result =
82+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c), count(d), count(e) FROM test_tab");
83+
is($result,qq(2002|1999|1002|1),'check data was copied to subscriber in streaming mode and extra columns contain local defaults');
84+
85+
# A large (streamed) transaction with DDL and DML. One of the DDL is performed
86+
# after DML to ensure that we invalidate the schema sent for test_tab so that
87+
# the next transaction has to send the schema again.
88+
$node_publisher->safe_psql('postgres',q{
89+
BEGIN;
90+
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i);
91+
ALTER TABLE test_tab ADD COLUMN f INT;
92+
COMMIT;
93+
});
94+
95+
# A small transaction that won't get streamed. This is just to ensure that we
96+
# send the schema again to reflect the last column added in the previous test.
97+
$node_publisher->safe_psql('postgres',q{
98+
BEGIN;
99+
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i);
100+
COMMIT;
101+
});
102+
103+
$node_publisher->wait_for_catchup($appname);
104+
105+
$result =
106+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab");
107+
is($result,qq(5005|5002|4005|3004|5),'check data was copied to subscriber for both streaming and non-streaming transactions');
108+
109+
$node_subscriber->stop;
110+
$node_publisher->stop;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Test streaming of large transaction containing multiple subtransactions and rollbacks
2+
use strict;
3+
use warnings;
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 4;
7+
8+
# Create publisher node
9+
my$node_publisher = get_new_node('publisher');
10+
$node_publisher->init(allows_streaming=>'logical');
11+
$node_publisher->append_conf('postgresql.conf','logical_decoding_work_mem = 64kB');
12+
$node_publisher->start;
13+
14+
# Create subscriber node
15+
my$node_subscriber = get_new_node('subscriber');
16+
$node_subscriber->init(allows_streaming=>'logical');
17+
$node_subscriber->start;
18+
19+
# Create some preexisting content on publisher
20+
$node_publisher->safe_psql('postgres',
21+
"CREATE TABLE test_tab (a int primary key, b varchar)");
22+
$node_publisher->safe_psql('postgres',
23+
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
24+
25+
# Setup structure on subscriber
26+
$node_subscriber->safe_psql('postgres',"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
27+
28+
# Setup logical replication
29+
my$publisher_connstr =$node_publisher->connstr .' dbname=postgres';
30+
$node_publisher->safe_psql('postgres',"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
31+
32+
my$appname ='tap_sub';
33+
$node_subscriber->safe_psql('postgres',
34+
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
35+
);
36+
37+
$node_publisher->wait_for_catchup($appname);
38+
39+
# Also wait for initial table sync to finish
40+
my$synced_query =
41+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
42+
$node_subscriber->poll_query_until('postgres',$synced_query)
43+
ordie"Timed out while waiting for subscriber to synchronize data";
44+
45+
my$result =
46+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c) FROM test_tab");
47+
is($result,qq(2|0),'check initial data was copied to subscriber');
48+
49+
# large (streamed) transaction with DDL, DML and ROLLBACKs
50+
$node_publisher->safe_psql('postgres',q{
51+
BEGIN;
52+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
53+
SAVEPOINT s1;
54+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
55+
SAVEPOINT s2;
56+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
57+
SAVEPOINT s3;
58+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
59+
ROLLBACK TO s2;
60+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
61+
ROLLBACK TO s1;
62+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
63+
SAVEPOINT s4;
64+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
65+
SAVEPOINT s5;
66+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
67+
COMMIT;
68+
});
69+
70+
$node_publisher->wait_for_catchup($appname);
71+
72+
$result =
73+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c) FROM test_tab");
74+
is($result,qq(2000|0),'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults');
75+
76+
# large (streamed) transaction with subscriber receiving out of order
77+
# subtransaction ROLLBACKs
78+
$node_publisher->safe_psql('postgres',q{
79+
BEGIN;
80+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
81+
SAVEPOINT s1;
82+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
83+
SAVEPOINT s2;
84+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
85+
SAVEPOINT s3;
86+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
87+
RELEASE s2;
88+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
89+
ROLLBACK TO s1;
90+
COMMIT;
91+
});
92+
93+
$node_publisher->wait_for_catchup($appname);
94+
95+
$result =
96+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c) FROM test_tab");
97+
is($result,qq(2500|0),'check rollback to savepoint was reflected on subscriber');
98+
99+
# large (streamed) transaction with subscriber receiving rollback
100+
$node_publisher->safe_psql('postgres',q{
101+
BEGIN;
102+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
103+
SAVEPOINT s1;
104+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
105+
SAVEPOINT s2;
106+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
107+
ROLLBACK;
108+
});
109+
110+
$node_publisher->wait_for_catchup($appname);
111+
112+
$result =
113+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c) FROM test_tab");
114+
is($result,qq(2500|0),'check rollback was reflected on subscriber');
115+
116+
$node_subscriber->stop;
117+
$node_publisher->stop;
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
2+
# rollbacks
3+
use strict;
4+
use warnings;
5+
use PostgresNode;
6+
use TestLib;
7+
use Test::Moretests=> 2;
8+
9+
# Create publisher node
10+
my$node_publisher = get_new_node('publisher');
11+
$node_publisher->init(allows_streaming=>'logical');
12+
$node_publisher->append_conf('postgresql.conf','logical_decoding_work_mem = 64kB');
13+
$node_publisher->start;
14+
15+
# Create subscriber node
16+
my$node_subscriber = get_new_node('subscriber');
17+
$node_subscriber->init(allows_streaming=>'logical');
18+
$node_subscriber->start;
19+
20+
# Create some preexisting content on publisher
21+
$node_publisher->safe_psql('postgres',
22+
"CREATE TABLE test_tab (a int primary key, b varchar)");
23+
$node_publisher->safe_psql('postgres',
24+
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
25+
26+
# Setup structure on subscriber
27+
$node_subscriber->safe_psql('postgres',"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
28+
29+
# Setup logical replication
30+
my$publisher_connstr =$node_publisher->connstr .' dbname=postgres';
31+
$node_publisher->safe_psql('postgres',"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
32+
33+
my$appname ='tap_sub';
34+
$node_subscriber->safe_psql('postgres',
35+
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
36+
);
37+
38+
$node_publisher->wait_for_catchup($appname);
39+
40+
# Also wait for initial table sync to finish
41+
my$synced_query =
42+
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
43+
$node_subscriber->poll_query_until('postgres',$synced_query)
44+
ordie"Timed out while waiting for subscriber to synchronize data";
45+
46+
my$result =
47+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c) FROM test_tab");
48+
is($result,qq(2|0),'check initial data was copied to subscriber');
49+
50+
# large (streamed) transaction with DDL, DML and ROLLBACKs
51+
$node_publisher->safe_psql('postgres',q{
52+
BEGIN;
53+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
54+
ALTER TABLE test_tab ADD COLUMN c INT;
55+
SAVEPOINT s1;
56+
INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
57+
ALTER TABLE test_tab ADD COLUMN d INT;
58+
SAVEPOINT s2;
59+
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
60+
ALTER TABLE test_tab ADD COLUMN e INT;
61+
SAVEPOINT s3;
62+
INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
63+
ALTER TABLE test_tab DROP COLUMN c;
64+
ROLLBACK TO s1;
65+
INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
66+
COMMIT;
67+
});
68+
69+
$node_publisher->wait_for_catchup($appname);
70+
71+
$result =
72+
$node_subscriber->safe_psql('postgres',"SELECT count(*), count(c) FROM test_tab");
73+
is($result,qq(1000|500),'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults');
74+
75+
$node_subscriber->stop;
76+
$node_publisher->stop;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp