@@ -106,7 +106,7 @@ init_concurrent_part_task_slots(void)
106
106
memset (concurrent_part_slots ,0 ,size );
107
107
108
108
for (i = 0 ;i < PART_WORKER_SLOTS ;i ++ )
109
- pg_atomic_init_flag_impl (& concurrent_part_slots [i ].slot_used );
109
+ SpinLockInit (& concurrent_part_slots [i ].mutex );
110
110
}
111
111
}
112
112
@@ -235,10 +235,10 @@ start_bg_worker(const char bgworker_name[BGW_MAXLEN],
235
235
static dsm_segment *
236
236
create_partitions_bg_worker_segment (Oid relid ,Datum value ,Oid value_type )
237
237
{
238
- TypeCacheEntry * typcache ;
239
- Size datum_size ;
240
- Size segment_size ;
241
- dsm_segment * segment ;
238
+ TypeCacheEntry * typcache ;
239
+ Size datum_size ;
240
+ Size segment_size ;
241
+ dsm_segment * segment ;
242
242
SpawnPartitionArgs * args ;
243
243
244
244
typcache = lookup_type_cache (value_type ,0 );
@@ -314,10 +314,10 @@ create_partitions_bg_worker(Oid relid, Datum value, Oid value_type)
314
314
static void
315
315
bgw_main_spawn_partitions (Datum main_arg )
316
316
{
317
- dsm_handle handle = DatumGetUInt32 (main_arg );
318
- dsm_segment * segment ;
319
- SpawnPartitionArgs * args ;
320
- Datum value ;
317
+ dsm_handle handle = DatumGetUInt32 (main_arg );
318
+ dsm_segment * segment ;
319
+ SpawnPartitionArgs * args ;
320
+ Datum value ;
321
321
322
322
/* Establish signal handlers before unblocking signals. */
323
323
pqsignal (SIGTERM ,handle_sigterm );
@@ -512,8 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
512
512
if (failures_count ++ >=PART_WORKER_MAX_ATTEMPTS )
513
513
{
514
514
/* Mark slot as FREE */
515
- part_slot -> worker_status = WS_FREE ;
516
- pg_atomic_clear_flag (& part_slot -> slot_used );
515
+ cps_set_status (part_slot ,WS_FREE );
517
516
518
517
elog (LOG ,
519
518
"Concurrent partitioning worker has canceled the task because "
@@ -534,14 +533,6 @@ bgw_main_concurrent_part(Datum main_arg)
534
533
535
534
if (failed )
536
535
{
537
- #ifdef USE_ASSERT_CHECKING
538
- elog (DEBUG1 ,"%s: could not relocate batch (%d/%d), total: %lu [%u]" ,
539
- concurrent_part_bgw ,
540
- failures_count ,PART_WORKER_MAX_ATTEMPTS ,/* current/max */
541
- part_slot -> total_rows ,
542
- MyProcPid );
543
- #endif
544
-
545
536
/* Abort transaction and sleep for a second */
546
537
AbortCurrentTransaction ();
547
538
DirectFunctionCall1 (pg_sleep ,Float8GetDatum (part_slot -> sleep_time ));
@@ -553,26 +544,27 @@ bgw_main_concurrent_part(Datum main_arg)
553
544
failures_count = 0 ;
554
545
555
546
/* Add rows to total_rows */
547
+ SpinLockAcquire (& part_slot -> mutex );
556
548
part_slot -> total_rows += rows ;
557
-
549
+ /* Report debug message */
558
550
#ifdef USE_ASSERT_CHECKING
559
551
elog (DEBUG1 ,"%s: relocated %d rows, total: %lu [%u]" ,
560
552
concurrent_part_bgw ,rows ,part_slot -> total_rows ,MyProcPid );
561
553
#endif
554
+ SpinLockRelease (& part_slot -> mutex );
562
555
}
563
556
564
557
/* If other backend requested to stop us, quit */
565
- if (part_slot -> worker_status == WS_STOPPING )
558
+ if (cps_check_status ( part_slot ) == WS_STOPPING )
566
559
break ;
567
560
}
568
561
while (rows > 0 || failed );/* do while there's still rows to be relocated */
569
562
570
563
/* Reclaim the resources */
571
564
pfree (sql );
572
565
573
- /* Set slot free */
574
- part_slot -> worker_status = WS_FREE ;
575
- pg_atomic_clear_flag (& part_slot -> slot_used );
566
+ /* Mark slot as FREE */
567
+ cps_set_status (part_slot ,WS_FREE );
576
568
}
577
569
578
570
@@ -589,12 +581,11 @@ bgw_main_concurrent_part(Datum main_arg)
589
581
Datum
590
582
partition_table_concurrently (PG_FUNCTION_ARGS )
591
583
{
592
- #define tostr (str ) ( #str )
584
+ #define tostr (str ) ( #str )/* convert function's name to literal */
593
585
594
- Oid relid = PG_GETARG_OID (0 );
595
- ConcurrentPartSlot * my_slot = NULL ;
596
- int empty_slot_idx = -1 ;
597
- int i ;
586
+ Oid relid = PG_GETARG_OID (0 );
587
+ int empty_slot_idx = -1 ;
588
+ int i ;
598
589
599
590
/* Check if relation is a partitioned table */
600
591
shout_if_prel_is_invalid (relid ,
@@ -607,38 +598,43 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
607
598
*/
608
599
for (i = 0 ;i < PART_WORKER_SLOTS ;i ++ )
609
600
{
610
- /*
611
- * Attempt to acquire the flag. If it has already been used then skip
612
- * this slot and try another one
613
- */
614
- if (!pg_atomic_test_set_flag (& concurrent_part_slots [i ].slot_used ))
615
- continue ;
601
+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
602
+ bool keep_this_lock = false;
616
603
617
- /* If atomic flag wasn't used then status should be WS_FREE */
618
- Assert (concurrent_part_slots [i ].worker_status == WS_FREE );
604
+ SpinLockAcquire (& cur_slot -> mutex );
619
605
620
606
if (empty_slot_idx < 0 )
621
607
{
622
- my_slot = & concurrent_part_slots [i ];
623
608
empty_slot_idx = i ;
609
+ keep_this_lock = true;
624
610
}
625
611
626
- if (concurrent_part_slots [ i ]. relid == relid &&
627
- concurrent_part_slots [ i ]. dbid == MyDatabaseId )
612
+ if (cur_slot -> relid == relid &&
613
+ cur_slot -> dbid == MyDatabaseId )
628
614
{
615
+ if (empty_slot_idx >=0 )
616
+ SpinLockRelease (& cur_slot -> mutex );
617
+
629
618
elog (ERROR ,
630
619
"Table \"%s\" is already being partitioned" ,
631
620
get_rel_name (relid ));
632
621
}
622
+
623
+ if (!keep_this_lock )
624
+ SpinLockRelease (& cur_slot -> mutex );
633
625
}
634
626
635
- if (my_slot == NULL )
627
+ if (empty_slot_idx < 0 )
636
628
elog (ERROR ,"No empty worker slots found" );
629
+ else
630
+ {
631
+ /* Initialize concurrent part slot */
632
+ InitConcurrentPartSlot (& concurrent_part_slots [empty_slot_idx ],
633
+ GetAuthenticatedUserId (),WS_WORKING ,
634
+ MyDatabaseId ,relid ,1000 ,1.0 );
637
635
638
- /* Initialize concurrent part slot */
639
- InitConcurrentPartSlot (my_slot ,GetAuthenticatedUserId (),
640
- WS_WORKING ,MyDatabaseId ,relid ,
641
- 1000 ,1.0 );
636
+ SpinLockRelease (& concurrent_part_slots [empty_slot_idx ].mutex );
637
+ }
642
638
643
639
/* Start worker (we should not wait) */
644
640
start_bg_worker (concurrent_part_bgw ,
@@ -712,11 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
712
708
{
713
709
ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
714
710
711
+ SpinLockAcquire (& cur_slot -> mutex );
712
+
715
713
if (cur_slot -> worker_status != WS_FREE )
716
714
{
717
715
HeapTuple tuple ;
718
716
Datum values [Natts_pathman_cp_tasks ];
719
- bool isnull [Natts_pathman_cp_tasks ]= {0 , 0 , 0 , 0 , 0 , 0 };
717
+ bool isnull [Natts_pathman_cp_tasks ]= {0 };
720
718
721
719
values [Anum_pathman_cp_tasks_userid - 1 ]= cur_slot -> userid ;
722
720
values [Anum_pathman_cp_tasks_pid - 1 ]= cur_slot -> pid ;
@@ -750,6 +748,8 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
750
748
751
749
SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (tuple ));
752
750
}
751
+
752
+ SpinLockRelease (& cur_slot -> mutex );
753
753
}
754
754
755
755
SRF_RETURN_DONE (funcctx );
@@ -763,22 +763,35 @@ Datum
763
763
stop_concurrent_part_task (PG_FUNCTION_ARGS )
764
764
{
765
765
Oid relid = PG_GETARG_OID (0 );
766
+ bool worker_found = false;
766
767
int i ;
767
- ConcurrentPartSlot * slot ;
768
768
769
- for (i = 0 ;i < PART_WORKER_SLOTS ;i ++ )
770
- slot = & concurrent_part_slots [i ];
769
+ for (i = 0 ;i < PART_WORKER_SLOTS && !worker_found ;i ++ )
770
+ {
771
+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
772
+
773
+ SpinLockAcquire (& cur_slot -> mutex );
771
774
772
- if (slot -> worker_status != WS_FREE &&
773
- slot -> relid == relid &&
774
- slot -> dbid == MyDatabaseId )
775
+ if (cur_slot -> worker_status != WS_FREE &&
776
+ cur_slot -> relid == relid &&
777
+ cur_slot -> dbid == MyDatabaseId )
775
778
{
776
- slot -> worker_status = WS_STOPPING ;
777
779
elog (NOTICE ,"Worker will stop after it finishes current batch" );
778
780
779
- PG_RETURN_BOOL (true);
781
+ cur_slot -> worker_status = WS_STOPPING ;
782
+ worker_found = true;
780
783
}
781
784
782
- elog (ERROR ,"Cannot find worker for relation \"%s\"" ,
783
- get_rel_name_or_relid (relid ));
785
+ SpinLockRelease (& cur_slot -> mutex );
786
+ }
787
+
788
+ if (worker_found )
789
+ PG_RETURN_BOOL (true);
790
+ else
791
+ {
792
+ elog (ERROR ,"Cannot find worker for relation \"%s\"" ,
793
+ get_rel_name_or_relid (relid ));
794
+
795
+ PG_RETURN_BOOL (false);/* keep compiler happy */
796
+ }
784
797
}