Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 11be0fe

Browse files
committed
ConcurrentPartWorker connects to DB using a given role, new macros for pathman_concurrent_part_tasks, fixes
1 parent 4ddfbb5 commit 11be0fe

File tree

4 files changed

+158
-70
lines changed

4 files changed

+158
-70
lines changed

‎init.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ LANGUAGE plpgsql;
158158
*/
159159
CREATEOR REPLACE FUNCTION @extschema@.show_concurrent_part_tasks()
160160
RETURNS TABLE (
161+
useridREGROLE,
161162
pidINT,
162163
dbidOID,
163164
relidREGCLASS,
@@ -188,7 +189,7 @@ RETURNS BOOL AS 'pg_pathman', 'stop_concurrent_part_task' LANGUAGE C STRICT;
188189
* Copy rows to partitions concurrently.
189190
*/
190191
CREATEOR REPLACE FUNCTION @extschema@._partition_data_concurrent(
191-
p_relationregclass,
192+
p_relationREGCLASS,
192193
p_minANYELEMENT DEFAULTNULL::text,
193194
p_maxANYELEMENT DEFAULTNULL::text,
194195
p_limitINT DEFAULTNULL,

‎src/pathman_workers.c

Lines changed: 123 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include"pathman_workers.h"
1919
#include"relation_info.h"
2020
#include"utils.h"
21+
#include"xact_handling.h"
2122

2223
#include"access/htup_details.h"
2324
#include"access/xact.h"
@@ -31,6 +32,7 @@
3132
#include"storage/latch.h"
3233
#include"utils/builtins.h"
3334
#include"utils/datum.h"
35+
#include"utils/memutils.h"
3436
#include"utils/lsyscache.h"
3537
#include"utils/typcache.h"
3638
#include"utils/resowner.h"
@@ -351,6 +353,17 @@ bgw_main_spawn_partitions(Datum main_arg)
351353
DebugPrintDatum(value,args->value_type),MyProcPid);
352354
#endif
353355

356+
/* Check again if there's a conflicting lock */
357+
if (xact_conflicting_lock_exists(args->partitioned_table))
358+
{
359+
elog(LOG,"%s: there's a conflicting lock on relation \"%s\"",
360+
spawn_partitions_bgw,
361+
get_rel_name_or_relid(args->partitioned_table));
362+
363+
dsm_detach(segment);
364+
return;/* exit quickly */
365+
}
366+
354367
/* Create partitions and save the Oid of the last one */
355368
args->result=create_partitions_internal(args->partitioned_table,
356369
value,/* unpacked Datum */
@@ -378,45 +391,51 @@ bgw_main_spawn_partitions(Datum main_arg)
378391
staticvoid
379392
bgw_main_concurrent_part(Datummain_arg)
380393
{
381-
ConcurrentPartSlot*args;
382-
Oidtypes[2]= {OIDOID,INT4OID };
383-
Datumvals[2];
384-
boolnulls[2]= { false, false };
385394
introws;
386-
intslot_idx=DatumGetInt32(main_arg);
387-
MemoryContextworker_context=CurrentMemoryContext;
388-
intfailures_count=0;
389395
boolfailed;
396+
intfailures_count=0;
390397
char*sql=NULL;
391-
392-
/* Create resource owner */
393-
CurrentResourceOwner=ResourceOwnerCreate(NULL,"PartitionDataWorker");
394-
395-
args=&concurrent_part_slots[slot_idx];
396-
args->pid=MyProcPid;
397-
vals[0]=args->relid;
398-
vals[1]=10000;
398+
ConcurrentPartSlot*part_slot;
399399

400400
/* Establish signal handlers before unblocking signals. */
401401
pqsignal(SIGTERM,handle_sigterm);
402402

403403
/* We're now ready to receive signals */
404404
BackgroundWorkerUnblockSignals();
405405

406+
/* Create resource owner */
407+
CurrentResourceOwner=ResourceOwnerCreate(NULL,concurrent_part_bgw);
408+
409+
/* Update concurrent part slot */
410+
part_slot=&concurrent_part_slots[DatumGetInt32(main_arg)];
411+
part_slot->pid=MyProcPid;
412+
406413
/* Establish connection and start transaction */
407-
BackgroundWorkerInitializeConnectionByOid(args->dbid,InvalidOid);
414+
BackgroundWorkerInitializeConnectionByOid(part_slot->dbid,part_slot->userid);
408415

416+
/* Initialize pg_pathman's local config */
417+
StartTransactionCommand();
418+
bg_worker_load_config(concurrent_part_bgw);
419+
CommitTransactionCommand();
420+
421+
/* Do the job */
409422
do
410423
{
424+
Oidtypes[2]= {OIDOID,INT4OID };
425+
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
426+
boolnulls[2]= { false,false };
427+
428+
/* Reset loop variables */
411429
failed= false;
412430
rows=0;
431+
432+
/* Start new transaction (syscache access etc.) */
413433
StartTransactionCommand();
414-
bg_worker_load_config("PartitionDataWorker");
415434

416435
SPI_connect();
417436
PushActiveSnapshot(GetTransactionSnapshot());
418437

419-
/*Do some preparation withinthefirst iteration */
438+
/*Preparethequery if needed */
420439
if (sql==NULL)
421440
{
422441
MemoryContextoldcontext;
@@ -425,78 +444,104 @@ bgw_main_concurrent_part(Datum main_arg)
425444
* Allocate as SQL query in top memory context because current
426445
* context will be destroyed after transaction finishes
427446
*/
428-
oldcontext=MemoryContextSwitchTo(worker_context);
447+
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
429448
sql=psprintf("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)",
430-
get_namespace_name(get_pathman_schema()));
449+
get_namespace_name(get_pathman_schema()));
431450
MemoryContextSwitchTo(oldcontext);
432451
}
433452

453+
/* Exec ret = _partition_data_concurrent() */
434454
PG_TRY();
435455
{
436456
intret;
437457
boolisnull;
438458

439459
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
440-
if (ret>0)
460+
if (ret==SPI_OK_SELECT)
441461
{
442462
TupleDesctupdesc=SPI_tuptable->tupdesc;
443463
HeapTupletuple=SPI_tuptable->vals[0];
444464

445-
Assert(SPI_processed==1);
465+
Assert(SPI_processed==1);/* there should be 1 result at most */
446466

447467
rows=DatumGetInt32(SPI_getbinval(tuple,tupdesc,1,&isnull));
468+
469+
Assert(!isnull);/* ... and ofc it must not be NULL */
448470
}
449471
}
450472
PG_CATCH();
451473
{
452474
ErrorData*error;
475+
453476
EmitErrorReport();
477+
454478
error=CopyErrorData();
455-
elog(LOG,"Worker error: %s",error->message);
479+
elog(LOG,"%s: %s",concurrent_part_bgw,error->message);
456480
FlushErrorState();
481+
FreeErrorData(error);
457482

458483
/*
459484
* The most common exception we can catch here is a deadlock with
460485
* concurrent user queries. Check that attempts count doesn't exceed
461486
* some reasonable value
462487
*/
463-
if (100 <=failures_count++)
488+
if (failures_count++>PART_WORKER_MAX_ATTEMPTS)
464489
{
465-
pfree(sql);
466-
args->worker_status=WS_FREE;
490+
/* Mark slot as FREE */
491+
part_slot->worker_status=WS_FREE;
492+
467493
elog(LOG,
468-
"The concurrent partitioning worker exiting because the "
469-
"maximum attempts count exceeded. See the error message below");
470-
exit(1);
494+
"Concurrent partitioning worker has canceled the task because "
495+
"maximum amount of attempts (%d) had been exceeded. "
496+
"See the error message below",
497+
PART_WORKER_MAX_ATTEMPTS);
498+
499+
return;/* exit quickly */
471500
}
501+
502+
/* Set 'failed' flag */
472503
failed= true;
473504
}
474505
PG_END_TRY();
475506

476507
SPI_finish();
477508
PopActiveSnapshot();
509+
478510
if (failed)
479511
{
480-
/* abort transaction and sleep for a second */
512+
#ifdefUSE_ASSERT_CHECKING
513+
elog(DEBUG2,"%s: could not relocate batch, total: %lu [%u]",
514+
concurrent_part_bgw,part_slot->total_rows,MyProcPid);
515+
#endif
516+
517+
/* Abort transaction and sleep for a second */
481518
AbortCurrentTransaction();
482-
DirectFunctionCall1(pg_sleep,Float8GetDatum(1));
519+
DirectFunctionCall1(pg_sleep,Float8GetDatum(part_slot->sleep_time));
483520
}
484521
else
485522
{
486-
/*Reset failures counterandcommit transaction */
523+
/*Commit transactionandreset 'failures_count' */
487524
CommitTransactionCommand();
488525
failures_count=0;
489-
args->total_rows+=rows;
526+
527+
/* Add rows to total_rows */
528+
part_slot->total_rows+=rows;
529+
530+
#ifdefUSE_ASSERT_CHECKING
531+
elog(DEBUG2,"%s: relocated %d rows, total: %lu [%u]",
532+
concurrent_part_bgw,rows,part_slot->total_rows,MyProcPid);
533+
#endif
490534
}
491535

492-
/* If other backend requested to stopworker then quit */
493-
if (args->worker_status==WS_STOPPING)
536+
/* If other backend requested to stopus, quit */
537+
if (part_slot->worker_status==WS_STOPPING)
494538
break;
495539
}
496-
while(rows>0||failed);/* do while there isstill rows torelocate */
540+
while(rows>0||failed);/* do while there'sstill rows tobe relocated */
497541

542+
/* Reclaim the resources */
498543
pfree(sql);
499-
args->worker_status=WS_FREE;
544+
part_slot->worker_status=WS_FREE;
500545
}
501546

