Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfe96482

Browse files
committed
use atomic flag for worker slots
1 parentdbcce6e commitfe96482

File tree

3 files changed

+41
-18
lines changed

3 files changed

+41
-18
lines changed

‎src/init.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ pathman_config_contains_relation(Oid relid, Datum *values, bool *isnull,
622622
}
623623

624624
/*
625-
*Return'enable_parent'parameter of relation
625+
*Loads additional pathman parameters like'enable_parent'or 'auto'
626626
*/
627627
bool
628628
read_pathman_params(Oidrelid,Datum*values,bool*isnull)

‎src/pathman_workers.c

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,19 @@ init_concurrent_part_task_slots(void)
9595
{
9696
boolfound;
9797
Sizesize=estimate_concurrent_part_task_slots_size();
98+
inti;
9899

99100
concurrent_part_slots= (ConcurrentPartSlot*)
100101
ShmemInitStruct("array of ConcurrentPartSlots",size,&found);
101102

102103
/* Initialize 'concurrent_part_slots' if needed */
103-
if (!found)memset(concurrent_part_slots,0,size);
104+
if (!found)
105+
{
106+
memset(concurrent_part_slots,0,size);
107+
108+
for (i=0;i<PART_WORKER_SLOTS;i++)
109+
pg_atomic_init_flag_impl(&concurrent_part_slots[i].slot_used);
110+
}
104111
}
105112

106113

@@ -423,9 +430,9 @@ bgw_main_concurrent_part(Datum main_arg)
423430
{
424431
MemoryContextold_mcxt;
425432

426-
Oidtypes[2]= {OIDOID,INT4OID };
427-
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
428-
boolnulls[2]= { false,false };
433+
Oidtypes[2]= {OIDOID,INT4OID };
434+
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
435+
boolnulls[2]= { false,false };
429436

430437
/* Reset loop variables */
431438
failed= false;
@@ -506,6 +513,7 @@ bgw_main_concurrent_part(Datum main_arg)
506513
{
507514
/* Mark slot as FREE */
508515
part_slot->worker_status=WS_FREE;
516+
pg_atomic_clear_flag(&part_slot->slot_used);
509517

510518
elog(LOG,
511519
"Concurrent partitioning worker has canceled the task because "
@@ -561,7 +569,10 @@ bgw_main_concurrent_part(Datum main_arg)
561569

562570
/* Reclaim the resources */
563571
pfree(sql);
572+
573+
/* Set slot free */
564574
part_slot->worker_status=WS_FREE;
575+
pg_atomic_clear_flag(&part_slot->slot_used);
565576
}
566577

567578

@@ -596,16 +607,24 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
596607
*/
597608
for (i=0;i<PART_WORKER_SLOTS;i++)
598609
{
599-
if (concurrent_part_slots[i].worker_status==WS_FREE)
610+
/*
611+
* Attempt to acquire the flag. If it has alread been used then skip
612+
* this slot and try another one
613+
*/
614+
if (!pg_atomic_test_set_flag(&concurrent_part_slots[i].slot_used))
615+
continue;
616+
617+
/* If atomic flag wasn't used then status should be WS_FREE */
618+
Assert(concurrent_part_slots[i].worker_status==WS_FREE);
619+
620+
if (empty_slot_idx<0)
600621
{
601-
if (empty_slot_idx<0)
602-
{
603-
my_slot=&concurrent_part_slots[i];
604-
empty_slot_idx=i;
605-
}
622+
my_slot=&concurrent_part_slots[i];
623+
empty_slot_idx=i;
606624
}
607-
elseif (concurrent_part_slots[i].relid==relid&&
608-
concurrent_part_slots[i].dbid==MyDatabaseId)
625+
626+
if (concurrent_part_slots[i].relid==relid&&
627+
concurrent_part_slots[i].dbid==MyDatabaseId)
609628
{
610629
elog(ERROR,
611630
"Table \"%s\" is already being partitioned",
@@ -745,13 +764,16 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
745764
{
746765
Oidrelid=PG_GETARG_OID(0);
747766
inti;
767+
ConcurrentPartSlot*slot;
748768

749769
for (i=0;i<PART_WORKER_SLOTS;i++)
750-
if (concurrent_part_slots[i].worker_status!=WS_FREE&&
751-
concurrent_part_slots[i].relid==relid&&
752-
concurrent_part_slots[i].dbid==MyDatabaseId)
770+
slot=&concurrent_part_slots[i];
771+
772+
if (slot->worker_status!=WS_FREE&&
773+
slot->relid==relid&&
774+
slot->dbid==MyDatabaseId)
753775
{
754-
concurrent_part_slots[i].worker_status=WS_STOPPING;
776+
slot->worker_status=WS_STOPPING;
755777
elog(NOTICE,"Worker will stop after it finishes current batch");
756778

757779
PG_RETURN_BOOL(true);

‎src/pathman_workers.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct
4646
*/
4747
typedefstruct
4848
{
49+
pg_atomic_flagslot_used;/* flag for atomic slot acquirement */
4950
Oiduserid;/* connect as a specified user */
5051

5152
enum
@@ -57,7 +58,7 @@ typedef struct
5758
}worker_status;/* status of a particular worker */
5859

5960
pid_tpid;/* worker's PID */
60-
Oiddbid;/* database which containsrelation 'relid' */
61+
Oiddbid;/* database which containsthe relation */
6162
Oidrelid;/* table to be partitioned concurrently */
6263
uint64total_rows;/* total amount of rows processed */
6364

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp