Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Logical replication support#42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
funbringer merged 16 commits intopostgrespro:masterfromzilder:logical
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from1 commit
Commits
Show all changes
16 commits
Select commitHold shift + click to select a range
ca5b546
Logical replication
zilderMar 16, 2018
bb01c7d
Skip logical replication test on PostgreSQL versions below 10
zilderMar 16, 2018
782484b
Fix logical replication for python3
zilderMar 16, 2018
f8b95c6
Merge branch 'master' into logical
zilderMar 19, 2018
b1cba73
Some minor refactoring of logical replication
zilderMar 22, 2018
954879a
Added options_string() func
zilderMar 22, 2018
0741c70
Merge branch 'master' into logical
zilderMar 22, 2018
f4e0bd0
Minor refactoring
zilderMar 22, 2018
f48623b
Add failing logical replication test for 9.6
zilderMar 22, 2018
138c6cc
Added test for Publication.add_tables()
zilderMar 22, 2018
f652bf4
Merge branch 'master' into logical. Also testgres.pubsub was added to…
zilderMar 26, 2018
bc1002f
Additional subscription catchup test
zilderMar 27, 2018
08ed6ef
Some refactoring of logical replication API and formatting
zilderMar 27, 2018
4b279ef
minor refactoring
zilderMay 31, 2018
d60cdcb
Merge branch 'master' into logical
zilderMay 31, 2018
50e02ff
change safe_psql() call to execute() in pubsub.py
zilderJun 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
NextNext commit
Logical replication
  • Loading branch information
@zilder
zilder committedMar 16, 2018
commitca5b5465a07e45f91f6258bfcc73a4dd06ae9e8c
58 changes: 55 additions & 3 deletionstestgres/node.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -48,10 +48,13 @@
ExecUtilException, \
QueryException, \
StartNodeException, \
TimeoutException
TimeoutException, \
InitNodeException

from .logger import TestgresLogger

from .pubsub import Publication, Subscription