502547

@@ -513,6 +558,8 @@ bgw_main_concurrent_part(Datum main_arg)
513558
Datum
514559
partition_table_concurrently(PG_FUNCTION_ARGS)
515560
{
561+
#definetostr(str) ( #str )
562+
516563
Oidrelid=PG_GETARG_OID(0);
517564
ConcurrentPartSlot*my_slot=NULL;
518565
intempty_slot_idx=-1;
@@ -550,7 +597,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
550597
elog(ERROR,"No empty worker slots found");
551598

552599
/* Initialize concurrent part slot */
553-
InitConcurrentPartSlot(my_slot,WS_WORKING,MyDatabaseId,relid);
600+
InitConcurrentPartSlot(my_slot,GetAuthenticatedUserId(),
601+
WS_WORKING,MyDatabaseId,relid,
602+
1000,1.0);
554603

555604
/* Start worker (we should not wait) */
556605
start_bg_worker(concurrent_part_bgw,
@@ -560,8 +609,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
560609

561610
/* Tell user everything's fine */
562611
elog(NOTICE,
563-
"Worker started. You can stop it with the following command: "
564-
"select stop_concurrent_part_task('%s');",
612+
"Worker started. You can stop it "
613+
"with the following command: select %s('%s');",
614+
tostr(stop_concurrent_part_task),/* convert function's name to literal */
565615
get_rel_name(relid));
566616

567617
PG_RETURN_VOID();
@@ -594,12 +644,20 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
594644
userctx->cur_idx=0;
595645

596646
/* Create tuple descriptor */
597-
tupdesc=CreateTemplateTupleDesc(5, false);
598-
TupleDescInitEntry(tupdesc, (AttrNumber)1,"pid",INT4OID,-1,0);
599-
TupleDescInitEntry(tupdesc, (AttrNumber)2,"dbid",OIDOID,-1,0);
600-
TupleDescInitEntry(tupdesc, (AttrNumber)3,"relid",REGCLASSOID,-1,0);
601-
TupleDescInitEntry(tupdesc, (AttrNumber)4,"processed",INT4OID,-1,0);
602-
TupleDescInitEntry(tupdesc, (AttrNumber)5,"status",TEXTOID,-1,0);
647+
tupdesc=CreateTemplateTupleDesc(Natts_pathman_cp_tasks, false);
648+
649+
TupleDescInitEntry(tupdesc,Anum_pathman_cp_tasks_userid,
650+
"userid",REGROLEOID,-1,0);
651+
TupleDescInitEntry(tupdesc,Anum_pathman_cp_tasks_pid,
652+
"pid",INT4OID,-1,0);
653+
TupleDescInitEntry(tupdesc,Anum_pathman_cp_tasks_dbid,
654+
"dbid",OIDOID,-1,0);
655+
TupleDescInitEntry(tupdesc,Anum_pathman_cp_tasks_relid,
656+
"relid",REGCLASSOID,-1,0);
657+
TupleDescInitEntry(tupdesc,Anum_pathman_cp_tasks_processed,
658+
"processed",INT4OID,-1,0);
659+
TupleDescInitEntry(tupdesc,Anum_pathman_cp_tasks_status,
660+
"status",TEXTOID,-1,0);
603661

604662
funcctx->tuple_desc=BlessTupleDesc(tupdesc);
605663
funcctx->user_fctx= (void*)userctx;
@@ -610,35 +668,39 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
610668
funcctx=SRF_PERCALL_SETUP();
611669
userctx= (active_workers_cxt*)funcctx->user_fctx;
612670

613-
/*
614-
* Iterate through worker slots
615-
*/
671+
/* Iterate through worker slots */
616672
for (i=userctx->cur_idx;i<PART_WORKER_SLOTS;i++)
617673
{
618-
if (concurrent_part_slots[i].worker_status!=WS_FREE)
674+
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i];
675+
676+
if (cur_slot->worker_status!=WS_FREE)
619677
{
620678
HeapTupletuple;
621-
Datumvalues[5];
622-
boolisnull[5]= {false, false, false, false, false };
679+
Datumvalues[Natts_pathman_cp_tasks];
680+
boolisnull[Natts_pathman_cp_tasks]= {0,0,0,0,0,0 };
623681

624-
values[0]=concurrent_part_slots[i].pid;
625-
values[1]=concurrent_part_slots[i].dbid;
626-
values[2]=concurrent_part_slots[i].relid;
627-
values[3]=concurrent_part_slots[i].total_rows;
682+
values[Anum_pathman_cp_tasks_userid-1]=cur_slot->userid;
683+
values[Anum_pathman_cp_tasks_pid-1]=cur_slot->pid;
684+
values[Anum_pathman_cp_tasks_dbid-1]=cur_slot->dbid;
685+
values[Anum_pathman_cp_tasks_relid-1]=cur_slot->relid;
686+
values[Anum_pathman_cp_tasks_processed-1]=cur_slot->total_rows;
628687

629688
/* Now build a status string */
630-
switch(concurrent_part_slots[i].worker_status)
689+
switch(cur_slot->worker_status)
631690
{
632691
caseWS_WORKING:
633-
values[4]=PointerGetDatum(pstrdup("working"));
692+
values[Anum_pathman_cp_tasks_status-1]=
693+
PointerGetDatum(cstring_to_text("working"));
634694
break;
635695

636696
caseWS_STOPPING:
637-
values[4]=PointerGetDatum(pstrdup("stopping"));
697+
values[Anum_pathman_cp_tasks_status-1]=
698+
PointerGetDatum(cstring_to_text("stopping"));
638699
break;
639700

640701
default:
641-
values[4]=PointerGetDatum(pstrdup("[unknown]"));
702+
values[Anum_pathman_cp_tasks_status-1]=
703+
PointerGetDatum(cstring_to_text("[unknown]"));
642704
}
643705

644706
/* Form output tuple */
@@ -670,7 +732,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
670732
concurrent_part_slots[i].dbid==MyDatabaseId)
671733
{
672734
concurrent_part_slots[i].worker_status=WS_STOPPING;
673-
elog(NOTICE,"Worker will stop after current batch's finished");
735+
elog(NOTICE,"Worker will stop afterit finishescurrent batch");
674736

675737
PG_RETURN_BOOL(true);
676738
}

0 commit comments

Comments (0)

[8]ページ先頭

©2009-2025 Movatter.jp