@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443443void
444444bgw_main_concurrent_part (Datum main_arg )
445445{
446- int rows ;
446+ ConcurrentPartSlot * part_slot ;
447+ char * sql = NULL ;
448+ int64 rows ;
447449bool failed ;
448450int failures_count = 0 ;
449- char * sql = NULL ;
450- ConcurrentPartSlot * part_slot ;
451+ LOCKMODE lockmode = RowExclusiveLock ;
451452
452453/* Update concurrent part slot */
453454part_slot = & concurrent_part_slots [DatumGetInt32 (main_arg )];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479480/* Do the job */
480481do
481482{
482- MemoryContext old_mcxt ;
483+ MemoryContext old_mcxt ;
483484
484485Oid types [2 ]= {OIDOID ,INT4OID };
485486Datum vals [2 ]= {part_slot -> relid ,part_slot -> batch_size };
486487bool nulls [2 ]= { false,false };
487488
489+ bool rel_locked = false;
490+
488491/* Reset loop variables */
489492failed = false;
490493rows = 0 ;
@@ -520,66 +523,92 @@ bgw_main_concurrent_part(Datum main_arg)
520523/* Exec ret = _partition_data_concurrent() */
521524PG_TRY ();
522525{
523- /* Make sure that relation exists and has partitions */
524- if (SearchSysCacheExists1 (RELOID ,ObjectIdGetDatum (part_slot -> relid ))&&
525- get_pathman_relation_info (part_slot -> relid )!= NULL )
526- {
527- int ret ;
528- bool isnull ;
526+ int ret ;
527+ bool isnull ;
529528
530- ret = SPI_execute_with_args ( sql , 2 , types , vals , nulls , false, 0 );
531- if (ret == SPI_OK_SELECT )
532- {
533- TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
534- HeapTuple tuple = SPI_tuptable -> vals [ 0 ];
529+ /* Lock relation for DELETE and INSERT */
530+ if (! ConditionalLockRelationOid ( part_slot -> relid , lockmode ) )
531+ {
532+ elog ( ERROR , "could not take lock on relation %u" , part_slot -> relid ) ;
533+ }
535534
536- Assert (SPI_processed == 1 );/* there should be 1 result at most */
535+ /* Great, now relation is locked */
536+ rel_locked = true;
537+ (void )rel_locked ;/* mute clang analyzer */
537538
538- rows = DatumGetInt32 (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
539+ /* Make sure that relation exists */
540+ if (!SearchSysCacheExists1 (RELOID ,ObjectIdGetDatum (part_slot -> relid )))
541+ {
542+ /* Exit after we raise ERROR */
543+ failures_count = PART_WORKER_MAX_ATTEMPTS ;
544+ (void )failures_count ;/* mute clang analyzer */
539545
540- Assert (!isnull );/* ... and ofc it must not be NULL */
541- }
546+ elog (ERROR ,"relation %u does not exist" ,part_slot -> relid );
542547}
543- /* Otherwise it's time to exit */
544- else
548+
549+ /* Make sure that relation has partitions */
550+ if (get_pathman_relation_info (part_slot -> relid )== NULL )
545551{
552+ /* Exit after we raise ERROR */
546553failures_count = PART_WORKER_MAX_ATTEMPTS ;
554+ (void )failures_count ;/* mute clang analyzer */
555+
556+ elog (ERROR ,"relation \"%s\" is not partitioned" ,
557+ get_rel_name (part_slot -> relid ));
558+ }
559+
560+ /* Call concurrent partitioning function */
561+ ret = SPI_execute_with_args (sql ,2 ,types ,vals ,nulls , false,0 );
562+ if (ret == SPI_OK_SELECT )
563+ {
564+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
565+ HeapTuple tuple = SPI_tuptable -> vals [0 ];
547566
548- elog (LOG ,"relation \"%u\" is not partitioned (or does not exist)" ,
549- part_slot -> relid );
567+ /* There should be 1 result at most */
568+ Assert (SPI_processed == 1 );
569+
570+ /* Extract number of processed rows */
571+ rows = DatumGetInt64 (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
572+ Assert (!isnull );/* ... and ofc it must not be NULL */
550573}
574+ /* Else raise generic error */
575+ else elog (ERROR ,"partitioning function returned %u" ,ret );
576+
577+ /* Finally, unlock our partitioned table */
578+ UnlockRelationOid (part_slot -> relid ,lockmode );
551579}
552580PG_CATCH ();
553581{
554582/*
555583 * The most common exception we can catch here is a deadlock with
556584 * concurrent user queries. Check that attempts count doesn't exceed
557- * some reasonable value
585+ * some reasonable value.
558586 */
559- ErrorData * error ;
560- char * sleep_time_str ;
587+ ErrorData * error ;
588+
589+ /* Unlock relation if we caught ERROR too early */
590+ if (rel_locked )
591+ UnlockRelationOid (part_slot -> relid ,lockmode );
592+
593+ /* Increase number of failures and set 'failed' status */
594+ failures_count ++ ;
595+ failed = true;
561596
562597/* Switch to the original context & copy edata */
563598MemoryContextSwitchTo (old_mcxt );
564599error = CopyErrorData ();
565600FlushErrorState ();
566601
567602/* Print message for this BGWorker to server log */
568- sleep_time_str = datum_to_cstring (Float8GetDatum (part_slot -> sleep_time ),
569- FLOAT8OID );
570- failures_count ++ ;
571603ereport (LOG ,
572604(errmsg ("%s: %s" ,concurrent_part_bgw ,error -> message ),
573- errdetail ("attempt: %d/%d, sleep time: %s " ,
605+ errdetail ("attempt: %d/%d, sleep time: %.2f " ,
574606failures_count ,
575607PART_WORKER_MAX_ATTEMPTS ,
576- sleep_time_str )));
577- pfree (sleep_time_str );/* free the time string */
608+ (float )part_slot -> sleep_time )));
578609
610+ /* Finally, free error data */
579611FreeErrorData (error );
580-
581- /* Set 'failed' flag */
582- failed = true;
583612}
584613PG_END_TRY ();
585614
@@ -606,9 +635,10 @@ bgw_main_concurrent_part(Datum main_arg)
606635/* Failed this time, wait */
607636else if (failed )
608637{
609- /* Abort transactionand sleep for a second */
638+ /* Abort transaction */
610639AbortCurrentTransaction ();
611640
641+ /* Sleep for a specified amount of time (default 1s) */
612642DirectFunctionCall1 (pg_sleep ,Float8GetDatum (part_slot -> sleep_time ));
613643}
614644
@@ -626,8 +656,10 @@ bgw_main_concurrent_part(Datum main_arg)
626656
627657#ifdef USE_ASSERT_CHECKING
628658/* Report debug message */
629- elog (DEBUG1 ,"%s: relocated %d rows, total: " UINT64_FORMAT " [%u]" ,
630- concurrent_part_bgw ,rows ,part_slot -> total_rows ,MyProcPid );
659+ elog (DEBUG1 ,"%s: "
660+ "relocated" INT64_FORMAT "rows, "
661+ "total: " INT64_FORMAT ,
662+ concurrent_part_bgw ,rows ,part_slot -> total_rows );
631663#endif
632664}
633665
@@ -636,9 +668,6 @@ bgw_main_concurrent_part(Datum main_arg)
636668break ;
637669}
638670while (rows > 0 || failed );/* do while there's still rows to be relocated */
639-
640- /* Reclaim the resources */
641- pfree (sql );
642671}
643672
644673
@@ -824,26 +853,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824853/* Iterate through worker slots */
825854for (i = userctx -> cur_idx ;i < PART_WORKER_SLOTS ;i ++ )
826855{
827- ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
856+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ],
857+ slot_copy ;
828858HeapTuple htup = NULL ;
829859
830- HOLD_INTERRUPTS ();
860+ /* Copy slot to process local memory */
831861SpinLockAcquire (& cur_slot -> mutex );
862+ memcpy (& slot_copy ,cur_slot ,sizeof (ConcurrentPartSlot ));
863+ SpinLockRelease (& cur_slot -> mutex );
832864
833- if (cur_slot -> worker_status != CPS_FREE )
865+ if (slot_copy . worker_status != CPS_FREE )
834866{
835867Datum values [Natts_pathman_cp_tasks ];
836868bool isnull [Natts_pathman_cp_tasks ]= {0 };
837869
838- values [Anum_pathman_cp_tasks_userid - 1 ]= cur_slot -> userid ;
839- values [Anum_pathman_cp_tasks_pid - 1 ]= cur_slot -> pid ;
840- values [Anum_pathman_cp_tasks_dbid - 1 ]= cur_slot -> dbid ;
841- values [Anum_pathman_cp_tasks_relid - 1 ]= cur_slot -> relid ;
842- values [Anum_pathman_cp_tasks_processed - 1 ]= cur_slot -> total_rows ;
870+ values [Anum_pathman_cp_tasks_userid - 1 ]= slot_copy .userid ;
871+ values [Anum_pathman_cp_tasks_pid - 1 ]= slot_copy .pid ;
872+ values [Anum_pathman_cp_tasks_dbid - 1 ]= slot_copy .dbid ;
873+ values [Anum_pathman_cp_tasks_relid - 1 ]= slot_copy .relid ;
874+
875+ /* Record processed rows */
876+ values [Anum_pathman_cp_tasks_processed - 1 ]=
877+ /* FIXME: use Int64GetDatum() in release 1.5 */
878+ Int32GetDatum ((int32 )slot_copy .total_rows );
843879
844880/* Now build a status string */
845881values [Anum_pathman_cp_tasks_status - 1 ]=
846- CStringGetTextDatum (cps_print_status (cur_slot -> worker_status ));
882+ CStringGetTextDatum (cps_print_status (slot_copy . worker_status ));
847883
848884/* Form output tuple */
849885htup = heap_form_tuple (funcctx -> tuple_desc ,values ,isnull );
@@ -852,9 +888,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852888userctx -> cur_idx = i + 1 ;
853889}
854890
855- SpinLockRelease (& cur_slot -> mutex );
856- RESUME_INTERRUPTS ();
857-
858891/* Return tuple if needed */
859892if (htup )
860893SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (htup ));