from .utils import \
eprint, \
get_bin_path, \
Expand DownExpand Up@@ -278,6 +281,7 @@ def default_conf(self,
fsync=False,
unix_sockets=True,
allow_streaming=True,
allow_logical=False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Why don't we enable this by default?

Copy link
CollaboratorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Because it is not supported on postgres versions below 10 and there is specific message when someone's trying to enable this feature on those versions. Besides it produces extra WAL data and hence could work slightly slower.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Ah, i see.

log_statement='all'):
"""
Apply default settings to this node.
Expand All@@ -286,6 +290,7 @@ def default_conf(self,
fsync: should this node use fsync to keep data safe?
unix_sockets: should we enable UNIX sockets?
allow_streaming: should this node add a hba entry for replication?
allow_logical: can this node be used as a logical replication publisher?
log_statement: one of ('all', 'off', 'mod', 'ddl').

Returns:
Expand DownExpand Up@@ -365,6 +370,12 @@ def get_auth_method(t):
wal_keep_segments,
wal_level))

if allow_logical:
if not pg_version_ge('10'):
raise InitNodeException("Logical replication is only "
"available for Postgres 10 and newer")
conf.write(u"wal_level = logical\n")

# disable UNIX sockets if asked to
if not unix_sockets:
conf.write(u"unix_socket_directories = ''\n")
Expand DownExpand Up@@ -751,7 +762,8 @@ def poll_query_until(self,
expected=True,
commit=True,
raise_programming_error=True,
raise_internal_error=True):
raise_internal_error=True,
zero_rows_is_ok=False):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Isn'tzero_rows_is_ok=True effectively equal toexpected=None?

Copy link
CollaboratorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

InSubscription.catchup() we are expecting True and at the same time it may happen that there is no rows at the moment (until statistics collected).

"""
Run a query once per second until it returns 'expected'.
Query should return a single value (1 row, 1 column).
Expand DownExpand Up@@ -788,7 +800,12 @@ def poll_query_until(self,
raise QueryException('Query returned None', query)

if len(res) == 0:
raise QueryException('Query returned 0 rows', query)
if zero_rows_is_ok:
time.sleep(sleep_time)
attempts += 1
continue
else:
raise QueryException('Query returned 0 rows', query)

if len(res[0]) == 0:
raise QueryException('Query returned 0 columns', query)
Expand DownExpand Up@@ -902,6 +919,41 @@ def catchup(self, dbname=None, username=None):
except Exception as e:
raise_from(CatchUpException("Failed to catch up", poll_lsn), e)

def publish(self,
pubname,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Maybe changepubname toname?

ildus reacted with thumbs up emoji
tables=None,
dbname=None,
username=None):
"""
Create publication for logical replication

Args:
pubname: publication name
tables: tables names list
dbname: database name where objects or interest are located
username: replication username
"""
return Publication(pubname, self, tables, dbname, username)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Could you please use keyword args instead of positional args?


def subscribe(self,
publication,
subname,
dbname=None,
username=None,
**kwargs):
"""
Create subscription for logical replication

Args:
subname: subscription name
publication: publication object obtained from publish()

"""
return Subscription(subname, self, publication,
dbname=dbname,
username=username,
**kwargs)

def pgbench(self,
dbname=None,
username=None,
Expand Down
161 changes: 161 additions & 0 deletionstestgres/pubsub.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
# coding: utf-8

from six import raise_from

from .defaults import default_dbname, default_username
from .exceptions import CatchUpException
from .utils import pg_version_ge


class Publication(object):
def __init__(self, pubname, node, tables=None, dbname=None, username=None):
"""
Constructor

Args:
pubname: publication name
node: publisher's node
tables: tables list or None for all tables
dbname: database name used to connect and perform subscription
username: username used to connect to the database
"""
self.name = pubname
self.node = node
self.dbname = dbname or default_dbname()
self.username = username or default_username()

# create publication in database
t = 'table ' + ', '.join(tables) if tables else 'all tables'
query = "create publication {} for {}"
node.safe_psql(query.format(pubname, t),
dbname=dbname,
username=username)

def close(self, dbname=None, username=None):
"""
Drop publication
"""
self.node.safe_psql("drop publication {}".format(self.name),
dbname=dbname, username=username)

def add_tables(self, tables, dbname=None, username=None):
"""
Add tables

Args:
tables: a list of tables to add to the publication
"""
if not tables:
raise ValueError("Tables list is empty")

query = "alter publication {} add table {}"
self.node.safe_psql(query.format(self.name, ', '.join(tables)),
dbname=dbname or self.dbname,
username=username or self.username)


class Subscription(object):
def __init__(self,
subname,
node,
publication,
dbname=None,
username=None,
**kwargs):
"""
Constructor

Args:
subname: subscription name
node: subscriber's node
publication: Publication object we are subscribing to
dbname: database name used to connect and perform subscription
username: username used to connect to the database
**kwargs: subscription parameters (see CREATE SUBSCRIPTION
in PostgreSQL documentation for more information)
"""
self.name = subname
self.node = node
self.pub = publication

# connection info
conninfo = (
u"dbname={} user={} host={} port={}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I think we could extract commonconninfo-related code from_create_recovery_conf() into a function and add it toutils.py. What do you think?

).format(self.pub.dbname,
self.pub.username,
self.pub.node.host,
self.pub.node.port)

query = (
"create subscription {} connection '{}' publication {}"
).format(subname, conninfo, self.pub.name)

# additional parameters
if kwargs:
params = ','.join('{}={}'.format(k, v) for k, v in kwargs.iteritems())
query += " with ({})".format(params)

node.safe_psql(query, dbname=dbname, username=username)

def disable(self, dbname=None, username=None):
"""
Disables the running subscription.
"""
query = "alter subscription {} disable"
self.node.safe_psql(query.format(self.name),
dbname=None,
username=None)

def enable(self, dbname=None, username=None):
"""
Enables the previously disabled subscription.
"""
query = "alter subscription {} enable"
self.node.safe_psql(query.format(self.name),
dbname=None,
username=None)

def refresh(self, copy_data=True, dbname=None, username=None):
"""
Disables the running subscription.
"""
query = "alter subscription {} refresh publication with (copy_data={})"
self.node.safe_psql(query.format(self.name, copy_data),
dbname=dbname,
username=username)

def close(self, dbname=None, username=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Shouldn't it be calleddrop instead?

"""
Drops subscription
"""
self.node.safe_psql("drop subscription {}".format(self.name),
dbname=dbname, username=username)

def catchup(self, username=None):
"""
Wait until subscription catches up with publication.

Args:
username: remote node's user name
"""
if pg_version_ge('10'):
query = (
"select pg_current_wal_lsn() - replay_lsn = 0 "
"from pg_stat_replication where application_name = '{}'"
).format(self.name)
else:
query = (
"select pg_current_xlog_location() - replay_location = 0 "
"from pg_stat_replication where application_name = '{}'"
).format(self.name)

try:
# wait until this LSN reaches subscriber
self.pub.node.poll_query_until(
query=query,
dbname=self.pub.dbname,
username=username or self.pub.username,
max_attempts=60,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

IMHO it's better to replacemax_attempts=60 andzero_rows_is_ok=True with kwargs.

zero_rows_is_ok=True) # statistics may have not updated yet
except Exception as e:
raise_from(CatchUpException("Failed to catch up", query), e)
66 changes: 66 additions & 0 deletionstests/test_simple.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -382,6 +382,72 @@ def test_replicate(self):
res = node.execute('select * from test')
self.assertListEqual(res, [])

def test_logical_replication(self):
with get_new_node() as node1, get_new_node() as node2:
node1.init(allow_logical=True)
node1.start()
node2.init().start()

create_table = 'create table test (a int, b int)'
node1.safe_psql(create_table)
node2.safe_psql(create_table)

# create publication / create subscription
pub = node1.publish('mypub')
sub = node2.subscribe(pub, 'mysub')

node1.safe_psql('insert into test values (1, 1), (2, 2)')

# wait until changes apply on subscriber and check them
sub.catchup()
res = node2.execute('select * from test')
self.assertListEqual(res, [(1, 1), (2, 2)])

# disable and put some new data
sub.disable()
node1.safe_psql('insert into test values (3, 3)')

# enable and ensure that data successfully transfered
sub.enable()
sub.catchup()
res = node2.execute('select * from test')
self.assertListEqual(res, [(1, 1), (2, 2), (3, 3)])

# Add new tables. Since we added "all tables" to publication
# (default behaviour of publish() method) we don't need
# to explicitely perform pub.add_table()
create_table = 'create table test2 (c char)'
node1.safe_psql(create_table)
node2.safe_psql(create_table)
sub.refresh()

# put new data
node1.safe_psql('insert into test2 values (\'a\'), (\'b\')')
sub.catchup()
res = node2.execute('select * from test2')
self.assertListEqual(res, [('a',), ('b',)])

# drop subscription
sub.close()
pub.close()

# create new publication and subscription for specific table
# (ommitting copying data as it's already done)
pub = node1.publish('newpub', tables=['test'])
sub = node2.subscribe(pub, 'newsub', copy_data=False)

node1.safe_psql('insert into test values (4, 4)')
sub.catchup()
res = node2.execute('select * from test')
self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)])

# explicitely add table
pub.add_tables(['test2'])
node1.safe_psql('insert into test2 values (\'c\')')
sub.catchup()
res = node2.execute('select * from test2')
self.assertListEqual(res, [('a',), ('b',)])

def test_incorrect_catchup(self):
with get_new_node() as node:
node.init(allow_streaming=True).start()
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp