Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit a49fdac

Browse files
committed
bugfixes and improved error handling in ConcurrentPartWorker
1 parent 6494216 commit a49fdac

File tree

2 files changed

+84
-54
lines changed

2 files changed

+84
-54
lines changed

‎src/include/pathman_workers.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ typedef struct
7474
pid_tpid;/* worker's PID */
7575
Oiddbid;/* database which contains the relation */
7676
Oidrelid;/* table to be partitioned concurrently */
77-
uint64total_rows;/* total amount of rows processed */
77+
int64total_rows;/* total amount of rows processed */
7878

7979
int32batch_size;/* number of rows in a batch */
8080
float8sleep_time;/* how long should we sleep in case of error? */

‎src/pathman_workers.c

Lines changed: 83 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443443
void
444444
bgw_main_concurrent_part(Datummain_arg)
445445
{
446-
introws;
446+
ConcurrentPartSlot*part_slot;
447+
char*sql=NULL;
448+
int64rows;
447449
boolfailed;
448450
intfailures_count=0;
449-
char*sql=NULL;
450-
ConcurrentPartSlot*part_slot;
451+
LOCKMODElockmode=RowExclusiveLock;
451452

452453
/* Update concurrent part slot */
453454
part_slot=&concurrent_part_slots[DatumGetInt32(main_arg)];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479480
/* Do the job */
480481
do
481482
{
482-
MemoryContextold_mcxt;
483+
MemoryContextold_mcxt;
483484

484485
Oidtypes[2]= {OIDOID,INT4OID };
485486
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
486487
boolnulls[2]= { false,false };
487488

489+
boolrel_locked= false;
490+
488491
/* Reset loop variables */
489492
failed= false;
490493
rows=0;
@@ -520,66 +523,89 @@ bgw_main_concurrent_part(Datum main_arg)
520523
/* Exec ret = _partition_data_concurrent() */
521524
PG_TRY();
522525
{
523-
/* Make sure that relation exists and has partitions */
524-
if (SearchSysCacheExists1(RELOID,ObjectIdGetDatum(part_slot->relid))&&
525-
get_pathman_relation_info(part_slot->relid)!=NULL)
526-
{
527-
intret;
528-
boolisnull;
526+
intret;
527+
boolisnull;
529528

530-
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
531-
if (ret==SPI_OK_SELECT)
532-
{
533-
TupleDesctupdesc=SPI_tuptable->tupdesc;
534-
HeapTupletuple=SPI_tuptable->vals[0];
529+
/* Lock relation for DELETE and INSERT */
530+
if (!ConditionalLockRelationOid(part_slot->relid,lockmode))
531+
{
532+
elog(ERROR,"could not take lock on relation %u",part_slot->relid);
533+
}
535534

536-
Assert(SPI_processed==1);/* there should be 1 result at most */
535+
/* Great, now relation is locked */
536+
rel_locked= true;
537537

538-
rows=DatumGetInt32(SPI_getbinval(tuple,tupdesc,1,&isnull));
538+
/* Make sure that relation exists */
539+
if (!SearchSysCacheExists1(RELOID,ObjectIdGetDatum(part_slot->relid)))
540+
{
541+
/* Exit after we raise ERROR */
542+
failures_count=PART_WORKER_MAX_ATTEMPTS;
539543

540-
Assert(!isnull);/* ... and ofc it must not be NULL */
541-
}
544+
elog(ERROR,"relation %u does not exist",part_slot->relid);
542545
}
543-
/* Otherwise it's time to exit */
544-
else
546+
547+
/* Make sure that relation has partitions */
548+
if (get_pathman_relation_info(part_slot->relid)==NULL)
545549
{
550+
/* Exit after we raise ERROR */
546551
failures_count=PART_WORKER_MAX_ATTEMPTS;
547552

548-
elog(LOG,"relation \"%u\" is not partitioned (or does not exist)",
549-
part_slot->relid);
553+
elog(ERROR,"relation \"%s\" is not partitioned",
554+
get_rel_name(part_slot->relid));
555+
}
556+
557+
/* Call concurrent partitioning function */
558+
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
559+
if (ret==SPI_OK_SELECT)
560+
{
561+
TupleDesctupdesc=SPI_tuptable->tupdesc;
562+
HeapTupletuple=SPI_tuptable->vals[0];
563+
564+
/* There should be 1 result at most */
565+
Assert(SPI_processed==1);
566+
567+
/* Extract number of processed rows */
568+
rows=DatumGetInt64(SPI_getbinval(tuple,tupdesc,1,&isnull));
569+
Assert(!isnull);/* ... and ofc it must not be NULL */
550570
}
571+
/* Else raise generic error */
572+
elseelog(ERROR,"partitioning function returned %u",ret);
573+
574+
/* Finally, unlock our partitioned table */
575+
UnlockRelationOid(part_slot->relid,lockmode);
551576
}
552577
PG_CATCH();
553578
{
554579
/*
555580
* The most common exception we can catch here is a deadlock with
556581
* concurrent user queries. Check that attempts count doesn't exceed
557-
* some reasonable value
582+
* some reasonable value.
558583
*/
559-
ErrorData*error;
560-
char*sleep_time_str;
584+
ErrorData*error;
585+
586+
/* Unlock relation if we caught ERROR too early */
587+
if (rel_locked)
588+
UnlockRelationOid(part_slot->relid,lockmode);
589+
590+
/* Increase number of failures and set 'failed' status */
591+
failures_count++;
592+
failed= true;
561593

562594
/* Switch to the original context & copy edata */
563595
MemoryContextSwitchTo(old_mcxt);
564596
error=CopyErrorData();
565597
FlushErrorState();
566598

567599
/* Print message for this BGWorker to server log */
568-
sleep_time_str=datum_to_cstring(Float8GetDatum(part_slot->sleep_time),
569-
FLOAT8OID);
570-
failures_count++;
571600
ereport(LOG,
572601
(errmsg("%s: %s",concurrent_part_bgw,error->message),
573-
errdetail("attempt: %d/%d, sleep time: %s",
602+
errdetail("attempt: %d/%d, sleep time: %.2f",
574603
failures_count,
575604
PART_WORKER_MAX_ATTEMPTS,
576-
sleep_time_str)));
577-
pfree(sleep_time_str);/* free the time string */
605+
(float)part_slot->sleep_time)));
578606

607+
/* Finally, free error data */
579608
FreeErrorData(error);
580-
581-
/* Set 'failed' flag */
582-
failed= true;
583609
}
584610
PG_END_TRY();
585611

@@ -606,9 +632,10 @@ bgw_main_concurrent_part(Datum main_arg)
606632
/* Failed this time, wait */
607633
elseif (failed)
608634
{
609-
/* Abort transactionand sleep for a second*/
635+
/* Abort transaction */
610636
AbortCurrentTransaction();
611637

638+
/* Sleep for a specified amount of time (default 1s) */
612639
DirectFunctionCall1(pg_sleep,Float8GetDatum(part_slot->sleep_time));
613640
}
614641

@@ -626,8 +653,10 @@ bgw_main_concurrent_part(Datum main_arg)
626653

627654
#ifdefUSE_ASSERT_CHECKING
628655
/* Report debug message */
629-
elog(DEBUG1,"%s: relocated %d rows, total: "UINT64_FORMAT" [%u]",
630-
concurrent_part_bgw,rows,part_slot->total_rows,MyProcPid);
656+
elog(DEBUG1,"%s: "
657+
"relocated"INT64_FORMAT"rows, "
658+
"total: "INT64_FORMAT,
659+
concurrent_part_bgw,rows,part_slot->total_rows);
631660
#endif
632661
}
633662

@@ -636,9 +665,6 @@ bgw_main_concurrent_part(Datum main_arg)
636665
break;
637666
}
638667
while(rows>0||failed);/* do while there's still rows to be relocated */
639-
640-
/* Reclaim the resources */
641-
pfree(sql);
642668
}
643669

644670

@@ -824,26 +850,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824850
/* Iterate through worker slots */
825851
for (i=userctx->cur_idx;i<PART_WORKER_SLOTS;i++)
826852
{
827-
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i];
853+
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i],
854+
slot_copy;
828855
HeapTuplehtup=NULL;
829856

830-
HOLD_INTERRUPTS();
857+
/* Copy slot to process local memory */
831858
SpinLockAcquire(&cur_slot->mutex);
859+
memcpy(&slot_copy,cur_slot,sizeof(ConcurrentPartSlot));
860+
SpinLockRelease(&cur_slot->mutex);
832861

833-
if (cur_slot->worker_status!=CPS_FREE)
862+
if (slot_copy.worker_status!=CPS_FREE)
834863
{
835864
Datumvalues[Natts_pathman_cp_tasks];
836865
boolisnull[Natts_pathman_cp_tasks]= {0 };
837866

838-
values[Anum_pathman_cp_tasks_userid-1]=cur_slot->userid;
839-
values[Anum_pathman_cp_tasks_pid-1]=cur_slot->pid;
840-
values[Anum_pathman_cp_tasks_dbid-1]=cur_slot->dbid;
841-
values[Anum_pathman_cp_tasks_relid-1]=cur_slot->relid;
842-
values[Anum_pathman_cp_tasks_processed-1]=cur_slot->total_rows;
867+
values[Anum_pathman_cp_tasks_userid-1]=slot_copy.userid;
868+
values[Anum_pathman_cp_tasks_pid-1]=slot_copy.pid;
869+
values[Anum_pathman_cp_tasks_dbid-1]=slot_copy.dbid;
870+
values[Anum_pathman_cp_tasks_relid-1]=slot_copy.relid;
871+
872+
/* Record processed rows */
873+
values[Anum_pathman_cp_tasks_processed-1]=
874+
/* FIXME: use Int64GetDatum() in release 1.5 */
875+
Int32GetDatum((int32)slot_copy.total_rows);
843876

844877
/* Now build a status string */
845878
values[Anum_pathman_cp_tasks_status-1]=
846-
CStringGetTextDatum(cps_print_status(cur_slot->worker_status));
879+
CStringGetTextDatum(cps_print_status(slot_copy.worker_status));
847880

848881
/* Form output tuple */
849882
htup=heap_form_tuple(funcctx->tuple_desc,values,isnull);
@@ -852,9 +885,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852885
userctx->cur_idx=i+1;
853886
}
854887

855-
SpinLockRelease(&cur_slot->mutex);
856-
RESUME_INTERRUPTS();
857-
858888
/* Return tuple if needed */
859889
if (htup)
860890
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(htup));

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp