Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5de94a0

Browse files
author
Amit Kapila
committed
Add 'logical_decoding_mode' GUC.
This enables streaming or serializing changes immediately in logicaldecoding. This parameter is intended to be used to test logical decodingand replication of large transactions for which otherwise we need togenerate the changes till logical_decoding_work_mem is reached.This helps in reducing the timing of existing tests related to logicalreplication of in-progress transactions and will help in writing tests forfor the upcoming feature for parallelly applying large in-progresstransactions.Author: Shi yuReviewed-by: Sawada Masahiko, Shveta Mallik, Amit Kapila, Dilip Kumar, Kuroda Hayato, Kyotaro HoriguchiDiscussion:https://postgr.es/m/OSZPR01MB63104E7449DBE41932DB19F1FD1B9@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parentd3c0cc4 commit5de94a0

File tree

9 files changed

+134
-67
lines changed

9 files changed

+134
-67
lines changed

‎doc/src/sgml/config.sgml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11597,6 +11597,34 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
1159711597
</listitem>
1159811598
</varlistentry>
1159911599

11600+
<varlistentry id="guc-logical-decoding-mode" xreflabel="logical_decoding_mode">
11601+
<term><varname>logical_decoding_mode</varname> (<type>enum</type>)
11602+
<indexterm>
11603+
<primary><varname>logical_decoding_mode</varname> configuration parameter</primary>
11604+
</indexterm>
11605+
</term>
11606+
<listitem>
11607+
<para>
11608+
Allows streaming or serializing changes immediately in logical decoding.
11609+
The allowed values of <varname>logical_decoding_mode</varname> are
11610+
<literal>buffered</literal> and <literal>immediate</literal>. When set
11611+
to <literal>immediate</literal>, stream each change if
11612+
<literal>streaming</literal> option (see optional parameters set by
11613+
<link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>)
11614+
is enabled, otherwise, serialize each change. When set to
11615+
<literal>buffered</literal>, which is the default, decoding will stream
11616+
or serialize changes when <varname>logical_decoding_work_mem</varname>
11617+
is reached.
11618+
</para>
11619+
<para>
11620+
This parameter is intended to be used to test logical decoding and
11621+
replication of large transactions for which otherwise we need to
11622+
generate the changes till <varname>logical_decoding_work_mem</varname>
11623+
is reached.
11624+
</para>
11625+
</listitem>
11626+
</varlistentry>
11627+
1160011628
</variablelist>
1160111629
</sect1>
1160211630
<sect1 id="runtime-config-short">

‎src/backend/replication/logical/reorderbuffer.c

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ typedef struct ReorderBufferDiskChange
209209
intlogical_decoding_work_mem;
210210
staticconstSizemax_changes_in_memory=4096;/* XXX for restore only */
211211

212+
/* GUC variable */
213+
intlogical_decoding_mode=LOGICAL_DECODING_MODE_BUFFERED;
214+
212215
/* ---------------------------------------
213216
* primary reorderbuffer support routines
214217
* ---------------------------------------
@@ -3540,7 +3543,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
35403543
/*
35413544
* Check whether the logical_decoding_work_mem limit was reached, and if yes
35423545
* pick the largest (sub)transaction at-a-time to evict and spill its changes to
3543-
* disk until we reach under the memory limit.
3546+
* disk or send to the output plugin until we reach under the memory limit.
3547+
*
3548+
* If logical_decoding_mode is set to "immediate", stream or serialize the changes
3549+
* immediately.
35443550
*
35453551
* XXX At this point we select the transactions until we reach under the memory
35463552
* limit, but we might also adapt a more elaborate eviction strategy - for example
@@ -3552,20 +3558,27 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
35523558
{
35533559
ReorderBufferTXN*txn;
35543560

3555-
/* bail out if we haven't exceeded the memory limit */
3556-
if (rb->size<logical_decoding_work_mem*1024L)
3561+
/*
3562+
* Bail out if logical_decoding_mode is buffered and we haven't exceeded
3563+
* the memory limit.
3564+
*/
3565+
if (logical_decoding_mode==LOGICAL_DECODING_MODE_BUFFERED&&
3566+
rb->size<logical_decoding_work_mem*1024L)
35573567
return;
35583568

35593569
/*
3560-
* Loop until we reach under the memory limit. One might think that just
3561-
* by evicting the largest (sub)transaction we will come under the memory
3562-
* limit based on assumption that the selected transaction is at least as
3563-
* large as the most recent change (which caused us to go over the memory
3564-
* limit). However, that is not true because a user can reduce the
3565-
* logical_decoding_work_mem to a smaller value before the most recent
3570+
* If logical_decoding_mode is immediate, loop until there's no change.
3571+
* Otherwise, loop until we reach under the memory limit. One might think
3572+
* that just by evicting the largest (sub)transaction we will come under
3573+
* the memory limit based on assumption that the selected transaction is
3574+
* at least as large as the most recent change (which caused us to go over
3575+
* the memory limit). However, that is not true because a user can reduce
3576+
* the logical_decoding_work_mem to a smaller value before the most recent
35663577
* change.
35673578
*/
3568-
while (rb->size >=logical_decoding_work_mem*1024L)
3579+
while (rb->size >=logical_decoding_work_mem*1024L||
3580+
(logical_decoding_mode==LOGICAL_DECODING_MODE_IMMEDIATE&&
3581+
rb->size>0))
35693582
{
35703583
/*
35713584
* Pick the largest transaction (or subtransaction) and evict it from

‎src/backend/utils/misc/guc_tables.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ static const struct config_enum_entry ssl_protocol_versions_info[] = {
395395
{NULL,0, false}
396396
};
397397

398+
staticconststructconfig_enum_entrylogical_decoding_mode_options[]= {
399+
{"buffered",LOGICAL_DECODING_MODE_BUFFERED, false},
400+
{"immediate",LOGICAL_DECODING_MODE_IMMEDIATE, false},
401+
{NULL,0, false}
402+
};
403+
398404
StaticAssertDecl(lengthof(ssl_protocol_versions_info)== (PG_TLS1_3_VERSION+2),
399405
"array length mismatch");
400406

@@ -4877,6 +4883,17 @@ struct config_enum ConfigureNamesEnum[] =
48774883
NULL,NULL,NULL
48784884
},
48794885

4886+
{
4887+
{"logical_decoding_mode",PGC_USERSET,DEVELOPER_OPTIONS,
4888+
gettext_noop("Allows streaming or serializing each change in logical decoding."),
4889+
NULL,
4890+
GUC_NOT_IN_SAMPLE
4891+
},
4892+
&logical_decoding_mode,
4893+
LOGICAL_DECODING_MODE_BUFFERED,logical_decoding_mode_options,
4894+
NULL,NULL,NULL
4895+
},
4896+
48804897
/* End-of-list marker */
48814898
{
48824899
{NULL,0,0,NULL,NULL},NULL,0,NULL,NULL,NULL,NULL

‎src/include/replication/reorderbuffer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@
1818
#include"utils/timestamp.h"
1919

2020
externPGDLLIMPORTintlogical_decoding_work_mem;
21+
externPGDLLIMPORTintlogical_decoding_mode;
22+
23+
/* possible values for logical_decoding_mode */
24+
typedefenum
25+
{
26+
LOGICAL_DECODING_MODE_BUFFERED,
27+
LOGICAL_DECODING_MODE_IMMEDIATE
28+
}LogicalDecodingMode;
2129

2230
/* an individual tuple, stored in one chunk of memory */
2331
typedefstructReorderBufferTupleBuf

‎src/test/subscription/t/016_stream_subxact.pl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
33

4-
# Test streaming oflargetransaction containing large subtransactions
4+
# Test streaming of transaction containing subtransactions
55
use strict;
66
use warnings;
77
use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@
1212
my$node_publisher = PostgreSQL::Test::Cluster->new('publisher');
1313
$node_publisher->init(allows_streaming=>'logical');
1414
$node_publisher->append_conf('postgresql.conf',
15-
'logical_decoding_work_mem =64kB');
15+
'logical_decoding_mode =immediate');
1616
$node_publisher->start;
1717

1818
# Create subscriber node
@@ -49,27 +49,27 @@
4949
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
5050
is($result,qq(2|2|2),'check initial data was copied to subscriber');
5151

52-
# Insert, update and deleteenough rows to exceed 64kB limit.
52+
# Insert, update and deletesome rows.
5353
$node_publisher->safe_psql(
5454
'postgres',q{
5555
BEGIN;
56-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 500) s(i);
56+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,5) s(i);
5757
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
5858
DELETE FROM test_tab WHERE mod(a,3) = 0;
5959
SAVEPOINT s1;
60-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
60+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
6161
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
6262
DELETE FROM test_tab WHERE mod(a,3) = 0;
6363
SAVEPOINT s2;
64-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
64+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
6565
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
6666
DELETE FROM test_tab WHERE mod(a,3) = 0;
6767
SAVEPOINT s3;
68-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
68+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
6969
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
7070
DELETE FROM test_tab WHERE mod(a,3) = 0;
7171
SAVEPOINT s4;
72-
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
72+
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
7373
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
7474
DELETE FROM test_tab WHERE mod(a,3) = 0;
7575
COMMIT;
@@ -80,7 +80,7 @@
8080
$result =
8181
$node_subscriber->safe_psql('postgres',
8282
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
83-
is($result,qq(1667|1667|1667),
83+
is($result,qq(12|12|12),
8484
'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
8585
);
8686

‎src/test/subscription/t/018_stream_subxact_abort.pl

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
33

4-
# Test streaming oflargetransaction containing multiple subtransactions and rollbacks
4+
# Test streaming of transaction containing multiple subtransactions and rollbacks
55
use strict;
66
use warnings;
77
use PostgreSQL::Test::Cluster;
@@ -12,7 +12,7 @@
1212
my$node_publisher = PostgreSQL::Test::Cluster->new('publisher');
1313
$node_publisher->init(allows_streaming=>'logical');
1414
$node_publisher->append_conf('postgresql.conf',
15-
'logical_decoding_work_mem =64kB');
15+
'logical_decoding_mode =immediate');
1616
$node_publisher->start;
1717

1818
# Create subscriber node
@@ -48,25 +48,25 @@
4848
"SELECT count(*), count(c) FROM test_tab");
4949
is($result,qq(2|0),'check initial data was copied to subscriber');
5050

51-
#large (streamed) transaction with DDL, DML and ROLLBACKs
51+
# streamed transaction with DDL, DML and ROLLBACKs
5252
$node_publisher->safe_psql(
5353
'postgres',q{
5454
BEGIN;
55-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(3,500) s(i);
55+
INSERT INTO test_tabVALUES (3, md5(3::text));
5656
SAVEPOINT s1;
57-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
57+
INSERT INTO test_tabVALUES (4, md5(4::text));
5858
SAVEPOINT s2;
59-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
59+
INSERT INTO test_tabVALUES (5, md5(5::text));
6060
SAVEPOINT s3;
61-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
61+
INSERT INTO test_tabVALUES (6, md5(6::text));
6262
ROLLBACK TO s2;
63-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
63+
INSERT INTO test_tabVALUES (7, md5(7::text));
6464
ROLLBACK TO s1;
65-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
65+
INSERT INTO test_tabVALUES (8, md5(8::text));
6666
SAVEPOINT s4;
67-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
67+
INSERT INTO test_tabVALUES (9, md5(9::text));
6868
SAVEPOINT s5;
69-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
69+
INSERT INTO test_tabVALUES (10, md5(10::text));
7070
COMMIT;
7171
});
7272

@@ -75,24 +75,24 @@
7575
$result =
7676
$node_subscriber->safe_psql('postgres',
7777
"SELECT count(*), count(c) FROM test_tab");
78-
is($result,qq(2000|0),
78+
is($result,qq(6|0),
7979
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
8080
);
8181

82-
#large (streamed) transaction with subscriber receiving out of order
83-
#subtransactionROLLBACKs
82+
# streamed transaction with subscriber receiving out of order subtransaction
83+
# ROLLBACKs
8484
$node_publisher->safe_psql(
8585
'postgres',q{
8686
BEGIN;
87-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
87+
INSERT INTO test_tabVALUES (11, md5(11::text));
8888
SAVEPOINT s1;
89-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
89+
INSERT INTO test_tabVALUES (12, md5(12::text));
9090
SAVEPOINT s2;
91-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
91+
INSERT INTO test_tabVALUES (13, md5(13::text));
9292
SAVEPOINT s3;
93-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
93+
INSERT INTO test_tabVALUES (14, md5(14::text));
9494
RELEASE s2;
95-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
95+
INSERT INTO test_tabVALUES (15, md5(15::text));
9696
ROLLBACK TO s1;
9797
COMMIT;
9898
});
@@ -102,18 +102,18 @@
102102
$result =
103103
$node_subscriber->safe_psql('postgres',
104104
"SELECT count(*), count(c) FROM test_tab");
105-
is($result,qq(2500|0),
105+
is($result,qq(7|0),
106106
'check rollback to savepoint was reflected on subscriber');
107107

108-
#large (streamed) transaction with subscriber receiving rollback
108+
# streamed transaction with subscriber receiving rollback
109109
$node_publisher->safe_psql(
110110
'postgres',q{
111111
BEGIN;
112-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
112+
INSERT INTO test_tabVALUES (16, md5(16::text));
113113
SAVEPOINT s1;
114-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
114+
INSERT INTO test_tabVALUES (17, md5(17::text));
115115
SAVEPOINT s2;
116-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
116+
INSERT INTO test_tabVALUES (18, md5(18::text));
117117
ROLLBACK;
118118
});
119119

@@ -122,7 +122,7 @@
122122
$result =
123123
$node_subscriber->safe_psql('postgres',
124124
"SELECT count(*), count(c) FROM test_tab");
125-
is($result,qq(2500|0),'check rollback was reflected on subscriber');
125+
is($result,qq(7|0),'check rollback was reflected on subscriber');
126126

127127
$node_subscriber->stop;
128128
$node_publisher->stop;

‎src/test/subscription/t/019_stream_subxact_ddl_abort.pl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
33

4-
# Test streaming oflargetransaction with subtransactions, DDLs, DMLs, and
4+
# Test streaming of transaction with subtransactions, DDLs, DMLs, and
55
# rollbacks
66
use strict;
77
use warnings;
@@ -13,7 +13,7 @@
1313
my$node_publisher = PostgreSQL::Test::Cluster->new('publisher');
1414
$node_publisher->init(allows_streaming=>'logical');
1515
$node_publisher->append_conf('postgresql.conf',
16-
'logical_decoding_work_mem =64kB');
16+
'logical_decoding_mode =immediate');
1717
$node_publisher->start;
1818

1919
# Create subscriber node
@@ -49,23 +49,23 @@
4949
"SELECT count(*), count(c) FROM test_tab");
5050
is($result,qq(2|0),'check initial data was copied to subscriber');
5151

52-
#large (streamed) transaction with DDL, DML and ROLLBACKs
52+
# streamed transaction with DDL, DML and ROLLBACKs
5353
$node_publisher->safe_psql(
5454
'postgres',q{
5555
BEGIN;
56-
INSERT INTO test_tabSELECT i, md5(i::text) FROM generate_series(3,500) s(i);
56+
INSERT INTO test_tabVALUES (3, md5(3::text));
5757
ALTER TABLE test_tab ADD COLUMN c INT;
5858
SAVEPOINT s1;
59-
INSERT INTO test_tabSELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
59+
INSERT INTO test_tabVALUES (4, md5(4::text), -4);
6060
ALTER TABLE test_tab ADD COLUMN d INT;
6161
SAVEPOINT s2;
62-
INSERT INTO test_tabSELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
62+
INSERT INTO test_tabVALUES (5, md5(5::text), -5, 5*2);
6363
ALTER TABLE test_tab ADD COLUMN e INT;
6464
SAVEPOINT s3;
65-
INSERT INTO test_tabSELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
65+
INSERT INTO test_tabVALUES (6, md5(6::text), -6, 6*2, -6*3);
6666
ALTER TABLE test_tab DROP COLUMN c;
6767
ROLLBACK TO s1;
68-
INSERT INTO test_tabSELECT i, md5(i::text),i FROM generate_series(501,1000) s(i);
68+
INSERT INTO test_tabVALUES (4, md5(4::text),4);
6969
COMMIT;
7070
});
7171

@@ -74,7 +74,7 @@
7474
$result =
7575
$node_subscriber->safe_psql('postgres',
7676
"SELECT count(*), count(c) FROM test_tab");
77-
is($result,qq(1000|500),
77+
is($result,qq(4|1),
7878
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
7979
);
8080

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp