@@ -276,8 +276,6 @@ create_arrangeappend_plan(PlannerInfo *root, RelOptInfo *rel,
276
276
lfirst (plan_cell )= subplan ;
277
277
}
278
278
279
- /* TODO: write node_XXX wariables to custom_private */
280
-
281
279
pack_arrangeappend_private (node ,& mag );
282
280
283
281
return plan ;
@@ -299,46 +297,75 @@ arrangeappend_create_scan_state(CustomScan *node)
299
297
void
300
298
arrangeappend_begin (CustomScanState * node ,EState * estate ,int eflags )
301
299
{
302
- ArrangeAppendState * scan_state = (ArrangeAppendState * )node ;
303
-
304
300
begin_append_common (node ,estate ,eflags );
305
301
}
306
302
307
303
TupleTableSlot *
308
304
arrangeappend_exec (CustomScanState * node )
309
305
{
310
- ArrangeAppendState * scan_state = (ArrangeAppendState * )node ;
311
- RuntimeAppendState * rstate = & scan_state -> rstate ;
306
+ ArrangeAppendState * scan_state = (ArrangeAppendState * )node ;
307
+ RuntimeAppendState * rstate = & scan_state -> rstate ;
308
+ PlanState * ps ;
309
+ int i ;
312
310
313
311
if (scan_state -> rstate .ncur_plans == 0 )
314
312
ExecReScan (& node -> ss .ps );
315
313
316
- while (rstate -> running_idx < rstate -> ncur_plans )
314
+ if (!scan_state -> ms_initialized )
315
+ {
316
+ for (i = 0 ;i < scan_state -> rstate .ncur_plans ;i ++ )
317
+ {
318
+ ChildScanCommon child = scan_state -> rstate .cur_plans [i ];
319
+ PlanState * ps = child -> content .plan_state ;
320
+
321
+ Assert (child -> content_type == CHILD_PLAN_STATE );
322
+
323
+ scan_state -> ms_slots [i ]= ExecProcNode (ps );
324
+ if (!TupIsNull (scan_state -> ms_slots [i ]))
325
+ binaryheap_add_unordered (scan_state -> ms_heap ,Int32GetDatum (i ));
326
+ }
327
+ binaryheap_build (scan_state -> ms_heap );
328
+ scan_state -> ms_initialized = true;
329
+ }
330
+ else
317
331
{
318
- ChildScanCommon child = rstate -> cur_plans [rstate -> running_idx ];
319
- PlanState * state = child -> content .plan_state ;
320
- TupleTableSlot * slot = NULL ;
321
- bool quals ;
332
+ i = DatumGetInt32 (binaryheap_first (scan_state -> ms_heap ));
333
+ ps = scan_state -> rstate .cur_plans [i ]-> content .plan_state ;
322
334
323
335
for (;;)
324
336
{
325
- slot = ExecProcNode ( state ) ;
337
+ bool quals ;
326
338
327
- if (TupIsNull (slot ))
339
+ scan_state -> ms_slots [i ]= ExecProcNode (ps );
340
+
341
+ if (TupIsNull (scan_state -> ms_slots [i ]))
342
+ {
343
+ (void )binaryheap_remove_first (scan_state -> ms_heap );
328
344
break ;
345
+ }
329
346
330
- node -> ss .ps .ps_ExprContext -> ecxt_scantuple = slot ;
347
+ node -> ss .ps .ps_ExprContext -> ecxt_scantuple = scan_state -> ms_slots [ i ] ;
331
348
quals = ExecQual (rstate -> custom_expr_states ,
332
349
node -> ss .ps .ps_ExprContext , false);
333
350
334
351
if (quals )
335
- return slot ;
352
+ {
353
+ binaryheap_replace_first (scan_state -> ms_heap ,Int32GetDatum (i ));
354
+ break ;
355
+ }
336
356
}
337
-
338
- rstate -> running_idx ++ ;
339
357
}
340
358
341
- return NULL ;
359
+ if (binaryheap_empty (scan_state -> ms_heap ))
360
+ {
361
+ /* All the subplans are exhausted, and so is the heap */
362
+ return NULL ;
363
+ }
364
+ else
365
+ {
366
+ i = DatumGetInt32 (binaryheap_first (scan_state -> ms_heap ));
367
+ return scan_state -> ms_slots [i ];
368
+ }
342
369
}
343
370
344
371
void
@@ -347,17 +374,22 @@ arrangeappend_end(CustomScanState *node)
347
374
ArrangeAppendState * scan_state = (ArrangeAppendState * )node ;
348
375
349
376
end_append_common (node );
377
+
378
+ if (scan_state -> ms_heap )
379
+ binaryheap_free (scan_state -> ms_heap );
350
380
}
351
381
352
382
void
353
383
arrangeappend_rescan (CustomScanState * node )
354
384
{
355
385
ArrangeAppendState * scan_state = (ArrangeAppendState * )node ;
356
- int nplans = scan_state -> rstate . ncur_plans ;
386
+ int nplans ;
357
387
int i ;
358
388
359
389
rescan_append_common (node );
360
390
391
+ nplans = scan_state -> rstate .ncur_plans ;
392
+
361
393
scan_state -> ms_slots = (TupleTableSlot * * )palloc0 (sizeof (TupleTableSlot * )* nplans );
362
394
scan_state -> ms_heap = binaryheap_allocate (nplans ,heap_compare_slots ,scan_state );
363
395
@@ -387,6 +419,9 @@ arrangeappend_rescan(CustomScanState *node)
387
419
388
420
PrepareSortSupportFromOrderingOp (scan_state -> sortOperators [i ],sortKey );
389
421
}
422
+
423
+ binaryheap_reset (scan_state -> ms_heap );
424
+ scan_state -> ms_initialized = false;
390
425
}
391
426
392
427
void