@@ -409,6 +409,8 @@ def test_foreign_table(self):
409409def test_parallel_nodes (self ):
410410"""Test parallel queries under partitions"""
411411
412+ import json
413+
412414# Init and start postgres instance with preload pg_pathman module
413415node = get_new_node ('test' )
414416node .init ()
@@ -419,8 +421,8 @@ def test_parallel_nodes(self):
419421
420422# Check version of postgres server
421423# If version < 9.6 skip all tests for parallel queries
422- version = node .psql ("postgres" ,"show server_version_num" )
423- if int ( version [ 1 ]) < 90600 :
424+ version = int ( node .psql ("postgres" ,"show server_version_num" )[ 1 ] )
425+ if version < 90600 :
424426return
425427
426428# Prepare test database
@@ -435,6 +437,26 @@ def test_parallel_nodes(self):
435437node .psql ('postgres' ,'select create_hash_partitions(\' hash_partitioned\' ,\' i\' , 10)' )
436438node .psql ('postgres' ,'vacuum analyze hash_partitioned' )
437439
440+ node .psql ('postgres' ,"""
441+ create or replace function query_plan(query text) returns jsonb as $$
442+ declare
443+ plan jsonb;
444+ begin
445+ execute 'explain (costs off, format json)' || query into plan;
446+ return plan;
447+ end;
448+ $$ language plpgsql;
449+ """ )
450+
451+ # Helper function for json equality
452+ def ordered (obj ):
453+ if isinstance (obj ,dict ):
454+ return sorted ((k ,ordered (v ))for k ,v in obj .items ())
455+ if isinstance (obj ,list ):
456+ return sorted (ordered (x )for x in obj )
457+ else :
458+ return obj
459+
438460# Test parallel select
439461with node .connect ()as con :
440462con .execute ('set max_parallel_workers_per_gather = 2' )
@@ -443,35 +465,120 @@ def test_parallel_nodes(self):
443465con .execute ('set parallel_tuple_cost = 0' )
444466
445467# Check parallel aggregate plan
446- plan = con .execute ('explain (costs off) select count(*) from range_partitioned where i < 1500' )
447- expected = [('Finalize Aggregate' ,),
448- (' -> Gather' ,),
449- (' Workers Planned: 2' ,),
450- (' -> Partial Aggregate' ,),
451- (' -> Append' ,),
452- (' -> Parallel Seq Scan on range_partitioned_1' ,),
453- (' -> Parallel Seq Scan on range_partitioned_2' ,),
454- (' Filter: (i < 1500)' ,)]
455- self .assertEqual (plan ,expected )
468+ test_query = 'select count(*) from range_partitioned where i < 1500'
469+ plan = con .execute ('select query_plan(\' %s\' )' % test_query )[0 ][0 ]
470+ expected = json .loads ("""
471+ [
472+ {
473+ "Plan": {
474+ "Node Type": "Aggregate",
475+ "Strategy": "Plain",
476+ "Partial Mode": "Finalize",
477+ "Parallel Aware": false,
478+ "Plans": [
479+ {
480+ "Node Type": "Gather",
481+ "Parent Relationship": "Outer",
482+ "Parallel Aware": false,
483+ "Workers Planned": 2,
484+ "Single Copy": false,
485+ "Plans": [
486+ {
487+ "Node Type": "Aggregate",
488+ "Strategy": "Plain",
489+ "Partial Mode": "Partial",
490+ "Parent Relationship": "Outer",
491+ "Parallel Aware": false,
492+ "Plans": [
493+ {
494+ "Node Type": "Append",
495+ "Parent Relationship": "Outer",
496+ "Parallel Aware": false,
497+ "Plans": [
498+ {
499+ "Node Type": "Seq Scan",
500+ "Parent Relationship": "Member",
501+ "Parallel Aware": true,
502+ "Relation Name": "range_partitioned_2",
503+ "Alias": "range_partitioned_2",
504+ "Filter": "(i < 1500)"
505+ },
506+ {
507+ "Node Type": "Seq Scan",
508+ "Parent Relationship": "Member",
509+ "Parallel Aware": true,
510+ "Relation Name": "range_partitioned_1",
511+ "Alias": "range_partitioned_1"
512+ }
513+ ]
514+ }
515+ ]
516+ }
517+ ]
518+ }
519+ ]
520+ }
521+ }
522+ ]
523+ """ )
524+ self .assertEqual (ordered (plan ),ordered (expected ))
456525
457526# Check count of returned tuples
458- count = con .execute ('select count(*) from range_partitioned where i < 1500' )
459- self .assertEqual (count [ 0 ][ 0 ] ,1499 )
527+ count = con .execute ('select count(*) from range_partitioned where i < 1500' )[ 0 ][ 0 ]
528+ self .assertEqual (count ,1499 )
460529
461530# Check simple parallel seq scan plan with limit
462- plan = con .execute ('explain (costs off) select * from range_partitioned where i < 1500 limit 5' )
463- expected = [('Limit' ,),
464- (' -> Gather' ,),
465- (' Workers Planned: 2' ,),
466- (' -> Append' ,),
467- (' -> Parallel Seq Scan on range_partitioned_1' ,),
468- (' -> Parallel Seq Scan on range_partitioned_2' ,),
469- (' Filter: (i < 1500)' ,)]
470- self .assertEqual (plan ,expected )
531+ test_query = 'select * from range_partitioned where i < 1500 limit 5'
532+ plan = con .execute ('select query_plan(\' %s\' )' % test_query )[0 ][0 ]
533+ expected = json .loads ("""
534+ [
535+ {
536+ "Plan": {
537+ "Node Type": "Limit",
538+ "Parallel Aware": false,
539+ "Plans": [
540+ {
541+ "Node Type": "Gather",
542+ "Parent Relationship": "Outer",
543+ "Parallel Aware": false,
544+ "Workers Planned": 2,
545+ "Single Copy": false,
546+ "Plans": [
547+ {
548+ "Node Type": "Append",
549+ "Parent Relationship": "Outer",
550+ "Parallel Aware": false,
551+ "Plans": [
552+ {
553+ "Node Type": "Seq Scan",
554+ "Parent Relationship": "Member",
555+ "Parallel Aware": true,
556+ "Relation Name": "range_partitioned_2",
557+ "Alias": "range_partitioned_2",
558+ "Filter": "(i < 1500)"
559+ },
560+ {
561+ "Node Type": "Seq Scan",
562+ "Parent Relationship": "Member",
563+ "Parallel Aware": true,
564+ "Relation Name": "range_partitioned_1",
565+ "Alias": "range_partitioned_1"
566+ }
567+ ]
568+ }
569+ ]
570+ }
571+ ]
572+ }
573+ }
574+ ]
575+ """ )
576+ self .assertEqual (ordered (plan ),ordered (expected ))
471577
472578# Check tuples returned by query above
473579res_tuples = con .execute ('select * from range_partitioned where i < 1500 limit 5' )
474- expected = [(1 ,), (2 ,), (3 ,), (4 ,), (5 ,)]
580+ res_tuples = sorted (map (lambda x :x [0 ],res_tuples ))
581+ expected = [1 ,2 ,3 ,4 ,5 ]
475582self .assertEqual (res_tuples ,expected )
476583# import ipdb; ipdb.set_trace()
477584