Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit aa15d7d

Browse files
committed
Merge commit '6b00d812b9396353fff72d42181278c4bd19b68f' into PGPRO9_6
2 parents be43149 + 6b00d81 commit aa15d7d

File tree

4 files changed

+136
-65
lines changed

4 files changed

+136
-65
lines changed

‎contrib/pg_pathman/expected/pathman_bgw.out

Lines changed: 24 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -181,19 +181,38 @@ ROLLBACK;
181181
/* Wait until it finises */
182182
DO $$
183183
DECLARE
184-
ops int8;
184+
opsint8;
185+
rowsint8;
186+
rows_oldint8 := 0;
187+
iint4 := 0; -- protect from endless loop
185188
BEGIN
186189
LOOP
187-
SELECTcount(*)
190+
SELECTprocessed
188191
FROM pathman_concurrent_part_tasks
189-
WHERE processed < 500 -- protect from endless loops
190-
INTO ops;
192+
WHERE relid = 'test_bgw.conc_part'::regclass
193+
INTO rows;
194+
195+
-- get number of partitioning tasks
196+
GET DIAGNOSTICS ops = ROW_COUNT;
191197

192198
IF ops > 0 THEN
193199
PERFORM pg_sleep(0.2);
200+
201+
ASSERT rows IS NOT NULL;
202+
203+
IF rows_old = rows THEN
204+
i = i + 1;
205+
END IF;
194206
ELSE
195-
EXIT;
207+
EXIT; -- exit loop
196208
END IF;
209+
210+
IF i > 50 THEN
211+
RAISE WARNING 'looks like partitioning bgw is stuck!';
212+
EXIT; -- exit loop
213+
END IF;
214+
215+
rows_old = rows;
197216
END LOOP;
198217
END
199218
$$ LANGUAGE plpgsql;

‎contrib/pg_pathman/sql/pathman_bgw.sql

Lines changed: 24 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -95,19 +95,38 @@ ROLLBACK;
9595
/* Wait until it finises*/
9696
DO $$
9797
DECLARE
98-
ops int8;
98+
opsint8;
99+
rowsint8;
100+
rows_oldint8 :=0;
101+
iint4 :=0;-- protect from endless loop
99102
BEGIN
100103
LOOP
101-
SELECTcount(*)
104+
SELECTprocessed
102105
FROM pathman_concurrent_part_tasks
103-
WHERE processed<500-- protect from endless loops
104-
INTO ops;
106+
WHERE relid='test_bgw.conc_part'::regclass
107+
INTO rows;
108+
109+
-- get number of partitioning tasks
110+
GET DIAGNOSTICS ops= ROW_COUNT;
105111

106112
IF ops>0 THEN
107113
PERFORM pg_sleep(0.2);
114+
115+
ASSERT rowsIS NOT NULL;
116+
117+
IF rows_old= rows THEN
118+
i= i+1;
119+
END IF;
108120
ELSE
109-
EXIT;
121+
EXIT;-- exit loop
110122
END IF;
123+
124+
IF i>50 THEN
125+
RAISE WARNING'looks like partitioning bgw is stuck!';
126+
EXIT;-- exit loop
127+
END IF;
128+
129+
rows_old= rows;
111130
END LOOP;
112131
END
113132
$$ LANGUAGE plpgsql;

‎contrib/pg_pathman/src/include/pathman_workers.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ typedef struct
7474
pid_tpid;/* worker's PID */
7575
Oiddbid;/* database which contains the relation */
7676
Oidrelid;/* table to be partitioned concurrently */
77-
uint64total_rows;/* total amount of rows processed */
77+
int64total_rows;/* total amount of rows processed */
7878

7979
int32batch_size;/* number of rows in a batch */
8080
float8sleep_time;/* how long should we sleep in case of error? */

‎contrib/pg_pathman/src/pathman_workers.c

Lines changed: 87 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -432,7 +432,7 @@ bgw_main_spawn_partitions(Datum main_arg)
432432
staticvoid
433433
free_cps_slot(intcode,Datumarg)
434434
{
435-
ConcurrentPartSlot*part_slot=(ConcurrentPartSlot*)DatumGetPointer(arg);
435+
ConcurrentPartSlot*part_slot=(ConcurrentPartSlot*)DatumGetPointer(arg);
436436

437437
cps_set_status(part_slot,CPS_FREE);
438438
}
@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443443
void
444444
bgw_main_concurrent_part(Datummain_arg)
445445
{
446-
introws;
446+
ConcurrentPartSlot*part_slot;
447+
char*sql=NULL;
448+
int64rows;
447449
boolfailed;
448450
intfailures_count=0;
449-
char*sql=NULL;
450-
ConcurrentPartSlot*part_slot;
451+
LOCKMODElockmode=RowExclusiveLock;
451452

452453
/* Update concurrent part slot */
453454
part_slot=&concurrent_part_slots[DatumGetInt32(main_arg)];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479480
/* Do the job */
480481
do
481482
{
482-
MemoryContextold_mcxt;
483+
MemoryContextold_mcxt;
483484

484485
Oidtypes[2]= {OIDOID,INT4OID };
485486
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
486487
boolnulls[2]= { false,false };
487488

489+
boolrel_locked= false;
490+
488491
/* Reset loop variables */
489492
failed= false;
490493
rows=0;
@@ -520,66 +523,92 @@ bgw_main_concurrent_part(Datum main_arg)
520523
/* Exec ret = _partition_data_concurrent() */
521524
PG_TRY();
522525
{
523-
/* Make sure that relation exists and has partitions */
524-
if (SearchSysCacheExists1(RELOID,ObjectIdGetDatum(part_slot->relid))&&
525-
get_pathman_relation_info(part_slot->relid)!=NULL)
526-
{
527-
intret;
528-
boolisnull;
526+
intret;
527+
boolisnull;
529528

530-
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
531-
if (ret==SPI_OK_SELECT)
532-
{
533-
TupleDesctupdesc=SPI_tuptable->tupdesc;
534-
HeapTupletuple=SPI_tuptable->vals[0];
529+
/* Lock relation for DELETE and INSERT */
530+
if (!ConditionalLockRelationOid(part_slot->relid,lockmode))
531+
{
532+
elog(ERROR,"could not take lock on relation %u",part_slot->relid);
533+
}
535534

536-
Assert(SPI_processed==1);/* there should be 1 result at most */
535+
/* Great, now relation is locked */
536+
rel_locked= true;
537+
(void)rel_locked;/* mute clang analyzer */
537538

538-
rows=DatumGetInt32(SPI_getbinval(tuple,tupdesc,1,&isnull));
539+
/* Make sure that relation exists */
540+
if (!SearchSysCacheExists1(RELOID,ObjectIdGetDatum(part_slot->relid)))
541+
{
542+
/* Exit after we raise ERROR */
543+
failures_count=PART_WORKER_MAX_ATTEMPTS;
544+
(void)failures_count;/* mute clang analyzer */
539545

540-
Assert(!isnull);/* ... and ofc it must not be NULL */
541-
}
546+
elog(ERROR,"relation %u does not exist",part_slot->relid);
542547
}
543-
/* Otherwise it's time to exit */
544-
else
548+
549+
/* Make sure that relation has partitions */
550+
if (get_pathman_relation_info(part_slot->relid)==NULL)
545551
{
552+
/* Exit after we raise ERROR */
546553
failures_count=PART_WORKER_MAX_ATTEMPTS;
554+
(void)failures_count;/* mute clang analyzer */
555+
556+
elog(ERROR,"relation \"%s\" is not partitioned",
557+
get_rel_name(part_slot->relid));
558+
}
559+
560+
/* Call concurrent partitioning function */
561+
ret=SPI_execute_with_args(sql,2,types,vals,nulls, false,0);
562+
if (ret==SPI_OK_SELECT)
563+
{
564+
TupleDesctupdesc=SPI_tuptable->tupdesc;
565+
HeapTupletuple=SPI_tuptable->vals[0];
547566

548-
elog(LOG,"relation \"%u\" is not partitioned (or does not exist)",
549-
part_slot->relid);
567+
/* There should be 1 result at most */
568+
Assert(SPI_processed==1);
569+
570+
/* Extract number of processed rows */
571+
rows=DatumGetInt64(SPI_getbinval(tuple,tupdesc,1,&isnull));
572+
Assert(!isnull);/* ... and ofc it must not be NULL */
550573
}
574+
/* Else raise generic error */
575+
elseelog(ERROR,"partitioning function returned %u",ret);
576+
577+
/* Finally, unlock our partitioned table */
578+
UnlockRelationOid(part_slot->relid,lockmode);
551579
}
552580
PG_CATCH();
553581
{
554582
/*
555583
* The most common exception we can catch here is a deadlock with
556584
* concurrent user queries. Check that attempts count doesn't exceed
557-
* some reasonable value
585+
* some reasonable value.
558586
*/
559-
ErrorData*error;
560-
char*sleep_time_str;
587+
ErrorData*error;
588+
589+
/* Unlock relation if we caught ERROR too early */
590+
if (rel_locked)
591+
UnlockRelationOid(part_slot->relid,lockmode);
592+
593+
/* Increase number of failures and set 'failed' status */
594+
failures_count++;
595+
failed= true;
561596

562597
/* Switch to the original context & copy edata */
563598
MemoryContextSwitchTo(old_mcxt);
564599
error=CopyErrorData();
565600
FlushErrorState();
566601

567602
/* Print messsage for this BGWorker to server log */
568-
sleep_time_str=datum_to_cstring(Float8GetDatum(part_slot->sleep_time),
569-
FLOAT8OID);
570-
failures_count++;
571603
ereport(LOG,
572604
(errmsg("%s: %s",concurrent_part_bgw,error->message),
573-
errdetail("attempt: %d/%d, sleep time: %s",
605+
errdetail("attempt: %d/%d, sleep time: %.2f",
574606
failures_count,
575607
PART_WORKER_MAX_ATTEMPTS,
576-
sleep_time_str)));
577-
pfree(sleep_time_str);/* free the time string */
608+
(float)part_slot->sleep_time)));
578609

610+
/* Finally, free error data */
579611
FreeErrorData(error);
580-
581-
/* Set 'failed' flag */
582-
failed= true;
583612
}
584613
PG_END_TRY();
585614

@@ -606,9 +635,10 @@ bgw_main_concurrent_part(Datum main_arg)
606635
/* Failed this time, wait */
607636
elseif (failed)
608637
{
609-
/* Abort transactionand sleep for a second*/
638+
/* Abort transaction */
610639
AbortCurrentTransaction();
611640

641+
/* Sleep for a specified amount of time (default 1s) */
612642
DirectFunctionCall1(pg_sleep,Float8GetDatum(part_slot->sleep_time));
613643
}
614644

@@ -626,8 +656,10 @@ bgw_main_concurrent_part(Datum main_arg)
626656

627657
#ifdefUSE_ASSERT_CHECKING
628658
/* Report debug message */
629-
elog(DEBUG1,"%s: relocated %d rows, total: "UINT64_FORMAT" [%u]",
630-
concurrent_part_bgw,rows,part_slot->total_rows,MyProcPid);
659+
elog(DEBUG1,"%s: "
660+
"relocated"INT64_FORMAT"rows, "
661+
"total: "INT64_FORMAT,
662+
concurrent_part_bgw,rows,part_slot->total_rows);
631663
#endif
632664
}
633665

@@ -636,9 +668,6 @@ bgw_main_concurrent_part(Datum main_arg)
636668
break;
637669
}
638670
while(rows>0||failed);/* do while there's still rows to be relocated */
639-
640-
/* Reclaim the resources */
641-
pfree(sql);
642671
}
643672

644673

@@ -824,26 +853,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824853
/* Iterate through worker slots */
825854
for (i=userctx->cur_idx;i<PART_WORKER_SLOTS;i++)
826855
{
827-
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i];
856+
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i],
857+
slot_copy;
828858
HeapTuplehtup=NULL;
829859

830-
HOLD_INTERRUPTS();
860+
/* Copy slot to process local memory */
831861
SpinLockAcquire(&cur_slot->mutex);
862+
memcpy(&slot_copy,cur_slot,sizeof(ConcurrentPartSlot));
863+
SpinLockRelease(&cur_slot->mutex);
832864

833-
if (cur_slot->worker_status!=CPS_FREE)
865+
if (slot_copy.worker_status!=CPS_FREE)
834866
{
835867
Datumvalues[Natts_pathman_cp_tasks];
836868
boolisnull[Natts_pathman_cp_tasks]= {0 };
837869

838-
values[Anum_pathman_cp_tasks_userid-1]=cur_slot->userid;
839-
values[Anum_pathman_cp_tasks_pid-1]=cur_slot->pid;
840-
values[Anum_pathman_cp_tasks_dbid-1]=cur_slot->dbid;
841-
values[Anum_pathman_cp_tasks_relid-1]=cur_slot->relid;
842-
values[Anum_pathman_cp_tasks_processed-1]=cur_slot->total_rows;
870+
values[Anum_pathman_cp_tasks_userid-1]=slot_copy.userid;
871+
values[Anum_pathman_cp_tasks_pid-1]=slot_copy.pid;
872+
values[Anum_pathman_cp_tasks_dbid-1]=slot_copy.dbid;
873+
values[Anum_pathman_cp_tasks_relid-1]=slot_copy.relid;
874+
875+
/* Record processed rows */
876+
values[Anum_pathman_cp_tasks_processed-1]=
877+
/* FIXME: use Int64GetDatum() in release 1.5 */
878+
Int32GetDatum((int32)slot_copy.total_rows);
843879

844880
/* Now build a status string */
845881
values[Anum_pathman_cp_tasks_status-1]=
846-
CStringGetTextDatum(cps_print_status(cur_slot->worker_status));
882+
CStringGetTextDatum(cps_print_status(slot_copy.worker_status));
847883

848884
/* Form output tuple */
849885
htup=heap_form_tuple(funcctx->tuple_desc,values,isnull);
@@ -852,9 +888,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852888
userctx->cur_idx=i+1;
853889
}
854890

855-
SpinLockRelease(&cur_slot->mutex);
856-
RESUME_INTERRUPTS();
857-
858891
/* Return tuple if needed */
859892
if (htup)
860893
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(htup));

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp