18
18
#include "pathman_workers.h"
19
19
#include "relation_info.h"
20
20
#include "utils.h"
21
+ #include "xact_handling.h"
21
22
22
23
#include "access/htup_details.h"
23
24
#include "access/xact.h"
31
32
#include "storage/latch.h"
32
33
#include "utils/builtins.h"
33
34
#include "utils/datum.h"
35
+ #include "utils/memutils.h"
34
36
#include "utils/lsyscache.h"
35
37
#include "utils/typcache.h"
36
38
#include "utils/resowner.h"
@@ -351,6 +353,17 @@ bgw_main_spawn_partitions(Datum main_arg)
351
353
DebugPrintDatum (value ,args -> value_type ),MyProcPid );
352
354
#endif
353
355
356
+ /* Check again if there's a conflicting lock */
357
+ if (xact_conflicting_lock_exists (args -> partitioned_table ))
358
+ {
359
+ elog (LOG ,"%s: there's a conflicting lock on relation \"%s\"" ,
360
+ spawn_partitions_bgw ,
361
+ get_rel_name_or_relid (args -> partitioned_table ));
362
+
363
+ dsm_detach (segment );
364
+ return ;/* exit quickly */
365
+ }
366
+
354
367
/* Create partitions and save the Oid of the last one */
355
368
args -> result = create_partitions_internal (args -> partitioned_table ,
356
369
value ,/* unpacked Datum */
@@ -378,45 +391,51 @@ bgw_main_spawn_partitions(Datum main_arg)
378
391
static void
379
392
bgw_main_concurrent_part (Datum main_arg )
380
393
{
381
- ConcurrentPartSlot * args ;
382
- Oid types [2 ]= {OIDOID ,INT4OID };
383
- Datum vals [2 ];
384
- bool nulls [2 ]= { false, false };
385
394
int rows ;
386
- int slot_idx = DatumGetInt32 (main_arg );
387
- MemoryContext worker_context = CurrentMemoryContext ;
388
- int failures_count = 0 ;
389
395
bool failed ;
396
+ int failures_count = 0 ;
390
397
char * sql = NULL ;
391
-
392
- /* Create resource owner */
393
- CurrentResourceOwner = ResourceOwnerCreate (NULL ,"PartitionDataWorker" );
394
-
395
- args = & concurrent_part_slots [slot_idx ];
396
- args -> pid = MyProcPid ;
397
- vals [0 ]= args -> relid ;
398
- vals [1 ]= 10000 ;
398
+ ConcurrentPartSlot * part_slot ;
399
399
400
400
/* Establish signal handlers before unblocking signals. */
401
401
pqsignal (SIGTERM ,handle_sigterm );
402
402
403
403
/* We're now ready to receive signals */
404
404
BackgroundWorkerUnblockSignals ();
405
405
406
+ /* Create resource owner */
407
+ CurrentResourceOwner = ResourceOwnerCreate (NULL ,concurrent_part_bgw );
408
+
409
+ /* Update concurrent part slot */
410
+ part_slot = & concurrent_part_slots [DatumGetInt32 (main_arg )];
411
+ part_slot -> pid = MyProcPid ;
412
+
406
413
/* Establish connection and start transaction */
407
- BackgroundWorkerInitializeConnectionByOid (args -> dbid ,InvalidOid );
414
+ BackgroundWorkerInitializeConnectionByOid (part_slot -> dbid ,part_slot -> userid );
408
415
416
+ /* Initialize pg_pathman's local config */
417
+ StartTransactionCommand ();
418
+ bg_worker_load_config (concurrent_part_bgw );
419
+ CommitTransactionCommand ();
420
+
421
+ /* Do the job */
409
422
do
410
423
{
424
+ Oid types [2 ]= {OIDOID ,INT4OID };
425
+ Datum vals [2 ]= {part_slot -> relid ,part_slot -> batch_size };
426
+ bool nulls [2 ]= { false,false };
427
+
428
+ /* Reset loop variables */
411
429
failed = false;
412
430
rows = 0 ;
431
+
432
+ /* Start new transaction (syscache access etc.) */
413
433
StartTransactionCommand ();
414
- bg_worker_load_config ("PartitionDataWorker" );
415
434
416
435
SPI_connect ();
417
436
PushActiveSnapshot (GetTransactionSnapshot ());
418
437
419
- /* Do some preparation within the first iteration */
438
+ /* Prepare the query if needed */
420
439
if (sql == NULL )
421
440
{
422
441
MemoryContext oldcontext ;
@@ -425,78 +444,104 @@ bgw_main_concurrent_part(Datum main_arg)
425
444
* Allocate as SQL query in top memory context because current
426
445
* context will be destroyed after transaction finishes
427
446
*/
428
- oldcontext = MemoryContextSwitchTo (worker_context );
447
+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
429
448
sql = psprintf ("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)" ,
430
- get_namespace_name (get_pathman_schema ()));
449
+ get_namespace_name (get_pathman_schema ()));
431
450
MemoryContextSwitchTo (oldcontext );
432
451
}
433
452
453
+ /* Exec ret = _partition_data_concurrent() */
434
454
PG_TRY ();
435
455
{
436
456
int ret ;
437
457
bool isnull ;
438
458
439
459
ret = SPI_execute_with_args (sql ,2 ,types ,vals ,nulls , false,0 );
440
- if (ret > 0 )
460
+ if (ret == SPI_OK_SELECT )
441
461
{
442
462
TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
443
463
HeapTuple tuple = SPI_tuptable -> vals [0 ];
444
464
445
- Assert (SPI_processed == 1 );
465
+ Assert (SPI_processed == 1 );/* there should be 1 result at most */
446
466
447
467
rows = DatumGetInt32 (SPI_getbinval (tuple ,tupdesc ,1 ,& isnull ));
468
+
469
+ Assert (!isnull );/* ... and ofc it must not be NULL */
448
470
}
449
471
}
450
472
PG_CATCH ();
451
473
{
452
474
ErrorData * error ;
475
+
453
476
EmitErrorReport ();
477
+
454
478
error = CopyErrorData ();
455
- elog (LOG ,"Worker error : %s" ,error -> message );
479
+ elog (LOG ,"%s : %s" , concurrent_part_bgw ,error -> message );
456
480
FlushErrorState ();
481
+ FreeErrorData (error );
457
482
458
483
/*
459
484
* The most common exception we can catch here is a deadlock with
460
485
* concurrent user queries. Check that attempts count doesn't exceed
461
486
* some reasonable value
462
487
*/
463
- if (100 <= failures_count ++ )
488
+ if (failures_count ++ > PART_WORKER_MAX_ATTEMPTS )
464
489
{
465
- pfree (sql );
466
- args -> worker_status = WS_FREE ;
490
+ /* Mark slot as FREE */
491
+ part_slot -> worker_status = WS_FREE ;
492
+
467
493
elog (LOG ,
468
- "The concurrent partitioning worker exiting because the "
469
- "maximum attempts count exceeded. See the error message below" );
470
- exit (1 );
494
+ "Concurrent partitioning worker has canceled the task because "
495
+ "maximum amount of attempts (%d) had been exceeded. "
496
+ "See the error message below" ,
497
+ PART_WORKER_MAX_ATTEMPTS );
498
+
499
+ return ;/* exit quickly */
471
500
}
501
+
502
+ /* Set 'failed' flag */
472
503
failed = true;
473
504
}
474
505
PG_END_TRY ();
475
506
476
507
SPI_finish ();
477
508
PopActiveSnapshot ();
509
+
478
510
if (failed )
479
511
{
480
- /* abort transaction and sleep for a second */
512
+ #ifdef USE_ASSERT_CHECKING
513
+ elog (DEBUG2 ,"%s: could not relocate batch, total: %lu [%u]" ,
514
+ concurrent_part_bgw ,part_slot -> total_rows ,MyProcPid );
515
+ #endif
516
+
517
+ /* Abort transaction and sleep for a second */
481
518
AbortCurrentTransaction ();
482
- DirectFunctionCall1 (pg_sleep ,Float8GetDatum (1 ));
519
+ DirectFunctionCall1 (pg_sleep ,Float8GetDatum (part_slot -> sleep_time ));
483
520
}
484
521
else
485
522
{
486
- /* Reset failures counter and commit transaction */
523
+ /* Commit transaction and reset 'failures_count' */
487
524
CommitTransactionCommand ();
488
525
failures_count = 0 ;
489
- args -> total_rows += rows ;
526
+
527
+ /* Add rows to total_rows */
528
+ part_slot -> total_rows += rows ;
529
+
530
+ #ifdef USE_ASSERT_CHECKING
531
+ elog (DEBUG2 ,"%s: relocated %d rows, total: %lu [%u]" ,
532
+ concurrent_part_bgw ,rows ,part_slot -> total_rows ,MyProcPid );
533
+ #endif
490
534
}
491
535
492
- /* If other backend requested to stop worker then quit */
493
- if (args -> worker_status == WS_STOPPING )
536
+ /* If other backend requested to stop us, quit */
537
+ if (part_slot -> worker_status == WS_STOPPING )
494
538
break ;
495
539
}
496
- while (rows > 0 || failed );/* do while there is still rows to relocate */
540
+ while (rows > 0 || failed );/* do while there's still rows to be relocated */
497
541
542
+ /* Reclaim the resources */
498
543
pfree (sql );
499
- args -> worker_status = WS_FREE ;
544
+ part_slot -> worker_status = WS_FREE ;
500
545
}
501
546
502
547
@@ -513,6 +558,8 @@ bgw_main_concurrent_part(Datum main_arg)
513
558
Datum
514
559
partition_table_concurrently (PG_FUNCTION_ARGS )
515
560
{
561
+ #define tostr (str ) ( #str )
562
+
516
563
Oid relid = PG_GETARG_OID (0 );
517
564
ConcurrentPartSlot * my_slot = NULL ;
518
565
int empty_slot_idx = -1 ;
@@ -550,7 +597,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
550
597
elog (ERROR ,"No empty worker slots found" );
551
598
552
599
/* Initialize concurrent part slot */
553
- InitConcurrentPartSlot (my_slot ,WS_WORKING ,MyDatabaseId ,relid );
600
+ InitConcurrentPartSlot (my_slot ,GetAuthenticatedUserId (),
601
+ WS_WORKING ,MyDatabaseId ,relid ,
602
+ 1000 ,1.0 );
554
603
555
604
/* Start worker (we should not wait) */
556
605
start_bg_worker (concurrent_part_bgw ,
@@ -560,8 +609,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
560
609
561
610
/* Tell user everything's fine */
562
611
elog (NOTICE ,
563
- "Worker started. You can stop it with the following command: "
564
- "select stop_concurrent_part_task('%s');" ,
612
+ "Worker started. You can stop it "
613
+ "with the following command: select %s('%s');" ,
614
+ tostr (stop_concurrent_part_task ),/* convert function's name to literal */
565
615
get_rel_name (relid ));
566
616
567
617
PG_RETURN_VOID ();
@@ -594,12 +644,20 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
594
644
userctx -> cur_idx = 0 ;
595
645
596
646
/* Create tuple descriptor */
597
- tupdesc = CreateTemplateTupleDesc (5 , false);
598
- TupleDescInitEntry (tupdesc , (AttrNumber )1 ,"pid" ,INT4OID ,-1 ,0 );
599
- TupleDescInitEntry (tupdesc , (AttrNumber )2 ,"dbid" ,OIDOID ,-1 ,0 );
600
- TupleDescInitEntry (tupdesc , (AttrNumber )3 ,"relid" ,REGCLASSOID ,-1 ,0 );
601
- TupleDescInitEntry (tupdesc , (AttrNumber )4 ,"processed" ,INT4OID ,-1 ,0 );
602
- TupleDescInitEntry (tupdesc , (AttrNumber )5 ,"status" ,TEXTOID ,-1 ,0 );
647
+ tupdesc = CreateTemplateTupleDesc (Natts_pathman_cp_tasks , false);
648
+
649
+ TupleDescInitEntry (tupdesc ,Anum_pathman_cp_tasks_userid ,
650
+ "userid" ,REGROLEOID ,-1 ,0 );
651
+ TupleDescInitEntry (tupdesc ,Anum_pathman_cp_tasks_pid ,
652
+ "pid" ,INT4OID ,-1 ,0 );
653
+ TupleDescInitEntry (tupdesc ,Anum_pathman_cp_tasks_dbid ,
654
+ "dbid" ,OIDOID ,-1 ,0 );
655
+ TupleDescInitEntry (tupdesc ,Anum_pathman_cp_tasks_relid ,
656
+ "relid" ,REGCLASSOID ,-1 ,0 );
657
+ TupleDescInitEntry (tupdesc ,Anum_pathman_cp_tasks_processed ,
658
+ "processed" ,INT4OID ,-1 ,0 );
659
+ TupleDescInitEntry (tupdesc ,Anum_pathman_cp_tasks_status ,
660
+ "status" ,TEXTOID ,-1 ,0 );
603
661
604
662
funcctx -> tuple_desc = BlessTupleDesc (tupdesc );
605
663
funcctx -> user_fctx = (void * )userctx ;
@@ -610,35 +668,39 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
610
668
funcctx = SRF_PERCALL_SETUP ();
611
669
userctx = (active_workers_cxt * )funcctx -> user_fctx ;
612
670
613
- /*
614
- * Iterate through worker slots
615
- */
671
+ /* Iterate through worker slots */
616
672
for (i = userctx -> cur_idx ;i < PART_WORKER_SLOTS ;i ++ )
617
673
{
618
- if (concurrent_part_slots [i ].worker_status != WS_FREE )
674
+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
675
+
676
+ if (cur_slot -> worker_status != WS_FREE )
619
677
{
620
678
HeapTuple tuple ;
621
- Datum values [5 ];
622
- bool isnull [5 ]= {false, false, false, false, false };
679
+ Datum values [Natts_pathman_cp_tasks ];
680
+ bool isnull [Natts_pathman_cp_tasks ]= {0 , 0 , 0 , 0 , 0 , 0 };
623
681
624
- values [0 ]= concurrent_part_slots [i ].pid ;
625
- values [1 ]= concurrent_part_slots [i ].dbid ;
626
- values [2 ]= concurrent_part_slots [i ].relid ;
627
- values [3 ]= concurrent_part_slots [i ].total_rows ;
682
+ values [Anum_pathman_cp_tasks_userid - 1 ]= cur_slot -> userid ;
683
+ values [Anum_pathman_cp_tasks_pid - 1 ]= cur_slot -> pid ;
684
+ values [Anum_pathman_cp_tasks_dbid - 1 ]= cur_slot -> dbid ;
685
+ values [Anum_pathman_cp_tasks_relid - 1 ]= cur_slot -> relid ;
686
+ values [Anum_pathman_cp_tasks_processed - 1 ]= cur_slot -> total_rows ;
628
687
629
688
/* Now build a status string */
630
- switch (concurrent_part_slots [ i ]. worker_status )
689
+ switch (cur_slot -> worker_status )
631
690
{
632
691
case WS_WORKING :
633
- values [4 ]= PointerGetDatum (pstrdup ("working" ));
692
+ values [Anum_pathman_cp_tasks_status - 1 ]=
693
+ PointerGetDatum (cstring_to_text ("working" ));
634
694
break ;
635
695
636
696
case WS_STOPPING :
637
- values [4 ]= PointerGetDatum (pstrdup ("stopping" ));
697
+ values [Anum_pathman_cp_tasks_status - 1 ]=
698
+ PointerGetDatum (cstring_to_text ("stopping" ));
638
699
break ;
639
700
640
701
default :
641
- values [4 ]= PointerGetDatum (pstrdup ("[unknown]" ));
702
+ values [Anum_pathman_cp_tasks_status - 1 ]=
703
+ PointerGetDatum (cstring_to_text ("[unknown]" ));
642
704
}
643
705
644
706
/* Form output tuple */
@@ -670,7 +732,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
670
732
concurrent_part_slots [i ].dbid == MyDatabaseId )
671
733
{
672
734
concurrent_part_slots [i ].worker_status = WS_STOPPING ;
673
- elog (NOTICE ,"Worker will stop after current batch's finished " );
735
+ elog (NOTICE ,"Worker will stop after it finishes current batch" );
674
736
675
737
PG_RETURN_BOOL (true);
676
738
}