@@ -95,12 +95,19 @@ init_concurrent_part_task_slots(void)
 {
 	bool	found;
 	Size	size = estimate_concurrent_part_task_slots_size();
+	int		i;
 
 	concurrent_part_slots = (ConcurrentPartSlot *)
 		ShmemInitStruct("array of ConcurrentPartSlots", size, &found);
 
 	/* Initialize 'concurrent_part_slots' if needed */
-	if (!found) memset(concurrent_part_slots, 0, size);
+	if (!found)
+	{
+		memset(concurrent_part_slots, 0, size);
+
+		for (i = 0; i < PART_WORKER_SLOTS; i++)
+			pg_atomic_init_flag(&concurrent_part_slots[i].slot_used);
+	}
 }
 
 
@@ -423,9 +430,9 @@ bgw_main_concurrent_part(Datum main_arg)
 {
 	MemoryContext	old_mcxt;
 
-	Oid		types[2]	= { OIDOID, INT4OID };
-	Datum	vals[2]		= { part_slot->relid, part_slot->batch_size };
-	bool	nulls[2]	= { false, false };
+	Oid		types[2]	= { OIDOID,				INT4OID };
+	Datum	vals[2]		= { part_slot->relid,	part_slot->batch_size };
+	bool	nulls[2]	= { false, false };
 
 	/* Reset loop variables */
 	failed = false;
@@ -506,6 +513,7 @@ bgw_main_concurrent_part(Datum main_arg)
 		{
 			/* Mark slot as FREE */
 			part_slot->worker_status = WS_FREE;
+			pg_atomic_clear_flag(&part_slot->slot_used);
 
 			elog(LOG,
 				 "Concurrent partitioning worker has canceled the task because "
@@ -561,7 +569,10 @@ bgw_main_concurrent_part(Datum main_arg)
 
 	/* Reclaim the resources */
 	pfree(sql);
+
+	/* Set slot free */
 	part_slot->worker_status = WS_FREE;
+	pg_atomic_clear_flag(&part_slot->slot_used);
 }
 
 
@@ -596,16 +607,24 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
 	 */
 	for (i = 0; i < PART_WORKER_SLOTS; i++)
 	{
-		if (concurrent_part_slots[i].worker_status == WS_FREE)
+		/* Try to acquire the slot; busy slots get the duplicate check below */
+		if (pg_atomic_test_set_flag(&concurrent_part_slots[i].slot_used))
 		{
+			/* The flag was clear, so this slot must be free */
+			Assert(concurrent_part_slots[i].worker_status == WS_FREE);
+
 			if (empty_slot_idx < 0)
 			{
 				my_slot = &concurrent_part_slots[i];
 				empty_slot_idx = i;
 			}
+			else
+				pg_atomic_clear_flag(&concurrent_part_slots[i].slot_used);
+			continue;
 		}
-		else if (concurrent_part_slots[i].relid == relid &&
-				 concurrent_part_slots[i].dbid == MyDatabaseId)
+
+		if (concurrent_part_slots[i].relid == relid &&
+			concurrent_part_slots[i].dbid == MyDatabaseId)
 		{
 			elog(ERROR,
 				 "Table \"%s\" is already being partitioned",
@@ -745,13 +764,16 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
 {
 	Oid		relid = PG_GETARG_OID(0);
 	int		i;
+	ConcurrentPartSlot *slot;
 
+	/* Look through all slots for an active task on 'relid' */
 	for (i = 0; i < PART_WORKER_SLOTS; i++)
-		if (concurrent_part_slots[i].worker_status != WS_FREE &&
-			concurrent_part_slots[i].relid == relid &&
-			concurrent_part_slots[i].dbid == MyDatabaseId)
+		/* NB: this 'if' must remain the loop's only statement */
+		if ((slot = &concurrent_part_slots[i])->worker_status != WS_FREE &&
+			slot->relid == relid &&
+			slot->dbid == MyDatabaseId)
 		{
-			concurrent_part_slots[i].worker_status = WS_STOPPING;
+			slot->worker_status = WS_STOPPING;
 			elog(NOTICE, "Worker will stop after it finishes current batch");
 
 			PG_RETURN_BOOL(true);