Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd3fa9f8

Browse files
committed
Merge branch 'master' into rel_future_beta
2 parentsb14937d +6b00d81 commitd3fa9f8

File tree

2 files changed

+87
-54
lines changed

2 files changed

+87
-54
lines changed

‎src/include/pathman_workers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ typedef struct
7474
pid_tpid;/* worker's PID */
7575
Oiddbid;/* database which contains the relation */
7676
Oidrelid;/* table to be partitioned concurrently */
77-
uint64total_rows;/* total amount of rows processed */
77+
int64total_rows;/* total amount of rows processed */
7878

7979
int32batch_size;/* number of rows in a batch */
8080
float8sleep_time;/* how long should we sleep in case of error? */

‎src/pathman_workers.c

Lines changed: 86 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443443
void
444444
bgw_main_concurrent_part(Datummain_arg)
445445
{
446-
introws;
446+
ConcurrentPartSlot*part_slot;
447+
char*sql=NULL;
448+
int64rows;
447449
boolfailed;
448450
intfailures_count=0;
449-
char*sql=NULL;
450-
ConcurrentPartSlot*part_slot;
451+
LOCKMODElockmode=RowExclusiveLock;
451452

452453
/* Update concurrent part slot */
453454
part_slot=&concurrent_part_slots[DatumGetInt32(main_arg)];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479480
/* Do the job */
480481
do
481482
{
482-
MemoryContextold_mcxt;
483+
MemoryContextold_mcxt;
483484

484485
Oidtypes[2]= {OIDOID,INT4OID };
485486
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
486487
boolnulls[2]= { false,false };
487488

489+
boolrel_locked= false;
490+
488491
/* Reset loop variables */
489492
failed= false;
490493
rows=0;
@@ -520,66 +523,92 @@ bgw_main_concurrent_part(Datum main_arg)
520523
/* Exec ret = _partition_data_concurrent() */
521524
PG_TRY();
522525
{
523-
/* Make sure that relation exists and has partitions */
524-
if (SearchSysCacheExists1(RELOID,ObjectIdGetDatum(part_slot->relid))&&
525-
get_pathman_relation_info(part_slot->relid)!=NULL)
526-
{
527-
intret;
528-
boolisnull;
526+
intret;
527+
boolisnull;
529528

530-
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
531-
if (ret==SPI_OK_SELECT)
532-
{
533-
TupleDesctupdesc=SPI_tuptable->tupdesc;
534-
HeapTupletuple=SPI_tuptable->vals[0];
529+
/* Lock relation for DELETE and INSERT */
530+
if (!ConditionalLockRelationOid(part_slot->relid,lockmode))
531+
{
532+
elog(ERROR,"could not take lock on relation %u",part_slot->relid);
533+
}
535534

536-
Assert(SPI_processed==1);/* there should be 1 result at most */
535+
/* Great, now relation is locked */
536+
rel_locked= true;
537+
(void)rel_locked;/* mute clang analyzer */
537538

538-
rows=DatumGetInt32(SPI_getbinval(tuple,tupdesc,1,&isnull));
539+
/* Make sure that relation exists */
540+
if (!SearchSysCacheExists1(RELOID,ObjectIdGetDatum(part_slot->relid)))
541+
{
542+
/* Exit after we raise ERROR */
543+
failures_count=PART_WORKER_MAX_ATTEMPTS;
544+
(void)failures_count;/* mute clang analyzer */
539545

540-
Assert(!isnull);/* ... and ofc it must not be NULL */
541-
}
546+
elog(ERROR,"relation %u does not exist",part_slot->relid);
542547
}
543-
/* Otherwise it's time to exit */
544-
else
548+
549+
/* Make sure that relation has partitions */
550+
if (get_pathman_relation_info(part_slot->relid)==NULL)
545551
{
552+
/* Exit after we raise ERROR */
546553
failures_count=PART_WORKER_MAX_ATTEMPTS;
554+
(void)failures_count;/* mute clang analyzer */
555+
556+
elog(ERROR,"relation \"%s\" is not partitioned",
557+
get_rel_name(part_slot->relid));
558+
}
559+
560+
/* Call concurrent partitioning function */
561+
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
562+
if (ret==SPI_OK_SELECT)
563+
{
564+
TupleDesctupdesc=SPI_tuptable->tupdesc;
565+
HeapTupletuple=SPI_tuptable->vals[0];
547566

548-
elog(LOG,"relation \"%u\" is not partitioned (or does not exist)",
549-
part_slot->relid);
567+
/* There should be 1 result at most */
568+
Assert(SPI_processed==1);
569+
570+
/* Extract number of processed rows */
571+
rows=DatumGetInt64(SPI_getbinval(tuple,tupdesc,1,&isnull));
572+
Assert(!isnull);/* ... and ofc it must not be NULL */
550573
}
574+
/* Else raise generic error */
575+
elseelog(ERROR,"partitioning function returned %u",ret);
576+
577+
/* Finally, unlock our partitioned table */
578+
UnlockRelationOid(part_slot->relid,lockmode);
551579
}
552580
PG_CATCH();
553581
{
554582
/*
555583
* The most common exception we can catch here is a deadlock with
556584
* concurrent user queries. Check that attempts count doesn't exceed
557-
* some reasonable value
585+
* some reasonable value.
558586
*/
559-
ErrorData*error;
560-
char*sleep_time_str;
587+
ErrorData*error;
588+
589+
/* Unlock relation if we caught ERROR too early */
590+
if (rel_locked)
591+
UnlockRelationOid(part_slot->relid,lockmode);
592+
593+
/* Increase number of failures and set 'failed' status */
594+
failures_count++;
595+
failed= true;
561596

562597
/* Switch to the original context & copy edata */
563598
MemoryContextSwitchTo(old_mcxt);
564599
error=CopyErrorData();
565600
FlushErrorState();
566601

567602
/* Print messsage for this BGWorker to server log */
568-
sleep_time_str=datum_to_cstring(Float8GetDatum(part_slot->sleep_time),
569-
FLOAT8OID);
570-
failures_count++;
571603
ereport(LOG,
572604
(errmsg("%s: %s",concurrent_part_bgw,error->message),
573-
errdetail("attempt: %d/%d, sleep time: %s",
605+
errdetail("attempt: %d/%d, sleep time: %.2f",
574606
failures_count,
575607
PART_WORKER_MAX_ATTEMPTS,
576-
sleep_time_str)));
577-
pfree(sleep_time_str);/* free the time string */
608+
(float)part_slot->sleep_time)));
578609

610+
/* Finally, free error data */
579611
FreeErrorData(error);
580-
581-
/* Set 'failed' flag */
582-
failed= true;
583612
}
584613
PG_END_TRY();
585614

@@ -606,9 +635,10 @@ bgw_main_concurrent_part(Datum main_arg)
606635
/* Failed this time, wait */
607636
elseif (failed)
608637
{
609-
/* Abort transactionand sleep for a second*/
638+
/* Abort transaction */
610639
AbortCurrentTransaction();
611640

641+
/* Sleep for a specified amount of time (default 1s) */
612642
DirectFunctionCall1(pg_sleep,Float8GetDatum(part_slot->sleep_time));
613643
}
614644

@@ -626,8 +656,10 @@ bgw_main_concurrent_part(Datum main_arg)
626656

627657
#ifdefUSE_ASSERT_CHECKING
628658
/* Report debug message */
629-
elog(DEBUG1,"%s: relocated %d rows, total: "UINT64_FORMAT" [%u]",
630-
concurrent_part_bgw,rows,part_slot->total_rows,MyProcPid);
659+
elog(DEBUG1,"%s: "
660+
"relocated"INT64_FORMAT"rows, "
661+
"total: "INT64_FORMAT,
662+
concurrent_part_bgw,rows,part_slot->total_rows);
631663
#endif
632664
}
633665

