Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit788d41c

Browse files
committed
bugfixes for concurrent partitioning
1 parent0fe96ca commit788d41c

File tree

2 files changed

+25
-17
lines changed

2 files changed

+25
-17
lines changed

‎src/pathman_workers.c

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
512512
if (failures_count++ >=PART_WORKER_MAX_ATTEMPTS)
513513
{
514514
/* Mark slot as FREE */
515-
cps_set_status(part_slot,WS_FREE);
515+
cps_set_status(part_slot,CPS_FREE);
516516

517517
elog(LOG,
518518
"Concurrent partitioning worker has canceled the task because "
@@ -555,7 +555,7 @@ bgw_main_concurrent_part(Datum main_arg)
555555
}
556556

557557
/* If other backend requested to stop us, quit */
558-
if (cps_check_status(part_slot)==WS_STOPPING)
558+
if (cps_check_status(part_slot)==CPS_STOPPING)
559559
break;
560560
}
561561
while(rows>0||failed);/* do while there's still rows to be relocated */
@@ -564,7 +564,7 @@ bgw_main_concurrent_part(Datum main_arg)
564564
pfree(sql);
565565

566566
/* Mark slot as FREE */
567-
cps_set_status(part_slot,WS_FREE);
567+
cps_set_status(part_slot,CPS_FREE);
568568
}
569569

570570

@@ -603,7 +603,8 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
603603

604604
SpinLockAcquire(&cur_slot->mutex);
605605

606-
if (empty_slot_idx<0)
606+
/* Should we take this slot into account? */
607+
if (empty_slot_idx<0&&cur_slot->worker_status==CPS_FREE)
607608
{
608609
empty_slot_idx=i;
609610
keep_this_lock= true;
@@ -630,7 +631,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
630631
{
631632
/* Initialize concurrent part slot */
632633
InitConcurrentPartSlot(&concurrent_part_slots[empty_slot_idx],
633-
GetAuthenticatedUserId(),WS_WORKING,
634+
GetAuthenticatedUserId(),CPS_WORKING,
634635
MyDatabaseId,relid,1000,1.0);
635636

636637
SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);
@@ -707,12 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
707708
for (i=userctx->cur_idx;i<PART_WORKER_SLOTS;i++)
708709
{
709710
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i];
711+
HeapTuplehtup=NULL;
710712

713+
HOLD_INTERRUPTS();
711714
SpinLockAcquire(&cur_slot->mutex);
712715

713-
if (cur_slot->worker_status!=WS_FREE)
716+
if (cur_slot->worker_status!=CPS_FREE)
714717
{
715-
HeapTupletuple;
716718
Datumvalues[Natts_pathman_cp_tasks];
717719
boolisnull[Natts_pathman_cp_tasks]= {0 };
718720

@@ -725,12 +727,12 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
725727
/* Now build a status string */
726728
switch(cur_slot->worker_status)
727729
{
728-
caseWS_WORKING:
730+
caseCPS_WORKING:
729731
values[Anum_pathman_cp_tasks_status-1]=
730732
PointerGetDatum(cstring_to_text("working"));
731733
break;
732734

733-
caseWS_STOPPING:
735+
caseCPS_STOPPING:
734736
values[Anum_pathman_cp_tasks_status-1]=
735737
PointerGetDatum(cstring_to_text("stopping"));
736738
break;
@@ -741,15 +743,18 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
741743
}
742744

743745
/* Form output tuple */
744-
tuple=heap_form_tuple(funcctx->tuple_desc,values,isnull);
746+
htup=heap_form_tuple(funcctx->tuple_desc,values,isnull);
745747

746748
/* Switch to next worker */
747749
userctx->cur_idx=i+1;
748-
749-
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(tuple));
750750
}
751751

752752
SpinLockRelease(&cur_slot->mutex);
753+
RESUME_INTERRUPTS();
754+
755+
/* Return tuple if needed */
756+
if (htup)
757+
SRF_RETURN_NEXT(funcctx,HeapTupleGetDatum(htup));
753758
}
754759

755760
SRF_RETURN_DONE(funcctx);
@@ -770,19 +775,22 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
770775
{
771776
ConcurrentPartSlot*cur_slot=&concurrent_part_slots[i];
772777

778+
HOLD_INTERRUPTS();
773779
SpinLockAcquire(&cur_slot->mutex);
774780

775-
if (cur_slot->worker_status!=WS_FREE&&
781+
if (cur_slot->worker_status!=CPS_FREE&&
776782
cur_slot->relid==relid&&
777783
cur_slot->dbid==MyDatabaseId)
778784
{
779785
elog(NOTICE,"Worker will stop after it finishes current batch");
780786

781-
cur_slot->worker_status=WS_STOPPING;
787+
/* Change worker's state & set 'worker_found' */
788+
cur_slot->worker_status=CPS_STOPPING;
782789
worker_found= true;
783790
}
784791

785792
SpinLockRelease(&cur_slot->mutex);
793+
RESUME_INTERRUPTS();
786794
}
787795

788796
if (worker_found)

‎src/pathman_workers.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ typedef struct
4444

4545
typedefenum
4646
{
47-
WS_FREE=0,/* slot is empty */
48-
WS_WORKING,/* occupied by live worker */
49-
WS_STOPPING/* worker is going to shutdown */
47+
CPS_FREE=0,/* slot is empty */
48+
CPS_WORKING,/* occupied by live worker */
49+
CPS_STOPPING/* worker is going to shutdown */
5050

5151
}ConcurrentPartSlotStatus;
5252

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp