12
12
import os
13
13
14
14
15
+ def test_fdw (func ):
16
+ """To run tests with FDW support set environment variable TEST_FDW=1"""
17
+ def wrapper (* args ,** kwargs ):
18
+ if os .environ .get ('TEST_FDW' ):
19
+ func (* args ,** kwargs )
20
+ else :
21
+ print ('Warning: FDW features tests are disabled, skipping...' )
22
+ return wrapper
23
+
24
+
15
25
class PartitioningTests (unittest .TestCase ):
16
26
17
27
def setUp (self ):
18
28
self .setup_cmd = [
19
- 'create extension pg_pathman' ,
29
+ # 'create extension pg_pathman',
20
30
'create table abc(id serial, t text)' ,
21
31
'insert into abc select generate_series(1, 300000)' ,
22
32
'select create_hash_partitions(\' abc\' ,\' id\' , 3, partition_data := false)' ,
@@ -26,6 +36,16 @@ def tearDown(self):
26
36
stop_all ()
27
37
# clean_all()
28
38
39
+ def start_new_pathman_cluster (self ,name = 'test' ,allows_streaming = False ):
40
+ node = get_new_node (name )
41
+ node .init (allows_streaming = allows_streaming )
42
+ node .append_conf (
43
+ 'postgresql.conf' ,
44
+ 'shared_preload_libraries=\' pg_pathman\' \n ' )
45
+ node .start ()
46
+ node .psql ('postgres' ,'create extension pg_pathman' )
47
+ return node
48
+
29
49
def init_test_data (self ,node ):
30
50
"""Initialize pg_pathman extension and test data"""
31
51
for cmd in self .setup_cmd :
@@ -42,17 +62,12 @@ def catchup_replica(self, master, replica):
42
62
def printlog (self ,logfile ):
43
63
with open (logfile ,'r' )as log :
44
64
for line in log .readlines ():
45
- print line
65
+ print ( line )
46
66
47
67
def test_concurrent (self ):
48
68
"""Tests concurrent partitioning"""
49
- node = get_new_node ('test' )
50
69
try :
51
- node .init ()
52
- node .append_conf (
53
- 'postgresql.conf' ,
54
- 'shared_preload_libraries=\' pg_pathman\' \n ' )
55
- node .start ()
70
+ node = self .start_new_pathman_cluster ()
56
71
self .init_test_data (node )
57
72
58
73
node .psql (
@@ -95,11 +110,7 @@ def test_replication(self):
95
110
96
111
try :
97
112
# initialize master server
98
- node .init (allows_streaming = True )
99
- node .append_conf (
100
- 'postgresql.conf' ,
101
- 'shared_preload_libraries=\' pg_pathman\' \n ' )
102
- node .start ()
113
+ node = self .start_new_pathman_cluster (allows_streaming = True )
103
114
node .backup ('my_backup' )
104
115
105
116
# initialize replica from backup
@@ -238,8 +249,8 @@ def add_partition(node, flag, query):
238
249
con .commit ()
239
250
240
251
# Now wait until each thread finishes
241
- for i in range ( 3 ) :
242
- threads [ i ] .join ()
252
+ for thread in threads :
253
+ thread .join ()
243
254
244
255
# Check flags, it should be true which means that threads are
245
256
# finished
@@ -277,11 +288,11 @@ def check_tablespace(node, tablename, tablespace):
277
288
'postgresql.conf' ,
278
289
'shared_preload_libraries=\' pg_pathman\' \n ' )
279
290
node .start ()
280
- path = os .path .join (node .data_dir ,'test_space_location' )
281
- os .mkdir (path )
282
291
node .psql ('postgres' ,'create extension pg_pathman' )
283
292
284
293
# create tablespace
294
+ path = os .path .join (node .data_dir ,'test_space_location' )
295
+ os .mkdir (path )
285
296
node .psql (
286
297
'postgres' ,
287
298
'create tablespace test_space location\' {}\' ' .format (path ))
@@ -330,6 +341,71 @@ def check_tablespace(node, tablename, tablespace):
330
341
self .assertTrue (check_tablespace (node ,'abc_prepended_2' ,'pg_default' ))
331
342
self .assertTrue (check_tablespace (node ,'abc_added_2' ,'pg_default' ))
332
343
344
+ @test_fdw
345
+ def test_foreign_table (self ):
346
+ """Test foreign tables"""
347
+
348
+ # Start master server
349
+ master = get_new_node ('test' )
350
+ master .init ()
351
+ master .append_conf (
352
+ 'postgresql.conf' ,
353
+ 'shared_preload_libraries=\' pg_pathman, postgres_fdw\' \n ' )
354
+ master .start ()
355
+ master .psql ('postgres' ,'create extension pg_pathman' )
356
+ master .psql ('postgres' ,'create extension postgres_fdw' )
357
+ master .psql (
358
+ 'postgres' ,
359
+ '''create table abc(id serial, name text);
360
+ select create_range_partitions('abc', 'id', 0, 10, 2)''' )
361
+
362
+ # Current user name (needed for user mapping)
363
+ username = master .execute ('postgres' ,'select current_user' )[0 ][0 ]
364
+
365
+ # Start foreign server
366
+ fserv = get_new_node ('fserv' )
367
+ fserv .init ().start ()
368
+ fserv .safe_psql ('postgres' ,'create table ftable(id serial, name text)' )
369
+ fserv .safe_psql ('postgres' ,'insert into ftable values (25,\' foreign\' )' )
370
+
371
+ # Create foreign table and attach it to partitioned table
372
+ master .safe_psql (
373
+ 'postgres' ,
374
+ '''create server fserv
375
+ foreign data wrapper postgres_fdw
376
+ options (dbname 'postgres', host '127.0.0.1', port '{}')''' .format (fserv .port )
377
+ )
378
+ master .safe_psql (
379
+ 'postgres' ,
380
+ '''create user mapping for {0}
381
+ server fserv
382
+ options (user '{0}')''' .format (username )
383
+ )
384
+ master .safe_psql (
385
+ 'postgres' ,
386
+ '''import foreign schema public limit to (ftable)
387
+ from server fserv into public'''
388
+ )
389
+ master .safe_psql (
390
+ 'postgres' ,
391
+ 'select attach_range_partition(\' abc\' ,\' ftable\' , 20, 30)' )
392
+
393
+ # Check that table attached to partitioned table
394
+ self .assertEqual (
395
+ master .safe_psql ('postgres' ,'select * from ftable' ),
396
+ '25|foreign\n '
397
+ )
398
+
399
+ # Check that we can successfully insert new data into foreign partition
400
+ master .safe_psql ('postgres' ,'insert into abc values (26,\' part\' )' )
401
+ self .assertEqual (
402
+ master .safe_psql ('postgres' ,'select * from ftable order by id' ),
403
+ '25|foreign\n 26|part\n '
404
+ )
405
+
406
+ # Testing drop partitions (including foreign partitions)
407
+ master .safe_psql ('postgres' ,'select drop_partitions(\' abc\' )' )
408
+
333
409
334
410
if __name__ == "__main__" :
335
411
unittest .main ()