1+ #!/usr/bin/env python3
12# coding: utf-8
3+
24"""
35 concurrent_partitioning_test.py
46Tests concurrent partitioning worker with simultaneous update queries
57
6- Copyright (c) 2015-2016 , Postgres Professional
8+ Copyright (c) 2015-2017 , Postgres Professional
79"""
810
911import unittest
1012import math
11- from testgres import get_new_node ,stop_all
1213import time
1314import os
1415import re
1516import subprocess
1617import threading
1718
19+ from testgres import get_new_node ,stop_all
20+
1821
1922# Helper function for json equality
2023def ordered (obj ):
@@ -25,6 +28,7 @@ def ordered(obj):
2528else :
2629return obj
2730
31+
2832def if_fdw_enabled (func ):
2933"""To run tests with FDW support set environment variable TEST_FDW=1"""
3034def wrapper (* args ,** kwargs ):
@@ -110,7 +114,7 @@ def test_concurrent(self):
110114self .assertEqual (data [0 ][0 ],300000 )
111115
112116node .stop ()
113- except Exception , e :
117+ except Exception as e :
114118self .printlog (node .logs_dir + '/postgresql.log' )
115119raise e
116120
@@ -175,7 +179,7 @@ def test_replication(self):
175179node .execute ('postgres' ,'select count(*) from abc' )[0 ][0 ],
1761800
177181)
178- except Exception , e :
182+ except Exception as e :
179183self .printlog (node .logs_dir + '/postgresql.log' )
180184self .printlog (replica .logs_dir + '/postgresql.log' )
181185raise e
@@ -199,7 +203,7 @@ def get(self):
199203
200204# There is one flag for each thread which shows if thread have done
201205# its work
202- flags = [Flag (False )for i in xrange (3 )]
206+ flags = [Flag (False )for i in range (3 )]
203207
204208# All threads synchronizes though this lock
205209lock = threading .Lock ()
@@ -275,9 +279,9 @@ def add_partition(node, flag, query):
275279'postgres' ,
276280'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
277281),
278- '6\n '
282+ b '6\n '
279283)
280- except Exception , e :
284+ except Exception as e :
281285self .printlog (node .logs_dir + '/postgresql.log' )
282286raise e
283287
@@ -422,14 +426,14 @@ def test_foreign_table(self):
422426# Check that table attached to partitioned table
423427self .assertEqual (
424428master .safe_psql ('postgres' ,'select * from ftable' ),
425- '25|foreign\n '
429+ b '25|foreign\n '
426430)
427431
428432# Check that we can successfully insert new data into foreign partition
429433master .safe_psql ('postgres' ,'insert into abc values (26,\' part\' )' )
430434self .assertEqual (
431435master .safe_psql ('postgres' ,'select * from ftable order by id' ),
432- '25|foreign\n 26|part\n '
436+ b '25|foreign\n 26|part\n '
433437)
434438
435439# Testing drop partitions (including foreign partitions)
@@ -459,7 +463,7 @@ def test_foreign_table(self):
459463
460464self .assertEqual (
461465master .safe_psql ('postgres' ,'select * from hash_test' ),
462- '1|\n 2|\n 5|\n 6|\n 8|\n 9|\n 3|\n 4|\n 7|\n 10|\n '
466+ b '1|\n 2|\n 5|\n 6|\n 8|\n 9|\n 3|\n 4|\n 7|\n 10|\n '
463467)
464468master .safe_psql ('postgres' ,'select drop_partitions(\' hash_test\' )' )
465469
@@ -851,63 +855,72 @@ def turnon_pathman(node):
851855"--dbname=copy" ],
852856cmp_full ),# dump in archive format
853857]
854- for preproc ,postproc ,pg_dump_params ,pg_restore_params ,cmp_dbs in test_params :
855858
856- dump_restore_cmd = " | " .join ((' ' .join (pg_dump_params ),' ' .join (pg_restore_params )))
857-
858- if (preproc != None ):
859- preproc (node )
860-
861- # transfer and restore data
859+ try :
862860FNULL = open (os .devnull ,'w' )
863- p1 = subprocess .Popen (pg_dump_params ,stdout = subprocess .PIPE )
864- p2 = subprocess .Popen (pg_restore_params ,stdin = p1 .stdout ,stdout = FNULL ,stderr = FNULL )
865- p1 .stdout .close ()# Allow p1 to receive a SIGPIPE if p2 exits.
866- p2 .communicate ()
867-
868- if (postproc != None ):
869- postproc (node )
870-
871- # check validity of data
872- with node .connect ('initial' )as con1 ,node .connect ('copy' )as con2 :
873-
874- # compare plans and contents of initial and copy
875- cmp_result = cmp_dbs (con1 ,con2 )
876- self .assertNotEqual (cmp_result ,PLANS_MISMATCH ,
877- "mismatch in plans of select query on partitioned tables under the command: %s" % dump_restore_cmd )
878- self .assertNotEqual (cmp_result ,CONTENTS_MISMATCH ,
879- "mismatch in contents of partitioned tables under the command: %s" % dump_restore_cmd )
880-
881- # compare enable_parent flag and callback function
882- config_params_query = """
883- select partrel, enable_parent, init_callback from pathman_config_params
884- """
885- config_params_initial ,config_params_copy = {}, {}
886- for row in con1 .execute (config_params_query ):
887- config_params_initial [row [0 ]]= row [1 :]
888- for row in con2 .execute (config_params_query ):
889- config_params_copy [row [0 ]]= row [1 :]
890- self .assertEqual (config_params_initial ,config_params_copy , \
891- "mismatch in pathman_config_params under the command: %s" % dump_restore_cmd )
892-
893- # compare constraints on each partition
894- constraints_query = """
895- select r.relname, c.conname, c.consrc from
896- pg_constraint c join pg_class r on c.conrelid=r.oid
897- where relname similar to '(range|hash)_partitioned_\d+'
898- """
899- constraints_initial ,constraints_copy = {}, {}
900- for row in con1 .execute (constraints_query ):
901- constraints_initial [row [0 ]]= row [1 :]
902- for row in con2 .execute (constraints_query ):
903- constraints_copy [row [0 ]]= row [1 :]
904- self .assertEqual (constraints_initial ,constraints_copy , \
905- "mismatch in partitions' constraints under the command: %s" % dump_restore_cmd )
906-
907- # clear copy database
908- node .psql ('copy' ,'drop schema public cascade' )
909- node .psql ('copy' ,'create schema public' )
910- node .psql ('copy' ,'drop extension pg_pathman cascade' )
861+
862+ for preproc ,postproc ,pg_dump_params ,pg_restore_params ,cmp_dbs in test_params :
863+
864+ dump_restore_cmd = " | " .join ((' ' .join (pg_dump_params ),' ' .join (pg_restore_params )))
865+
866+ if (preproc != None ):
867+ preproc (node )
868+
869+ # transfer and restore data
870+ p1 = subprocess .Popen (pg_dump_params ,stdout = subprocess .PIPE )
871+ stdoutdata ,_ = p1 .communicate ()
872+ p2 = subprocess .Popen (pg_restore_params ,stdin = subprocess .PIPE ,
873+ stdout = FNULL ,stderr = FNULL )
874+ p2 .communicate (input = stdoutdata )
875+
876+ if (postproc != None ):
877+ postproc (node )
878+
879+ # check validity of data
880+ with node .connect ('initial' )as con1 ,node .connect ('copy' )as con2 :
881+
882+ # compare plans and contents of initial and copy
883+ cmp_result = cmp_dbs (con1 ,con2 )
884+ self .assertNotEqual (cmp_result ,PLANS_MISMATCH ,
885+ "mismatch in plans of select query on partitioned tables under the command: %s" % dump_restore_cmd )
886+ self .assertNotEqual (cmp_result ,CONTENTS_MISMATCH ,
887+ "mismatch in contents of partitioned tables under the command: %s" % dump_restore_cmd )
888+
889+ # compare enable_parent flag and callback function
890+ config_params_query = """
891+ select partrel, enable_parent, init_callback from pathman_config_params
892+ """
893+ config_params_initial ,config_params_copy = {}, {}
894+ for row in con1 .execute (config_params_query ):
895+ config_params_initial [row [0 ]]= row [1 :]
896+ for row in con2 .execute (config_params_query ):
897+ config_params_copy [row [0 ]]= row [1 :]
898+ self .assertEqual (config_params_initial ,config_params_copy , \
899+ "mismatch in pathman_config_params under the command: %s" % dump_restore_cmd )
900+
901+ # compare constraints on each partition
902+ constraints_query = """
903+ select r.relname, c.conname, c.consrc from
904+ pg_constraint c join pg_class r on c.conrelid=r.oid
905+ where relname similar to '(range|hash)_partitioned_\d+'
906+ """
907+ constraints_initial ,constraints_copy = {}, {}
908+ for row in con1 .execute (constraints_query ):
909+ constraints_initial [row [0 ]]= row [1 :]
910+ for row in con2 .execute (constraints_query ):
911+ constraints_copy [row [0 ]]= row [1 :]
912+ self .assertEqual (constraints_initial ,constraints_copy , \
913+ "mismatch in partitions' constraints under the command: %s" % dump_restore_cmd )
914+
915+ # clear copy database
916+ node .psql ('copy' ,'drop schema public cascade' )
917+ node .psql ('copy' ,'create schema public' )
918+ node .psql ('copy' ,'drop extension pg_pathman cascade' )
919+
920+ except :
921+ raise
922+ finally :
923+ FNULL .close ()
911924
912925# Stop instance and finish work
913926node .stop ()
@@ -958,24 +971,24 @@ def test_concurrent_detach(self):
958971"-T" ,"%i" % (test_interval + inserts_advance )
959972])
960973time .sleep (inserts_advance )
961- detachs = node .pgbench (stdout = FNULL ,stderr = subprocess . PIPE ,options = [
974+ detachs = node .pgbench (stdout = FNULL ,stderr = FNULL ,options = [
962975"-D" ,"timeout=%f" % detach_timeout ,
963976"-f" ,detach_pgbench_script ,
964977"-T" ,"%i" % test_interval
965978])
966979
967980# Wait for completion of processes
968- inserts .wait ()
981+ _ , stderrdata = inserts .communicate ()
969982detachs .wait ()
970983
971984# Obtain error log from inserts process
972- inserts_errors = inserts .stderr .read ()
973- self .assertIsNone (re .search ("ERROR|FATAL|PANIC" ,inserts_errors ),
985+ self .assertIsNone (re .search ("ERROR|FATAL|PANIC" ,str (stderrdata )),
974986msg = "Race condition between detach and concurrent inserts with append partition is expired" )
975987
976988# Stop instance and finish work
977989node .stop ()
978990node .cleanup ()
991+ FNULL .close ()
979992
980993
981994if __name__ == "__main__" :