@@ -81,8 +81,8 @@ typedef struct PgFdwScanState
8181Portal portal ;/* SPI portal */
8282int numParams ;/* number of parameters passed to query */
8383int tile_pos ;
84+ uint64 table_pos ;
8485HeapTuple spi_tuple ;
85- SPITupleTable * spi_tuptable ;
8686Datum * src_values ;
8787Datum * dst_values ;
8888bool * src_nulls ;
@@ -629,9 +629,13 @@ postgresIterateForeignScan(ForeignScanState *node)
629629MemoryContext oldcontext = MemoryContextSwitchTo (fsstate -> spi_context );
630630
631631while (true) {
632- if (fsstate -> spi_tuptable == NULL ) {
633- SPI_cursor_fetch (fsstate -> portal , true,1 );
634- if (!SPI_processed ) {
632+ if (fsstate -> spi_tuple == NULL ) {
633+ if (fsstate -> portal != NULL ) {
634+ filter_mask = ~0 ;
635+ SPI_cursor_fetch (fsstate -> portal , true,1 );
636+ fsstate -> table_pos = 0 ;
637+ }
638+ if (fsstate -> table_pos == SPI_processed ) {
635639MemoryContextSwitchTo (oldcontext );
636640return ExecClearTuple (slot );
637641}
@@ -642,8 +646,7 @@ postgresIterateForeignScan(ForeignScanState *node)
642646fsstate -> tile_pos = 0 ;
643647fsstate -> filter_mask = filter_mask ;
644648}
645- fsstate -> spi_tuptable = SPI_tuptable ;
646- fsstate -> spi_tuple = SPI_tuptable -> vals [0 ];
649+ fsstate -> spi_tuple = SPI_tuptable -> vals [fsstate -> table_pos ++ ];
647650if (fsstate -> vops_types == NULL ) {
648651fsstate -> vops_types = palloc (sizeof (vops_type_info )* n_attrs );
649652fsstate -> attr_types = palloc (sizeof (Oid )* n_attrs );
@@ -672,7 +675,7 @@ postgresIterateForeignScan(ForeignScanState *node)
672675if (i > 0 )
673676{
674677/* ordinary column */
675- fsstate -> src_values [i - 1 ]= SPI_getbinval (fsstate -> spi_tuple ,fsstate -> spi_tuptable -> tupdesc ,j + 1 ,& fsstate -> src_nulls [i - 1 ]);
678+ fsstate -> src_values [i - 1 ]= SPI_getbinval (fsstate -> spi_tuple ,SPI_tuptable -> tupdesc ,j + 1 ,& fsstate -> src_nulls [i - 1 ]);
676679}
677680j += 1 ;
678681}
@@ -742,8 +745,10 @@ postgresIterateForeignScan(ForeignScanState *node)
742745NextTuple :;
743746}
744747SPI_freetuple (fsstate -> spi_tuple );
745- SPI_freetuptable (fsstate -> spi_tuptable );
746- fsstate -> spi_tuptable = NULL ;
748+ if (fsstate -> portal ) {
749+ SPI_freetuptable (SPI_tuptable );
750+ }
751+ fsstate -> spi_tuple = NULL ;
747752}
748753}
749754
@@ -760,6 +765,8 @@ postgresReScanForeignScan(ForeignScanState *node)
760765bool * nulls = NULL ;
761766MemoryContext oldcontext = MemoryContextSwitchTo (fsstate -> spi_context );
762767Oid * argtypes = NULL ;
768+ int rc ;
769+
763770if (fsstate -> numParams > 0 ) {
764771ExprContext * econtext = node -> ss .ps .ps_ExprContext ;
765772ForeignScan * fsplan = (ForeignScan * )node -> ss .ps .plan ;
@@ -780,8 +787,17 @@ postgresReScanForeignScan(ForeignScanState *node)
780787i += 1 ;
781788}
782789}
783- fsstate -> portal = SPI_cursor_open_with_args (NULL ,fsstate -> query ,fsstate -> numParams ,argtypes ,values ,nulls , true,CURSOR_OPT_PARALLEL_OK );
784- fsstate -> spi_tuptable = NULL ;
790+ if (fsstate -> rel == NULL ) {/* aggregate is pushed down: do not use cusror to allow parallel query execution */
791+ rc = SPI_execute_with_args (fsstate -> query ,fsstate -> numParams ,argtypes ,values ,nulls , true,0 );
792+ if (rc != SPI_OK_SELECT ) {
793+ elog (ERROR ,"Failed to execute VOPS query %s: %d" ,fsstate -> query ,rc );
794+ }
795+ fsstate -> portal = NULL ;
796+ }else {
797+ fsstate -> portal = SPI_cursor_open_with_args (NULL ,fsstate -> query ,fsstate -> numParams ,argtypes ,values ,nulls , true,CURSOR_OPT_PARALLEL_OK );
798+ }
799+ fsstate -> table_pos = 0 ;
800+ fsstate -> spi_tuple = NULL ;
785801
786802MemoryContextSwitchTo (oldcontext );
787803}
@@ -800,7 +816,9 @@ postgresEndForeignScan(ForeignScanState *node)
800816{
801817MemoryContext oldcontext = MemoryContextSwitchTo (fsstate -> spi_context );
802818
803- SPI_cursor_close (fsstate -> portal );
819+ if (fsstate -> portal ) {
820+ SPI_cursor_close (fsstate -> portal );
821+ }
804822SPI_finish ();
805823
806824MemoryContextSwitchTo (oldcontext );