testgres package

testgres.api

Testing framework for PostgreSQL and its extensions

This module was created under the influence of the Postgres TAP test feature (PostgresNode.pm module). It can manage Postgres clusters: initialize, edit configuration files, start/stop a cluster, and execute queries. A typical flow may look like:

>>> with get_new_node() as node:
...     node.init().start()
...     result = node.safe_psql('postgres', 'select 1')
...     print(result.decode('utf-8').strip())
...     node.stop()
PostgresNode(name='...', port=..., base_dir='...')
1
PostgresNode(name='...', port=..., base_dir='...')

Or:

>>> with get_new_node() as master:
...     master.init().start()
...     with master.backup() as backup:
...         with backup.spawn_replica() as replica:
...             replica = replica.start()
...             master.execute('postgres', 'create table test (val int4)')
...             master.execute('postgres', 'insert into test values (0), (1), (2)')
...             replica.catchup()  # wait until changes are visible
...             print(replica.execute('postgres', 'select count(*) from test'))
PostgresNode(name='...', port=..., base_dir='...')
[(3,)]
testgres.api.get_new_node(name=None,base_dir=None,**kwargs)

Simply a wrapper around the PostgresNode constructor. See PostgresNode.__init__() for details.

testgres.backup

class testgres.backup.NodeBackup(node,base_dir=None,username=None,xlog_method=<XLogMethod.fetch: 'fetch'>)

Bases: object

Smart object responsible for backups

cleanup()

Remove all files that belong to this backup. No-op if it’s been converted to a PostgresNode (destroy=True).

log_file
spawn_primary(name=None,destroy=True)

Create a primary node from a backup.

Parameters:
  • name – primary’s application name.
  • destroy – should we convert this backup into a node?
Returns:

New instance of PostgresNode.

spawn_replica(name=None,destroy=True,slot=None)

Create a replica of the original node from a backup.

Parameters:
  • name – replica’s application name.
  • slot – create a replication slot with the specified name.
  • destroy – should we convert this backup into a node?
Returns:

New instance of PostgresNode.
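As a rough sketch of how spawn_primary() might be used, the function below takes a backup of a running node and starts an independent primary from it. It assumes testgres and PostgreSQL binaries are installed; the function name and the node name 'copy1' are made up for this illustration, and the import is deferred so that defining the function has no side effects.

```python
def primary_from_backup():
    """Back up a running node and start a separate primary from the backup."""
    # Deferred import: needs testgres and a local PostgreSQL installation.
    from testgres import get_new_node

    with get_new_node() as node:
        node.init().start()
        node.execute('create table test (val int4)')
        node.execute('insert into test values (0), (1), (2)')

        with node.backup() as backup:
            # Per the parameter description, destroy=False should leave the
            # backup itself intact instead of converting it into the node.
            # 'copy1' is an arbitrary name chosen for this sketch.
            with backup.spawn_primary(name='copy1', destroy=False) as copy1:
                copy1.start()
                return copy1.execute('select count(*) from test')
```

With destroy=True (the default) the backup is converted into the new node, so cleanup() on the backup becomes a no-op, as noted above.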

testgres.config

class testgres.config.GlobalConfig(**options)

Bases: object

Global configuration object which allows user to override default settings.

copy()

Return a copy of this object.

items()

Return setting-value pairs.

keys()

Return a list of all available settings.

update(config)

Extract setting-value pairs from ‘config’ and assign those values to corresponding settings of this GlobalConfig object.

cache_initdb = True

shall we use cached initdb instance?

cache_pg_config = True

shall we cache pg_config results?

cached_initdb_dir

path to a temp directory for cached initdb.

cached_initdb_unique = False

shall we give new node a unique system id?

error_log_lines = 20

Number of log lines to be shown in exceptions (0 = unlimited).

node_cleanup_full = True

shall we remove EVERYTHING (including logs)?

node_cleanup_on_bad_exit = False

remove base_dir on __exit__() via exception.

node_cleanup_on_good_exit = True

remove base_dir on nominal __exit__().

temp_dir

path to temp dir containing nodes with default ‘base_dir’.

use_python_logging = False

enable python logging subsystem (see logger.py).

testgres.config.configure_testgres(**options)

Adjust current global options. Look at the GlobalConfig to learn about existing settings.

testgres.config.pop_config()

Set previous GlobalConfig options from stack.

testgres.config.push_config(**options)

Permanently set custom GlobalConfig options and put previous settings on top of the config stack.

testgres.config.scoped_config(**options)

Temporarily set custom GlobalConfig options for this context. Previous options are pushed to the config stack.

Example

>>> from .api import get_new_node
>>> with scoped_config(cache_initdb=False):
...     # create a new node with fresh initdb
...     with get_new_node().init().start() as node:
...         print(node.execute('select 1'))
[(1,)]
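The push/pop behaviour described above can be pictured with a small stand-alone model. This is only a toy sketch under the assumption that the config stack works like an ordinary LIFO stack of saved settings; the toy_* functions and the two sample settings are stand-ins written for this illustration, not the testgres implementation.

```python
from contextlib import contextmanager

# Toy stand-ins for illustration; not the testgres implementation.
settings = {'cache_initdb': True, 'error_log_lines': 20}
config_stack = []


def toy_push_config(**options):
    """Put a copy of the current settings on the stack, then apply overrides."""
    config_stack.append(dict(settings))
    settings.update(options)


def toy_pop_config():
    """Restore the settings saved by the most recent push."""
    previous = config_stack.pop()
    settings.clear()
    settings.update(previous)


@contextmanager
def toy_scoped_config(**options):
    """Apply overrides inside a with-block and undo them on exit."""
    toy_push_config(**options)
    try:
        yield settings
    finally:
        toy_pop_config()


with toy_scoped_config(cache_initdb=False, error_log_lines=0):
    inside = dict(settings)   # overrides are visible here
after = dict(settings)        # previous values are back
```

In this model a scoped block is simply a push followed by a guaranteed pop, which is why the overrides disappear even if the block raises.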

testgres.connection

class testgres.connection.NodeConnection(node,dbname=None,username=None,password=None,autocommit=False)

Bases: object

Transaction wrapper returned by Node

begin(isolation_level=<IsolationLevel.ReadCommitted: 'read committed'>)
close()
commit()
connection
cursor
execute(query,*args)
node
pid
rollback()
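The methods listed above suggest an explicit transaction workflow. The helper below is a minimal sketch of that workflow, assuming `node` is a running PostgresNode; the helper name and its parameters are hypothetical, and only begin(), execute(), commit(), rollback() and close() from the listing are used.

```python
def run_in_transaction(node, statements, dbname='postgres', isolation_level=None):
    """Run all statements in one transaction; roll back if any of them fails."""
    con = node.connect(dbname=dbname)
    try:
        if isolation_level is None:
            con.begin()  # documented default is IsolationLevel.ReadCommitted
        else:
            con.begin(isolation_level=isolation_level)
        for statement in statements:
            con.execute(statement)
        con.commit()
    except Exception:
        con.rollback()  # undo partial work before re-raising
        raise
    finally:
        con.close()
```

A caller would pass a member of testgres.enums.IsolationLevel, for example IsolationLevel.Serializable, as isolation_level.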

testgres.enums

class testgres.enums.DumpFormat

Bases: enum.Enum

Available dump formats

Custom = 'custom'
Directory = 'directory'
Plain = 'plain'
Tar = 'tar'
class testgres.enums.IsolationLevel

Bases: enum.Enum

Transaction isolation level for NodeConnection

ReadCommitted = 'read committed'
ReadUncommitted = 'read uncommitted'
RepeatableRead = 'repeatable read'
Serializable = 'serializable'
class testgres.enums.NodeStatus

Bases: enum.IntEnum

Status of a PostgresNode

Running = 0
Stopped = 1
Uninitialized = 2
class testgres.enums.ProcessType

Bases: enum.Enum

Types of processes

AutovacuumLauncher = 'autovacuum launcher'
BackgroundWriter = 'background writer'
Checkpointer = 'checkpointer'
LogicalReplicationLauncher = 'logical replication launcher'
Startup = 'startup'
StatsCollector = 'stats collector'
Unknown = 'unknown'
WalReceiver = 'wal receiver'
WalSender = 'wal sender'
WalWriter = 'wal writer'
class testgres.enums.XLogMethod

Bases: enum.Enum

Available WAL methods for NodeBackup

fetch = 'fetch'
none = 'none'
stream = 'stream'
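Because these are standard enum classes, members can be looked up by value and, for the IntEnum, compared with plain integers. The sketch below re-declares two of the enums locally with the values listed above so that it runs without testgres; the local classes are stand-ins, not the library's own definitions.

```python
from enum import Enum, IntEnum


class DumpFormat(Enum):
    """Local copy of the values listed for testgres.enums.DumpFormat."""
    Plain = 'plain'
    Custom = 'custom'
    Directory = 'directory'
    Tar = 'tar'


class NodeStatus(IntEnum):
    """Local copy of the values listed for testgres.enums.NodeStatus."""
    Running = 0
    Stopped = 1
    Uninitialized = 2


fmt = DumpFormat('tar')                 # look a member up by its string value
status = NodeStatus(1)                  # look a member up by its integer value
names = [member.name for member in DumpFormat]
```

Since Running is 0, it seems safer to compare against members explicitly (status == NodeStatus.Running) than to rely on truthiness.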

testgres.exceptions

exception testgres.exceptions.BackupException

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.CatchUpException(message=None,query=None)

Bases: testgres.exceptions.QueryException

exception testgres.exceptions.ExecUtilException(message=None,command=None,exit_code=0,out=None)

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.InitNodeException

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.QueryException(message=None,query=None)

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.StartNodeException(message=None,files=None)

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.TestgresException

Bases: Exception

exception testgres.exceptions.TimeoutException(message=None,query=None)

Bases: testgres.exceptions.QueryException
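The base classes listed above form a small hierarchy, so a single handler for QueryException also covers CatchUpException and TimeoutException. The sketch below mirrors part of that hierarchy with local stand-in classes (the constructor bodies are assumptions made for the illustration) to show the effect without needing testgres installed.

```python
class TestgresException(Exception):
    """Stand-in for the common base class."""


class QueryException(TestgresException):
    """Stand-in that stores the documented message and query arguments."""

    def __init__(self, message=None, query=None):
        super().__init__(message)
        self.message = message
        self.query = query


class TimeoutException(QueryException):
    pass


class CatchUpException(QueryException):
    pass


caught = []
for exc in (TimeoutException('too slow', query='select 1'),
            CatchUpException('replica is lagging')):
    try:
        raise exc
    except QueryException as error:   # one handler covers both subclasses
        caught.append((type(error).__name__, error.query))
```

Catching TestgresException instead would additionally cover the backup, init, start and utility-execution errors listed above.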

testgres.node

class testgres.node.PostgresNode(name=None,port=None,base_dir=None)
__init__(name=None,port=None,base_dir=None)

PostgresNode constructor.

Parameters:
  • name – node’s application name.
  • port – port to accept connections.
  • base_dir – path to node’s data directory.
append_conf(line='',filename='postgresql.conf',**kwargs)

Append line to a config file.

Parameters:
  • line – string to be appended to config.
  • filename – config file (postgresql.conf by default).
  • **kwargs – named config options.
Returns:

This instance of PostgresNode.

Examples

>>> append_conf(fsync=False)
>>> append_conf('log_connections = yes')
>>> append_conf(random_page_cost=1.5, fsync=True, ...)
>>> append_conf('postgresql.conf', 'synchronous_commit = off')
auxiliary_pids

Returns a dict of { ProcessType : PID }.

auxiliary_processes

Returns a list of auxiliary processes. Each process is represented by a ProcessProxy object.

backup(**kwargs)

Perform pg_basebackup.

Parameters:
  • username – database user name.
  • xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
  • base_dir – the base directory for data files and logs
Returns:

A smart object of type NodeBackup.

catchup(dbname=None,username=None)

Wait until async replica catches up with its master.

child_processes

Returns a list of all child processes. Each process is represented by a ProcessProxy object.

cleanup(max_attempts=3)

Stop node if needed and remove its data/logs directory. NOTE: take a look at TestgresConfig.node_cleanup_full.

Parameters: max_attempts – how many times should we try to stop()?
Returns: This instance of PostgresNode.
connect(dbname=None,username=None,password=None,autocommit=False)

Connect to a database.

Parameters:
  • dbname – database name to connect to.
  • username – database user name.
  • password – user’s password.
  • autocommit – commit each statement automatically. It should also be set to True for statements that must run outside a transaction, such as VACUUM or CREATE DATABASE.
Returns:

An instance of NodeConnection.
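To illustrate the autocommit note, here is a minimal sketch of running VACUUM through a connection. It assumes `node` is a running PostgresNode; the helper name is hypothetical.

```python
def vacuum_database(node, dbname='postgres'):
    """Run VACUUM, which must be issued outside a transaction block."""
    # autocommit=True makes each statement commit on its own, so no
    # surrounding transaction is opened for the VACUUM.
    con = node.connect(dbname=dbname, autocommit=True)
    try:
        con.execute('vacuum')
    finally:
        con.close()
```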

default_conf(fsync=False,unix_sockets=True,allow_streaming=True,allow_logical=False,log_statement='all')

Apply default settings to this node.

Parameters:
  • fsync – should this node use fsync to keep data safe?
  • unix_sockets – should we enable UNIX sockets?
  • allow_streaming – should this node add a hba entry for replication?
  • allow_logical – can this node be used as a logical replication publisher?
  • log_statement – one of (‘all’, ‘off’, ‘mod’, ‘ddl’).
Returns:

This instance of PostgresNode.

dump(filename=None,dbname=None,username=None,format=<DumpFormat.Plain: 'plain'>)

Dump database into a file using pg_dump. NOTE: the file is not removed automatically.

Parameters:
  • filename – database dump taken by pg_dump.
  • dbname – database name to connect to.
  • username – database user name.
  • format – format argument plain/custom/directory/tar.
Returns:

Path to a file containing dump.

execute(query,dbname=None,username=None,password=None,commit=True)

Execute a query and return all rows as list.

Parameters:
  • query – query to be executed.
  • dbname – database name to connect to.
  • username – database user name.
  • password – user’s password.
  • commit – should we commit this query?
Returns:

A list of tuples representing rows.

free_port()

Reclaim port owned by this node. NOTE: does not free auto selected ports.

get_control_data()

Return contents of pg_control file.

init(initdb_params=None,**kwargs)

Perform initdb for this node.

Parameters:
  • initdb_params – parameters for initdb (list).
  • fsync – should this node use fsync to keep data safe?
  • unix_sockets – should we enable UNIX sockets?
  • allow_streaming – should this node add a hba entry for replication?
Returns:

This instance of PostgresNode.

pg_ctl(params)

Invoke pg_ctl with params.

Parameters: params – arguments for pg_ctl.
Returns: Stdout + stderr of pg_ctl.
pgbench(dbname=None,username=None,stdout=None,stderr=None,options=[])

Spawn a pgbench process.

Parameters:
  • dbname – database name to connect to.
  • username – database user name.
  • stdout – stdout file to be used by Popen.
  • stderr – stderr file to be used by Popen.
  • options – additional options for pgbench (list).
Returns:

Process created by subprocess.Popen.

pgbench_init(**kwargs)

Small wrapper for pgbench_run(). Sets initialize=True.

Returns: This instance of PostgresNode.
pgbench_run(dbname=None,username=None,options=[],**kwargs)

Run pgbench with some options. This event is logged (see self.utils_log_file).

Parameters:
  • dbname – database name to connect to.
  • username – database user name.
  • options – additional options for pgbench (list).
  • **kwargs – named options for pgbench. Run pgbench --help to learn more.
Returns:

Stdout produced by pgbench.

Examples

>>> pgbench_run(initialize=True, scale=2)
>>> pgbench_run(time=10)
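Combining the two calls above, a short benchmark run might be wrapped as follows. This is a sketch that assumes `node` is a running PostgresNode; the helper name and its `seconds` parameter are hypothetical, while scale and time are the pgbench options used in the examples.

```python
def quick_benchmark(node, seconds=10):
    """Initialize the pgbench tables, run a timed benchmark, return its output."""
    node.pgbench_init(scale=2)             # same as pgbench_run(initialize=True, scale=2)
    return node.pgbench_run(time=seconds)  # stdout produced by pgbench
```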
pid

Return postmaster’s PID if node is running, else 0.

poll_query_until(query,dbname=None,username=None,max_attempts=0,sleep_time=1,expected=True,commit=True,suppress=None)

Run a query once per second until it returns ‘expected’. Query should return a single value (1 row, 1 column).

Parameters:
  • query – query to be executed.
  • dbname – database name to connect to.
  • username – database user name.
  • max_attempts – how many times should we try? 0 == infinite
  • sleep_time – how much should we sleep after a failure?
  • expected – what should be returned to break the cycle?
  • commit – should (possible) changes be committed?
  • suppress – a collection of exceptions to be suppressed.

Examples

>>> poll_query_until('select true')
>>> poll_query_until('postgres', "select now() > '01.01.2018'")
>>> poll_query_until('select false', expected=True, max_attempts=4)
>>> poll_query_until('select 1', suppress={testgres.OperationalError})
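The parameters above describe a retry loop. The following stand-alone sketch models how such a loop could behave; it is not the library's code. The poll_until name and the run_query callable are made up for the illustration, and the built-in TimeoutError stands in for whatever exception the real method raises when attempts run out.

```python
import time


def poll_until(run_query, expected=True, max_attempts=0, sleep_time=1, suppress=None):
    """Call run_query() until it returns `expected`; return the attempt count.

    max_attempts == 0 means "retry forever". Exceptions whose types are listed
    in `suppress` are treated as failed attempts instead of being propagated.
    """
    suppressed = tuple(suppress or ())
    attempts = 0
    while max_attempts == 0 or attempts < max_attempts:
        try:
            if run_query() == expected:
                return attempts + 1
        except suppressed:
            pass  # a suppressed error counts as a failed attempt
        attempts += 1
        time.sleep(sleep_time)
    raise TimeoutError('query did not return the expected value')


# The fake query succeeds on its third call.
answers = iter([False, False, True])
tries = poll_until(lambda: next(answers), max_attempts=5, sleep_time=0)
```

In this model, a query that never returns the expected value, like the 'select false' example with max_attempts=4, ends in the timeout error after the last attempt.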
promote(dbname=None,username=None)

Promote standby instance to master using pg_ctl. For PostgreSQL versions below 10, some additional actions are required to ensure that the instance has become writable, and hence the dbname and username parameters may be needed.

Returns: This instance of PostgresNode.
psql(query=None,filename=None,dbname=None,username=None,input=None,**variables)

Execute a query using psql.

Parameters:
  • query – query to be executed.
  • filename – file with a query.
  • dbname – database name to connect to.
  • username – database user name.
  • input – raw input to be passed.
  • **variables – vars to be set before execution.
Returns:

A tuple of (code, stdout, stderr).

Examples

>>> psql('select 1')
>>> psql('postgres', 'select 2')
>>> psql(query='select 3', ON_ERROR_STOP=1)
publish(name,**kwargs)

Create publication for logical replication.

Parameters:
  • name – publication name.
  • tables – list of table names.
  • dbname – name of the database where the objects of interest are located.
  • username – replication username.
reload(params=[])

Reload config files using pg_ctl.

Parameters: params – additional arguments for pg_ctl.
Returns: This instance of PostgresNode.
replicate(name=None,slot=None,**kwargs)

Create a binary replica of this node.

Parameters:
  • name – replica’s application name.
  • slot – create a replication slot with the specified name.
  • username – database user name.
  • xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
  • base_dir – the base directory for data files and logs
restart(params=[])

Restart this node using pg_ctl.

Parameters: params – additional arguments for pg_ctl.
Returns: This instance of PostgresNode.
restore(filename,dbname=None,username=None)

Restore database from pg_dump’s file.

Parameters:
  • filename – database dump taken by pg_dump in custom/directory/tar formats.
  • dbname – database name to connect to.
  • username – database user name.
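dump() and restore() can be paired to copy a database from one node to another. The sketch below assumes both nodes are running and that the target database already exists; the helper name and the dump_format parameter are hypothetical. Because the dump file is not removed automatically, the helper deletes it itself.

```python
import os


def copy_database(source, target, dump_format, dbname='postgres'):
    """Dump `dbname` on the source node and restore it on the target node."""
    # dump_format is expected to be a single-file format accepted by restore(),
    # e.g. testgres.enums.DumpFormat.Custom.
    dump_path = source.dump(dbname=dbname, format=dump_format)
    try:
        target.restore(dump_path, dbname=dbname)
    finally:
        os.remove(dump_path)  # the dump file is not cleaned up automatically
```

A directory-format dump would need shutil.rmtree() instead of os.remove().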
safe_psql(query=None,**kwargs)

Execute a query using psql.

Parameters:
  • query – query to be executed.
  • filename – file with a query.
  • dbname – database name to connect to.
  • username – database user name.
  • input – raw input to be passed.
  • **kwargs – are passed to psql().
Returns:

psql’s output as str.

source_walsender

Returns master’s walsender feeding this replica.

start(params=[],wait=True)

Start this node using pg_ctl.

Parameters:
  • params – additional arguments for pg_ctl.
  • wait – wait until operation completes.
Returns:

This instance of PostgresNode.

status()

Check this node’s status.

Returns: An instance of NodeStatus.
stop(params=[],wait=True)

Stop this node using pg_ctl.

Parameters:
  • params – additional arguments for pg_ctl.
  • wait – wait until operation completes.
Returns:

This instance of PostgresNode.

subscribe(publication,name,dbname=None,username=None,**params)

Create subscription for logical replication.

Parameters:
  • name – subscription name
  • publication – publication object obtained from publish()
  • dbname – database name
  • username – replication username
  • params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details)
version

Return PostgreSQL version for this node.

Returns: Instance of distutils.version.LooseVersion.
class testgres.node.ProcessProxy(process,ptype=None)

Wrapper for psutil.Process

process

wrapped psutil.Process object

ptype

instance of ProcessType

testgres.pubsub

Unlike physical replication, logical replication allows users to replicate only specified databases and tables. It uses a publish-subscribe model with possibly multiple publishers and multiple subscribers. When initializing the publisher’s node, allow_logical=True should be passed to the PostgresNode.init() method to enable PostgreSQL to write the extra information to the WAL needed by logical replication.

To replicate table X from node A to node B, the same table structure should be defined on the subscriber’s node, as logical replication doesn’t replicate DDL. After that, the publish() and subscribe() methods may be used to set up replication. Example:

>>> from testgres import get_new_node
>>> with get_new_node() as nodeA, get_new_node() as nodeB:
...     nodeA.init(allow_logical=True).start()
...     nodeB.init().start()
...
...     # create same table both on publisher and subscriber
...     create_table = 'create table test (a int, b int)'
...     nodeA.safe_psql(create_table)
...     nodeB.safe_psql(create_table)
...
...     # create publication
...     pub = nodeA.publish('mypub')
...     # create subscription
...     sub = nodeB.subscribe(pub, 'mysub')
...
...     # insert some data to the publisher's node
...     nodeA.execute('insert into test values (1, 1), (2, 2)')
...
...     # wait until changes apply on subscriber and check them
...     sub.catchup()
...
...     # read the data from subscriber's node
...     nodeB.execute('select * from test')
PostgresNode(name='...', port=..., base_dir='...')
PostgresNode(name='...', port=..., base_dir='...')
''
''
[(1, 1), (2, 2)]
class testgres.node.Publication(name,node,tables=None,dbname=None,username=None)
__init__(name,node,tables=None,dbname=None,username=None)

Constructor. Use PostgresNode.publish() instead of constructing publication objects directly.

Parameters:
  • name – publication name.
  • node – publisher’s node.
  • tables – tables list or None for all tables.
  • dbname – database name used to connect and perform subscription.
  • username – username used to connect to the database.
add_tables(tables,dbname=None,username=None)

Add tables to the publication. Cannot be used if publication was created with empty tables list.

Parameters: tables – a list of tables to be added to the publication.
drop(dbname=None,username=None)

Drop publication.

class testgres.node.Subscription(node,publication,name=None,dbname=None,username=None,**params)
__init__(node,publication,name=None,dbname=None,username=None,**params)

Constructor. Use PostgresNode.subscribe() instead of constructing subscription objects directly.

Parameters:
  • name – subscription name.
  • node – subscriber’s node.
  • publication – Publication object we are subscribing to (see PostgresNode.publish()).
  • dbname – database name used to connect and perform subscription.
  • username – username used to connect to the database.
  • params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details).

catchup(username=None)

Wait until subscription catches up with publication.

Parameters: username – remote node’s user name.
disable(dbname=None,username=None)

Disables the running subscription.

drop(dbname=None,username=None)

Drops subscription.

enable(dbname=None,username=None)

Enables the previously disabled subscription.

refresh(copy_data=True,dbname=None,username=None)

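Refreshes the subscription, picking up tables added to the publication (compare ALTER SUBSCRIPTION ... REFRESH PUBLICATION). The copy_data flag presumably corresponds to the copy_data option of that command.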
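As a sketch of how add_tables() and refresh() might fit together, the function below publishes one table, later adds a second one and refreshes the subscription so that the new table is replicated too. It assumes testgres and PostgreSQL 10 or later are installed; the function name, table names, publication name and subscription name are made up for the illustration, and the import is deferred so that defining the function has no side effects.

```python
def replicate_added_table():
    """Add a table to an existing publication and replicate it."""
    # Deferred import: needs testgres and a local PostgreSQL installation.
    from testgres import get_new_node

    with get_new_node() as pub_node, get_new_node() as sub_node:
        pub_node.init(allow_logical=True).start()
        sub_node.init().start()

        # DDL is not replicated, so create both tables on both nodes.
        for ddl in ('create table t1 (a int)', 'create table t2 (a int)'):
            pub_node.safe_psql(ddl)
            sub_node.safe_psql(ddl)

        # Start with an explicit table list; add_tables() cannot be used
        # on a publication created with an empty tables list.
        pub = pub_node.publish('mypub', tables=['t1'])
        sub = sub_node.subscribe(pub, 'mysub')

        pub.add_tables(['t2'])
        sub.refresh()  # make the subscription notice the newly added table

        pub_node.execute('insert into t2 values (1), (2)')
        sub.catchup()  # wait until the subscriber has applied the changes
        return sub_node.execute('select count(*) from t2')
```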