Commit 9b80448

concurrent partitioning subsystem now uses spinlocks

1 parent 67ff4a8 · commit 9b80448

File tree: 2 files changed, +102 −65 lines changed
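The gist of the change: each ConcurrentPartSlot used to be guarded by a single pg_atomic_flag, which serialized slot acquisition but left reads and writes of the slot's other fields (worker_status, total_rows, and so on) unsynchronized. This commit gives every slot its own slock_t spinlock and routes all access through SpinLockAcquire()/SpinLockRelease(), so a multi-field update such as the worker's progress report becomes atomic as a unit. The sketch below is a minimal stand-alone model of that discipline, not pg_pathman code: POSIX pthread spinlocks stand in for PostgreSQL's slock_t (storage/spin.h), and every demo_* name is invented for illustration.

/* Stand-alone model of the new locking discipline: every read and write of
 * a slot's status goes through a tiny critical section.
 * Build with: cc demo_slot.c -o demo_slot -lpthread */
#include <stdio.h>
#include <pthread.h>

typedef enum { DEMO_WS_FREE = 0, DEMO_WS_WORKING, DEMO_WS_STOPPING } DemoStatus;

typedef struct
{
    pthread_spinlock_t mutex;   /* protects every field of the slot */
    DemoStatus         status;
    long               total_rows;
} DemoSlot;

/* analogue of cps_set_status(): write the status under the lock */
static void
demo_set_status(DemoSlot *slot, DemoStatus status)
{
    pthread_spin_lock(&slot->mutex);
    slot->status = status;
    pthread_spin_unlock(&slot->mutex);
}

/* analogue of cps_check_status(): read the status under the lock */
static DemoStatus
demo_check_status(DemoSlot *slot)
{
    DemoStatus status;

    pthread_spin_lock(&slot->mutex);
    status = slot->status;
    pthread_spin_unlock(&slot->mutex);

    return status;
}

int
main(void)
{
    DemoSlot slot = { .status = DEMO_WS_FREE, .total_rows = 0 };

    pthread_spin_init(&slot.mutex, PTHREAD_PROCESS_PRIVATE);
    demo_set_status(&slot, DEMO_WS_WORKING);
    printf("status: %d\n", (int) demo_check_status(&slot)); /* prints 1 */
    return 0;
}

The same acquire/copy/release shape appears verbatim in the cps_check_status() and cps_set_status() helpers added to pathman_workers.h below.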
src/pathman_workers.c
Lines changed: 69 additions & 56 deletions
@@ -106,7 +106,7 @@ init_concurrent_part_task_slots(void)
         memset(concurrent_part_slots, 0, size);
 
         for (i = 0; i < PART_WORKER_SLOTS; i++)
-            pg_atomic_init_flag_impl(&concurrent_part_slots[i].slot_used);
+            SpinLockInit(&concurrent_part_slots[i].mutex);
     }
 }
 
@@ -235,10 +235,10 @@ start_bg_worker(const char bgworker_name[BGW_MAXLEN],
 static dsm_segment *
 create_partitions_bg_worker_segment(Oid relid, Datum value, Oid value_type)
 {
-    TypeCacheEntry *typcache;
-    Size        datum_size;
-    Size        segment_size;
-    dsm_segment *segment;
+    TypeCacheEntry     *typcache;
+    Size                datum_size;
+    Size                segment_size;
+    dsm_segment        *segment;
     SpawnPartitionArgs *args;
 
     typcache = lookup_type_cache(value_type, 0);
@@ -314,10 +314,10 @@ create_partitions_bg_worker(Oid relid, Datum value, Oid value_type)
 static void
 bgw_main_spawn_partitions(Datum main_arg)
 {
-    dsm_handle  handle = DatumGetUInt32(main_arg);
-    dsm_segment *segment;
-    SpawnPartitionArgs *args;
-    Datum       value;
+    dsm_handle          handle = DatumGetUInt32(main_arg);
+    dsm_segment        *segment;
+    SpawnPartitionArgs *args;
+    Datum               value;
 
     /* Establish signal handlers before unblocking signals. */
     pqsignal(SIGTERM, handle_sigterm);
@@ -512,8 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
         if (failures_count++ >= PART_WORKER_MAX_ATTEMPTS)
         {
             /* Mark slot as FREE */
-            part_slot->worker_status = WS_FREE;
-            pg_atomic_clear_flag(&part_slot->slot_used);
+            cps_set_status(part_slot, WS_FREE);
 
             elog(LOG,
                  "Concurrent partitioning worker has canceled the task because "
@@ -534,14 +533,6 @@ bgw_main_concurrent_part(Datum main_arg)
 
         if (failed)
         {
-#ifdef USE_ASSERT_CHECKING
-            elog(DEBUG1, "%s: could not relocate batch (%d/%d), total: %lu [%u]",
-                 concurrent_part_bgw,
-                 failures_count, PART_WORKER_MAX_ATTEMPTS, /* current/max */
-                 part_slot->total_rows,
-                 MyProcPid);
-#endif
-
             /* Abort transaction and sleep for a second */
             AbortCurrentTransaction();
             DirectFunctionCall1(pg_sleep, Float8GetDatum(part_slot->sleep_time));
@@ -553,26 +544,27 @@ bgw_main_concurrent_part(Datum main_arg)
             failures_count = 0;
 
             /* Add rows to total_rows */
+            SpinLockAcquire(&part_slot->mutex);
             part_slot->total_rows += rows;
-
+            /* Report debug message */
 #ifdef USE_ASSERT_CHECKING
             elog(DEBUG1, "%s: relocated %d rows, total: %lu [%u]",
                  concurrent_part_bgw, rows, part_slot->total_rows, MyProcPid);
 #endif
+            SpinLockRelease(&part_slot->mutex);
         }
 
         /* If other backend requested to stop us, quit */
-        if (part_slot->worker_status == WS_STOPPING)
+        if (cps_check_status(part_slot) == WS_STOPPING)
             break;
     }
     while (rows > 0 || failed);  /* do while there's still rows to be relocated */
 
     /* Reclaim the resources */
     pfree(sql);
 
-    /* Set slot free */
-    part_slot->worker_status = WS_FREE;
-    pg_atomic_clear_flag(&part_slot->slot_used);
+    /* Mark slot as FREE */
+    cps_set_status(part_slot, WS_FREE);
 }
 
 
@@ -589,12 +581,11 @@ bgw_main_concurrent_part(Datum main_arg)
 Datum
 partition_table_concurrently(PG_FUNCTION_ARGS)
 {
-#define tostr(str) ( #str )
+#define tostr(str) ( #str ) /* convert function's name to literal */
 
-    Oid     relid = PG_GETARG_OID(0);
-    ConcurrentPartSlot *my_slot = NULL;
-    int     empty_slot_idx = -1;
-    int     i;
+    Oid     relid = PG_GETARG_OID(0);
+    int     empty_slot_idx = -1;
+    int     i;
 
     /* Check if relation is a partitioned table */
     shout_if_prel_is_invalid(relid,
@@ -607,38 +598,43 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
      */
     for (i = 0; i < PART_WORKER_SLOTS; i++)
     {
-        /*
-         * Attempt to acquire the flag. If it has alread been used then skip
-         * this slot and try another one
-         */
-        if (!pg_atomic_test_set_flag(&concurrent_part_slots[i].slot_used))
-            continue;
+        ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
+        bool        keep_this_lock = false;
 
-        /* If atomic flag wasn't used then status should be WS_FREE */
-        Assert(concurrent_part_slots[i].worker_status == WS_FREE);
+        SpinLockAcquire(&cur_slot->mutex);
 
         if (empty_slot_idx < 0)
        {
-            my_slot = &concurrent_part_slots[i];
             empty_slot_idx = i;
+            keep_this_lock = true;
         }
 
-        if (concurrent_part_slots[i].relid == relid &&
-            concurrent_part_slots[i].dbid == MyDatabaseId)
+        if (cur_slot->relid == relid &&
+            cur_slot->dbid == MyDatabaseId)
         {
+            if (empty_slot_idx >= 0)
+                SpinLockRelease(&cur_slot->mutex);
+
             elog(ERROR,
                  "Table \"%s\" is already being partitioned",
                  get_rel_name(relid));
         }
+
+        if (!keep_this_lock)
+            SpinLockRelease(&cur_slot->mutex);
     }
 
-    if (my_slot == NULL)
+    if (empty_slot_idx < 0)
         elog(ERROR, "No empty worker slots found");
+    else
+    {
+        /* Initialize concurrent part slot */
+        InitConcurrentPartSlot(&concurrent_part_slots[empty_slot_idx],
+                               GetAuthenticatedUserId(), WS_WORKING,
+                               MyDatabaseId, relid, 1000, 1.0);
 
-    /* Initialize concurrent part slot */
-    InitConcurrentPartSlot(my_slot, GetAuthenticatedUserId(),
-                           WS_WORKING, MyDatabaseId, relid,
-                           1000, 1.0);
+        SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);
+    }
 
     /* Start worker (we should not wait) */
     start_bg_worker(concurrent_part_bgw,
@@ -712,11 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
     {
         ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
 
+        SpinLockAcquire(&cur_slot->mutex);
+
         if (cur_slot->worker_status != WS_FREE)
         {
             HeapTuple   tuple;
             Datum       values[Natts_pathman_cp_tasks];
-            bool        isnull[Natts_pathman_cp_tasks] = { 0, 0, 0, 0, 0, 0 };
+            bool        isnull[Natts_pathman_cp_tasks] = { 0 };
 
             values[Anum_pathman_cp_tasks_userid - 1] = cur_slot->userid;
             values[Anum_pathman_cp_tasks_pid - 1] = cur_slot->pid;
@@ -750,6 +748,8 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
 
             SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
         }
+
+        SpinLockRelease(&cur_slot->mutex);
     }
 
     SRF_RETURN_DONE(funcctx);
@@ -763,22 +763,35 @@ Datum
 stop_concurrent_part_task(PG_FUNCTION_ARGS)
 {
     Oid     relid = PG_GETARG_OID(0);
+    bool    worker_found = false;
     int     i;
-    ConcurrentPartSlot *slot;
 
-    for (i = 0; i < PART_WORKER_SLOTS; i++)
-        slot = &concurrent_part_slots[i];
+    for (i = 0; i < PART_WORKER_SLOTS && !worker_found; i++)
+    {
+        ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
+
+        SpinLockAcquire(&cur_slot->mutex);
 
-        if (slot->worker_status != WS_FREE &&
-            slot->relid == relid &&
-            slot->dbid == MyDatabaseId)
+        if (cur_slot->worker_status != WS_FREE &&
+            cur_slot->relid == relid &&
+            cur_slot->dbid == MyDatabaseId)
         {
-            slot->worker_status = WS_STOPPING;
             elog(NOTICE, "Worker will stop after it finishes current batch");
 
-            PG_RETURN_BOOL(true);
+            cur_slot->worker_status = WS_STOPPING;
+            worker_found = true;
         }
 
-    elog(ERROR, "Cannot find worker for relation \"%s\"",
-         get_rel_name_or_relid(relid));
+        SpinLockRelease(&cur_slot->mutex);
+    }
+
+    if (worker_found)
+        PG_RETURN_BOOL(true);
+    else
+    {
+        elog(ERROR, "Cannot find worker for relation \"%s\"",
+             get_rel_name_or_relid(relid));
+
+        PG_RETURN_BOOL(false); /* keep compiler happy */
+    }
 }

src/pathman_workers.h
Lines changed: 33 additions & 9 deletions
@@ -18,6 +18,7 @@
 #define PATHMAN_WORKERS_H
 
 #include "postgres.h"
+#include "storage/spin.h"
 
 
 /*
@@ -41,22 +42,24 @@ typedef struct
 } SpawnPartitionArgs;
 
 
+typedef enum
+{
+    WS_FREE = 0,    /* slot is empty */
+    WS_WORKING,     /* occupied by live worker */
+    WS_STOPPING     /* worker is going to shutdown */
+
+} ConcurrentPartSlotStatus;
+
 /*
  * Store args and execution status of a single ConcurrentPartWorker.
  */
 typedef struct
 {
-    pg_atomic_flag  slot_used;      /* flag for atomic slot acquirement */
-    Oid             userid;         /* connect as a specified user */
+    slock_t         mutex;          /* protect slot from race conditions */
 
-    enum
-    {
-        WS_FREE = 0,    /* slot is empty */
-        WS_WORKING,     /* occupied by live worker */
-        WS_STOPPING     /* worker is going to shutdown */
-
-    } worker_status;    /* status of a particular worker */
+    ConcurrentPartSlotStatus worker_status; /* status of a particular worker */
 
+    Oid             userid;         /* connect as a specified user */
     pid_t           pid;            /* worker's PID */
     Oid             dbid;           /* database which contains the relation */
     Oid             relid;          /* table to be partitioned concurrently */
@@ -78,6 +81,27 @@ typedef struct
         (slot)->sleep_time = (sleep_t); \
     } while (0)
 
+static inline ConcurrentPartSlotStatus
+cps_check_status(ConcurrentPartSlot *slot)
+{
+    ConcurrentPartSlotStatus status;
+
+    SpinLockAcquire(&slot->mutex);
+    status = slot->worker_status;
+    SpinLockRelease(&slot->mutex);
+
+    return status;
+}
+
+static inline void
+cps_set_status(ConcurrentPartSlot *slot, ConcurrentPartSlotStatus status)
+{
+    SpinLockAcquire(&slot->mutex);
+    slot->worker_status = status;
+    SpinLockRelease(&slot->mutex);
+}
+
+
 
 /* Number of worker slots for concurrent partitioning */
 #define PART_WORKER_SLOTS   10
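
A subtlety worth noting in the reworked partition_table_concurrently() scan above (in src/pathman_workers.c): the backend keeps holding the spinlock of the first free slot it finds (keep_this_lock) while it checks the remaining slots for a duplicate task, and releases it only after the slot has been initialized, so no concurrent backend can claim that slot mid-scan. Below is a condensed stand-alone model of the scan under the same assumptions as the earlier sketch (POSIX spinlocks in place of slock_t, invented demo_* names); the model also verifies that the remembered slot is actually free before claiming it.

/* Stand-alone model of the slot-claiming scan; build with -lpthread. */
#include <stdio.h>
#include <stdbool.h>
#include <pthread.h>

#define DEMO_SLOTS 10

typedef enum { DEMO_WS_FREE = 0, DEMO_WS_WORKING } DemoStatus;

typedef struct
{
    pthread_spinlock_t mutex;   /* protects the fields below */
    DemoStatus         status;
    int                relid;   /* pretend table OID, 0 = none */
} DemoSlot;

static DemoSlot demo_slots[DEMO_SLOTS];

/* Claim a free slot for relid; return its index, or -1 if relid is already
 * being worked on or no slot is free. */
static int
demo_claim_slot(int relid)
{
    int     empty_slot_idx = -1;
    int     i;

    for (i = 0; i < DEMO_SLOTS; i++)
    {
        DemoSlot   *cur_slot = &demo_slots[i];
        bool        keep_this_lock = false;

        pthread_spin_lock(&cur_slot->mutex);

        /* Remember the first free slot and keep its lock held */
        if (empty_slot_idx < 0 && cur_slot->status == DEMO_WS_FREE)
        {
            empty_slot_idx = i;
            keep_this_lock = true;
        }

        /* Duplicate task: drop every lock we still hold and give up */
        if (cur_slot->status == DEMO_WS_WORKING && cur_slot->relid == relid)
        {
            pthread_spin_unlock(&cur_slot->mutex);
            if (empty_slot_idx >= 0 && empty_slot_idx != i)
                pthread_spin_unlock(&demo_slots[empty_slot_idx].mutex);
            return -1;
        }

        if (!keep_this_lock)
            pthread_spin_unlock(&cur_slot->mutex);
    }

    if (empty_slot_idx >= 0)
    {
        DemoSlot   *slot = &demo_slots[empty_slot_idx];

        /* Still under the lock we kept, so nobody else could take the slot */
        slot->status = DEMO_WS_WORKING;
        slot->relid = relid;
        pthread_spin_unlock(&slot->mutex);
    }

    return empty_slot_idx;
}

int
main(void)
{
    int i;

    for (i = 0; i < DEMO_SLOTS; i++)
        pthread_spin_init(&demo_slots[i].mutex, PTHREAD_PROCESS_PRIVATE);

    printf("claim relid 42: slot %d\n", demo_claim_slot(42));  /* 0 */
    printf("claim relid 42: slot %d\n", demo_claim_slot(42));  /* -1 (duplicate) */
    printf("claim relid 43: slot %d\n", demo_claim_slot(43));  /* 1 */
    return 0;
}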
