@@ -512,7 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
512
512
if (failures_count ++ >=PART_WORKER_MAX_ATTEMPTS )
513
513
{
514
514
/* Mark slot as FREE */
515
- cps_set_status (part_slot ,WS_FREE );
515
+ cps_set_status (part_slot ,CPS_FREE );
516
516
517
517
elog (LOG ,
518
518
"Concurrent partitioning worker has canceled the task because "
@@ -555,7 +555,7 @@ bgw_main_concurrent_part(Datum main_arg)
555
555
}
556
556
557
557
/* If other backend requested to stop us, quit */
558
- if (cps_check_status (part_slot )== WS_STOPPING )
558
+ if (cps_check_status (part_slot )== CPS_STOPPING )
559
559
break ;
560
560
}
561
561
while (rows > 0 || failed );/* do while there's still rows to be relocated */
@@ -564,7 +564,7 @@ bgw_main_concurrent_part(Datum main_arg)
564
564
pfree (sql );
565
565
566
566
/* Mark slot as FREE */
567
- cps_set_status (part_slot ,WS_FREE );
567
+ cps_set_status (part_slot ,CPS_FREE );
568
568
}
569
569
570
570
@@ -603,7 +603,8 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
603
603
604
604
SpinLockAcquire (& cur_slot -> mutex );
605
605
606
- if (empty_slot_idx < 0 )
606
+ /* Should we take this slot into account? */
607
+ if (empty_slot_idx < 0 && cur_slot -> worker_status == CPS_FREE )
607
608
{
608
609
empty_slot_idx = i ;
609
610
keep_this_lock = true;
@@ -630,7 +631,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
630
631
{
631
632
/* Initialize concurrent part slot */
632
633
InitConcurrentPartSlot (& concurrent_part_slots [empty_slot_idx ],
633
- GetAuthenticatedUserId (),WS_WORKING ,
634
+ GetAuthenticatedUserId (),CPS_WORKING ,
634
635
MyDatabaseId ,relid ,1000 ,1.0 );
635
636
636
637
SpinLockRelease (& concurrent_part_slots [empty_slot_idx ].mutex );
@@ -707,12 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
707
708
for (i = userctx -> cur_idx ;i < PART_WORKER_SLOTS ;i ++ )
708
709
{
709
710
ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
711
+ HeapTuple htup = NULL ;
710
712
713
+ HOLD_INTERRUPTS ();
711
714
SpinLockAcquire (& cur_slot -> mutex );
712
715
713
- if (cur_slot -> worker_status != WS_FREE )
716
+ if (cur_slot -> worker_status != CPS_FREE )
714
717
{
715
- HeapTuple tuple ;
716
718
Datum values [Natts_pathman_cp_tasks ];
717
719
bool isnull [Natts_pathman_cp_tasks ]= {0 };
718
720
@@ -725,12 +727,12 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
725
727
/* Now build a status string */
726
728
switch (cur_slot -> worker_status )
727
729
{
728
- case WS_WORKING :
730
+ case CPS_WORKING :
729
731
values [Anum_pathman_cp_tasks_status - 1 ]=
730
732
PointerGetDatum (cstring_to_text ("working" ));
731
733
break ;
732
734
733
- case WS_STOPPING :
735
+ case CPS_STOPPING :
734
736
values [Anum_pathman_cp_tasks_status - 1 ]=
735
737
PointerGetDatum (cstring_to_text ("stopping" ));
736
738
break ;
@@ -741,15 +743,18 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
741
743
}
742
744
743
745
/* Form output tuple */
744
- tuple = heap_form_tuple (funcctx -> tuple_desc ,values ,isnull );
746
+ htup = heap_form_tuple (funcctx -> tuple_desc ,values ,isnull );
745
747
746
748
/* Switch to next worker */
747
749
userctx -> cur_idx = i + 1 ;
748
-
749
- SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (tuple ));
750
750
}
751
751
752
752
SpinLockRelease (& cur_slot -> mutex );
753
+ RESUME_INTERRUPTS ();
754
+
755
+ /* Return tuple if needed */
756
+ if (htup )
757
+ SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (htup ));
753
758
}
754
759
755
760
SRF_RETURN_DONE (funcctx );
@@ -770,19 +775,22 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
770
775
{
771
776
ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
772
777
778
+ HOLD_INTERRUPTS ();
773
779
SpinLockAcquire (& cur_slot -> mutex );
774
780
775
- if (cur_slot -> worker_status != WS_FREE &&
781
+ if (cur_slot -> worker_status != CPS_FREE &&
776
782
cur_slot -> relid == relid &&
777
783
cur_slot -> dbid == MyDatabaseId )
778
784
{
779
785
elog (NOTICE ,"Worker will stop after it finishes current batch" );
780
786
781
- cur_slot -> worker_status = WS_STOPPING ;
787
+ /* Change worker's state & set 'worker_found' */
788
+ cur_slot -> worker_status = CPS_STOPPING ;
782
789
worker_found = true;
783
790
}
784
791
785
792
SpinLockRelease (& cur_slot -> mutex );
793
+ RESUME_INTERRUPTS ();
786
794
}
787
795
788
796
if (worker_found )