@@ -636,9 +668,6 @@ bgw_main_concurrent_part(Datum main_arg)
636668
break;
637669
}
638670
while(rows>0||failed);/* do while there's still rows to be relocated */
639-
640-
/* Reclaim the resources */
641-
pfree(sql);
642671
}
643672

644673

@@ -824,26 +853,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824853
/* Iterate through worker slots */
825854
for (i=userctx->cur_idx;i<PART_WORKER_SLOTS;i++)
826855
{
827-
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i];
856+
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i],
857+
slot_copy;
828858
HeapTuplehtup=NULL;
829859

830-
HOLD_INTERRUPTS();
860+
/* Copy slot to process local memory */
831861
SpinLockAcquire(&cur_slot->mutex);
862+
memcpy(&slot_copy,cur_slot,sizeof(ConcurrentPartSlot));
863+
SpinLockRelease(&cur_slot->mutex);
832864

833-
if (cur_slot->worker_status!=CPS_FREE)
865+
if (slot_copy.worker_status!=CPS_FREE)
834866
{
835867
Datumvalues[Natts_pathman_cp_tasks];
836868
boolisnull[Natts_pathman_cp_tasks]= {0 };
837869

838-
values[Anum_pathman_cp_tasks_userid-1]=cur_slot->userid;
839-
values[Anum_pathman_cp_tasks_pid-1]=cur_slot->pid;
840-
values[Anum_pathman_cp_tasks_dbid-1]=cur_slot->dbid;
841-
values[Anum_pathman_cp_tasks_relid-1]=cur_slot->relid;
842-
values[Anum_pathman_cp_tasks_processed-1]=cur_slot->total_rows;
870+
values[Anum_pathman_cp_tasks_userid-1]=slot_copy.userid;
871+
values[Anum_pathman_cp_tasks_pid-1]=slot_copy.pid;
872+
values[Anum_pathman_cp_tasks_dbid-1]=slot_copy.dbid;
873+
values[Anum_pathman_cp_tasks_relid-1]=slot_copy.relid;
874+
875+
/* Record processed rows */
876+
values[Anum_pathman_cp_tasks_processed-1]=
877+
/* FIXME: use Int64GetDatum() in release 1.5 */
878+
Int32GetDatum((int32)slot_copy.total_rows);
843879

844880
/* Now build a status string */
845881
values[Anum_pathman_cp_tasks_status-1]=
846-
CStringGetTextDatum(cps_print_status(cur_slot->worker_status));
882+
CStringGetTextDatum(cps_print_status(slot_copy.worker_status));
847883

848884
/* Form output tuple */
849885
htup=heap_form_tuple(funcctx->tuple_desc,values,isnull);
@@ -852,9 +888,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852888
userctx->cur_idx=i+1;
853889
}
854890

855-
SpinLockRelease(&cur_slot->mutex);
856-
RESUME_INTERRUPTS();
857-
858891
/* Return tuple if needed */
859892
if (htup)
860893
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(htup));

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp