@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443443void
444444bgw_main_concurrent_part (Datum main_arg )
445445{
446- int rows ;
446+ ConcurrentPartSlot * part_slot ;
447+ char * sql = NULL ;
448+ int64 rows ;
447449bool failed ;
448450int failures_count = 0 ;
449- char * sql = NULL ;
450- ConcurrentPartSlot * part_slot ;
451+ LOCKMODE lockmode = RowExclusiveLock ;
451452
452453/* Update concurrent part slot */
453454part_slot = & concurrent_part_slots [DatumGetInt32 (main_arg )];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479480/* Do the job */
480481do
481482{
482- MemoryContext old_mcxt ;
483+ MemoryContext old_mcxt ;
483484
484485Oid types [2 ]= {OIDOID ,INT4OID };
485486Datum vals [2 ]= {part_slot -> relid ,part_slot -> batch_size };
486487bool nulls [2 ]= { false,false };
487488
489+ bool rel_locked = false;
490+
488491/* Reset loop variables */
489492failed = false;
490493rows = 0 ;
@@ -520,66 +523,89 @@ bgw_main_concurrent_part(Datum main_arg)
520523/* Exec ret = _partition_data_concurrent() */
521524PG_TRY ();
522525{
523- /* Make sure that relation exists and has partitions */
524- if (SearchSysCacheExists1 (RELOID ,ObjectIdGetDatum (part_slot -> relid ))&&
525- get_pathman_relation_info (part_slot -> relid )!= NULL )
526- {
527- int ret ;
528- bool isnull ;
526+ int ret ;
527+ bool isnull ;
529528
530- ret = SPI_execute_with_args ( sql , 2 , types , vals , nulls , false, 0 );
531- if (ret == SPI_OK_SELECT )
532- {
533- TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
534- HeapTuple tuple = SPI_tuptable -> vals [ 0 ];
529+ /* Lock relation for DELETE and INSERT */
530+ if (! ConditionalLockRelationOid ( part_slot -> relid , lockmode ) )
531+ {
532+ elog ( ERROR , "could not take lock on relation %u" , part_slot -> relid ) ;
533+ }
535534
536- Assert (SPI_processed == 1 );/* there should be 1 result at most */
535+ /* Great, now relation is locked */
536+ rel_locked = true;
537537
538- rows = DatumGetInt32 (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
538+ /* Make sure that relation exists */
539+ if (!SearchSysCacheExists1 (RELOID ,ObjectIdGetDatum (part_slot -> relid )))
540+ {
541+ /* Exit after we raise ERROR */
542+ failures_count = PART_WORKER_MAX_ATTEMPTS ;
539543
540- Assert (!isnull );/* ... and ofc it must not be NULL */
541- }
544+ elog (ERROR ,"relation %u does not exist" ,part_slot -> relid );
542545}
543- /* Otherwise it's time to exit */
544- else
546+
547+ /* Make sure that relation has partitions */
548+ if (get_pathman_relation_info (part_slot -> relid )== NULL )
545549{
550+ /* Exit after we raise ERROR */
546551failures_count = PART_WORKER_MAX_ATTEMPTS ;
547552
548- elog (LOG ,"relation \"%u\" is not partitioned (or does not exist)" ,
549- part_slot -> relid );
553+ elog (ERROR ,"relation \"%s\" is not partitioned" ,
554+ get_rel_name (part_slot -> relid ));
555+ }
556+
557+ /* Call concurrent partitioning function */
558+ ret = SPI_execute_with_args (sql ,2 ,types ,vals ,nulls , false,0 );
559+ if (ret == SPI_OK_SELECT )
560+ {
561+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
562+ HeapTuple tuple = SPI_tuptable -> vals [0 ];
563+
564+ /* There should be 1 result at most */
565+ Assert (SPI_processed == 1 );
566+
567+ /* Extract number of processed rows */
568+ rows = DatumGetInt64 (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
569+ Assert (!isnull );/* ... and ofc it must not be NULL */
550570}
571+ /* Else raise generic error */
572+ else elog (ERROR ,"partitioning function returned %u" ,ret );
573+
574+ /* Finally, unlock our partitioned table */
575+ UnlockRelationOid (part_slot -> relid ,lockmode );
551576}
552577PG_CATCH ();
553578{
554579/*
555580 * The most common exception we can catch here is a deadlock with
556581 * concurrent user queries. Check that attempts count doesn't exceed
557- * some reasonable value
582+ * some reasonable value.
558583 */
559- ErrorData * error ;
560- char * sleep_time_str ;
584+ ErrorData * error ;
585+
586+ /* Unlock relation if we caught ERROR too early */
587+ if (rel_locked )
588+ UnlockRelationOid (part_slot -> relid ,lockmode );
589+
590+ /* Increase number of failures and set 'failed' status */
591+ failures_count ++ ;
592+ failed = true;
561593
562594/* Switch to the original context & copy edata */
563595MemoryContextSwitchTo (old_mcxt );
564596error = CopyErrorData ();
565597FlushErrorState ();
566598
567599/* Print messsage for this BGWorker to server log */
568- sleep_time_str = datum_to_cstring (Float8GetDatum (part_slot -> sleep_time ),
569- FLOAT8OID );
570- failures_count ++ ;
571600ereport (LOG ,
572601(errmsg ("%s: %s" ,concurrent_part_bgw ,error -> message ),
573- errdetail ("attempt: %d/%d, sleep time: %s " ,
602+ errdetail ("attempt: %d/%d, sleep time: %.2f " ,
574603failures_count ,
575604PART_WORKER_MAX_ATTEMPTS ,
576- sleep_time_str )));
577- pfree (sleep_time_str );/* free the time string */
605+ (float )part_slot -> sleep_time )));
578606
607+ /* Finally, free error data */
579608FreeErrorData (error );
580-
581- /* Set 'failed' flag */
582- failed = true;
583609}
584610PG_END_TRY ();
585611
@@ -606,9 +632,10 @@ bgw_main_concurrent_part(Datum main_arg)
606632/* Failed this time, wait */
607633else if (failed )
608634{
609- /* Abort transactionand sleep for a second */
635+ /* Abort transaction */
610636AbortCurrentTransaction ();
611637
638+ /* Sleep for a specified amount of time (default 1s) */
612639DirectFunctionCall1 (pg_sleep ,Float8GetDatum (part_slot -> sleep_time ));
613640}
614641
@@ -626,8 +653,10 @@ bgw_main_concurrent_part(Datum main_arg)
626653
627654#ifdef USE_ASSERT_CHECKING
628655/* Report debug message */
629- elog (DEBUG1 ,"%s: relocated %d rows, total: " UINT64_FORMAT " [%u]" ,
630- concurrent_part_bgw ,rows ,part_slot -> total_rows ,MyProcPid );
656+ elog (DEBUG1 ,"%s: "
657+ "relocated" INT64_FORMAT "rows, "
658+ "total: " INT64_FORMAT ,
659+ concurrent_part_bgw ,rows ,part_slot -> total_rows );
631660#endif
632661}
633662
@@ -636,9 +665,6 @@ bgw_main_concurrent_part(Datum main_arg)
636665break ;
637666}
638667while (rows > 0 || failed );/* do while there's still rows to be relocated */
639-
640- /* Reclaim the resources */
641- pfree (sql );
642668}
643669
644670
@@ -824,26 +850,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824850/* Iterate through worker slots */
825851for (i = userctx -> cur_idx ;i < PART_WORKER_SLOTS ;i ++ )
826852{
827- ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
853+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ],
854+ slot_copy ;
828855HeapTuple htup = NULL ;
829856
830- HOLD_INTERRUPTS ();
857+ /* Copy slot to process local memory */
831858SpinLockAcquire (& cur_slot -> mutex );
859+ memcpy (& slot_copy ,cur_slot ,sizeof (ConcurrentPartSlot ));
860+ SpinLockRelease (& cur_slot -> mutex );
832861
833- if (cur_slot -> worker_status != CPS_FREE )
862+ if (slot_copy . worker_status != CPS_FREE )
834863{
835864Datum values [Natts_pathman_cp_tasks ];
836865bool isnull [Natts_pathman_cp_tasks ]= {0 };
837866
838- values [Anum_pathman_cp_tasks_userid - 1 ]= cur_slot -> userid ;
839- values [Anum_pathman_cp_tasks_pid - 1 ]= cur_slot -> pid ;
840- values [Anum_pathman_cp_tasks_dbid - 1 ]= cur_slot -> dbid ;
841- values [Anum_pathman_cp_tasks_relid - 1 ]= cur_slot -> relid ;
842- values [Anum_pathman_cp_tasks_processed - 1 ]= cur_slot -> total_rows ;
867+ values [Anum_pathman_cp_tasks_userid - 1 ]= slot_copy .userid ;
868+ values [Anum_pathman_cp_tasks_pid - 1 ]= slot_copy .pid ;
869+ values [Anum_pathman_cp_tasks_dbid - 1 ]= slot_copy .dbid ;
870+ values [Anum_pathman_cp_tasks_relid - 1 ]= slot_copy .relid ;
871+
872+ /* Record processed rows */
873+ values [Anum_pathman_cp_tasks_processed - 1 ]=
874+ /* FIXME: use Int64GetDatum() in release 1.5 */
875+ Int32GetDatum ((int32 )slot_copy .total_rows );
843876
844877/* Now build a status string */
845878values [Anum_pathman_cp_tasks_status - 1 ]=
846- CStringGetTextDatum (cps_print_status (cur_slot -> worker_status ));
879+ CStringGetTextDatum (cps_print_status (slot_copy . worker_status ));
847880
848881/* Form output tuple */
849882htup = heap_form_tuple (funcctx -> tuple_desc ,values ,isnull );
@@ -852,9 +885,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852885userctx -> cur_idx = i + 1 ;
853886}
854887
855- SpinLockRelease (& cur_slot -> mutex );
856- RESUME_INTERRUPTS ();
857-
858888/* Return tuple if needed */
859889if (htup )
860890SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (htup ));