Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1e1c75b

Browse files
committed
Merge Dmitry's patch for split_range_partitions concurrency
1 parent2567c39 commit1e1c75b

File tree

7 files changed

+122
-21
lines changed

7 files changed

+122
-21
lines changed

‎.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ pg_pathman--*.sql
1313
tags
1414
cscope*
1515
Dockerfile
16+
testgres

‎src/hooks.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,8 @@ pathman_shmem_startup_hook(void)
706706
void
707707
pathman_relcache_hook(Datumarg,Oidrelid)
708708
{
709-
Oidparent_relid;
709+
Oidparent_relid;
710+
boolparent_found;
710711

711712
/* Hooks can be disabled */
712713
if (!pathman_hooks_enabled)
@@ -732,6 +733,9 @@ pathman_relcache_hook(Datum arg, Oid relid)
732733
/* It *might have been a partition*, invalidate parent */
733734
if (OidIsValid(parent_relid))
734735
{
736+
/* Invalidate PartRelationInfo cache if needed */
737+
invalidate_pathman_relation_info(parent_relid,&parent_found);
738+
735739
delay_invalidation_parent_rel(parent_relid);
736740

737741
elog(DEBUG2,"Invalidation message for partition %u [%u]",

‎src/include/partition_filter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ typedef struct
4040
Oidpartid;/* partition's relid */
4141
ResultRelInfo*result_rel_info;/* cached ResultRelInfo */
4242
TupleConversionMap*tuple_map;/* tuple conversion map (parent => child) */
43+
boolfirst_time;/* did this entry exist? */
4344
}ResultRelInfoHolder;
4445

4546

@@ -133,6 +134,7 @@ Oid * find_partitions_for_value(Datum value, Oid value_type,
133134
constPartRelationInfo*prel,
134135
int*nparts);
135136

137+
/* Transform partition's Oid into ResultRelInfo */
136138
ResultRelInfoHolder*select_partition_for_insert(Datumvalue,Oidvalue_type,
137139
constPartRelationInfo*prel,
138140
ResultPartsStorage*parts_storage,

‎src/partition_filter.c

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include"catalog/pg_type.h"
2020
#include"foreign/fdwapi.h"
2121
#include"foreign/foreign.h"
22+
#include"miscadmin.h"
2223
#include"nodes/nodeFuncs.h"
2324
#include"rewrite/rewriteManip.h"
2425
#include"utils/guc.h"
@@ -239,6 +240,9 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
239240
(constvoid*)&partid,
240241
HASH_ENTER,&found);
241242

243+
/* Don't execute recheck_partition_for_value() every time */
244+
rri_holder->first_time= !found;
245+
242246
/* If not found, create & cache new ResultRelInfo */
243247
if (!found)
244248
{
@@ -432,43 +436,52 @@ select_partition_for_insert(Datum value, Oid value_type,
432436

433437
do
434438
{
439+
boolrefresh_prel;
440+
435441
/* Search for matching partitions */
436442
parts=find_partitions_for_value(value,value_type,prel,&nparts);
437443

438444
if (nparts>1)
439445
elog(ERROR,ERR_PART_ATTR_MULTIPLE);
440446
elseif (nparts==0)
441447
{
442-
selected_partid=create_partitions_for_value(parent_relid,
443-
value,value_type);
448+
selected_partid=create_partitions_for_value(parent_relid,
449+
value,value_type);
444450

445-
/* get_pathman_relation_info() will refresh this entry */
446-
invalidate_pathman_relation_info(parent_relid,NULL);
451+
/* get_pathman_relation_info() will refresh this entry */
452+
invalidate_pathman_relation_info(parent_relid,NULL);
447453
}
448454
elseselected_partid=parts[0];
449455

450-
/* Replace parent table with a suitable partition */
456+
pfree(parts);
457+
458+
/* Get ResultRelationInfo of a suitable partition (may invalidate 'prel') */
451459
old_mcxt=MemoryContextSwitchTo(estate->es_query_cxt);
452460
rri_holder=scan_result_parts_storage(selected_partid,parts_storage);
453461
MemoryContextSwitchTo(old_mcxt);
454462

455-
/* This partition has been dropped, repeat with a new 'prel' */
456-
if (rri_holder==NULL)
463+
/*
464+
* We'd like to refresh PartRelationInfo if:
465+
*1) Partition we've just chosen had been removed;
466+
*2) It changed and we've chosen a visible partition for the 1st time.
467+
*/
468+
refresh_prel= !PointerIsValid(rri_holder)||
469+
(!PrelIsValid(prel)&&rri_holder->first_time&&nparts>0);
470+
471+
if (refresh_prel)
457472
{
458-
/* get_pathman_relation_info() will refresh this entry */
459473
invalidate_pathman_relation_info(parent_relid,NULL);
460474

461475
/* Get a fresh PartRelationInfo */
462476
prel=get_pathman_relation_info(parent_relid);
477+
shout_if_prel_is_invalid(parent_relid,prel,PT_ANY);
463478

464-
/* Paranoid check (all partitions have vanished) */
465-
if (!prel)
466-
elog(ERROR,"table \"%s\" is not partitioned",
467-
get_rel_name_or_relid(parent_relid));
479+
/* Reset selected partition */
480+
rri_holder=NULL;
468481
}
469482
}
470483
/* Loop until we get some result */
471-
while (rri_holder==NULL);
484+
while (!PointerIsValid(rri_holder));
472485

473486
returnrri_holder;
474487
}

‎src/pg_pathman.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,10 @@ walk_expr_tree(Expr *expr, const WalkerContext *context)
778778
{
779779
WrapperNode*result= (WrapperNode*)palloc0(sizeof(WrapperNode));
780780

781+
Assert(PrelIsValid(context->prel));
782+
783+
CHECK_FOR_INTERRUPTS();
784+
781785
switch (nodeTag(expr))
782786
{
783787
/* Useful for INSERT optimization */
@@ -897,6 +901,8 @@ handle_const(const Const *c,
897901
{
898902
constPartRelationInfo*prel=context->prel;
899903

904+
CHECK_FOR_INTERRUPTS();
905+
900906
/* Deal with missing strategy */
901907
if (strategy==0)
902908
gotohandle_const_return;
@@ -957,7 +963,7 @@ handle_const(const Const *c,
957963
&cast_success);
958964

959965
if (!cast_success)
960-
elog(ERROR,"Cannot select partition: "
966+
elog(ERROR,"cannot select partition: "
961967
"unable to perform type cast");
962968
}
963969
/* Else use the Const's value */

‎src/pl_range_funcs.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -816,15 +816,15 @@ drop_range_partition_expand_next(PG_FUNCTION_ARGS)
816816
/* Expand next partition if it exists */
817817
if (i<PrelChildrenCount(prel)-1)
818818
{
819-
RangeEntry*cur=&ranges[i],
820-
*next=&ranges[i+1];
819+
RangeEntrycur=ranges[i],
820+
next=ranges[i+1];
821821

822822
/* Drop old constraint and create a new one */
823-
modify_range_constraint(next->child_oid,
824-
prel->expr_cstr,
823+
modify_range_constraint(next.child_oid,
824+
pstrdup(prel->expr_cstr),
825825
prel->ev_type,
826-
&cur->min,
827-
&next->max);
826+
&cur.min,
827+
&next.max);
828828
}
829829

830830
/* Finally drop this partition */

‎tests/python/partitioning_test.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,81 @@ def con2_thread():
806806
node.stop()
807807
node.cleanup()
808808

809+
deftest_conc_part_split_insert(self):
810+
"""Test concurrent split_range_partition() + INSERT"""
811+
812+
# Create and start new instance
813+
node=self.start_new_pathman_cluster(allows_streaming=False)
814+
815+
# Create table 'ins_test' and partition it
816+
withnode.connect()ascon0:
817+
con0.begin()
818+
con0.execute('create table ins_test(val int not null)')
819+
con0.execute("select create_range_partitions('ins_test', 'val', 1, 10, 10)")
820+
con0.commit()
821+
822+
# Create two separate connections for this test
823+
withnode.connect()ascon1,node.connect()ascon2:
824+
825+
# Thread for connection #2 (it has to wait)
826+
defcon2_thread():
827+
con2.begin()
828+
con2.execute('insert into ins_test values (3), (6)')
829+
con2.commit()
830+
831+
# Step 1: initilize con1
832+
con1.begin()
833+
con1.execute('select count(*) from ins_test')# load pathman's cache
834+
835+
# Step 2: initilize con2
836+
con2.begin()
837+
con2.execute('select count(*) from ins_test')# load pathman's cache
838+
con2.commit()# unlock relations
839+
840+
# Step 3: split 'ins_test1' in con1 (success)
841+
con1.execute("select split_range_partition('ins_test_1', 5)")
842+
843+
# Step 4: try inserting new values in con2 (waiting)
844+
t=threading.Thread(target=con2_thread)
845+
t.start()
846+
847+
# Step 5: wait until 't' locks
848+
whileTrue:
849+
withnode.connect()ascon0:
850+
locks=con0.execute("""
851+
select count(*) from pg_locks where granted = 'f'
852+
""")
853+
854+
ifint(locks[0][0])>0:
855+
break
856+
857+
# Step 6: finish split in con1 (success, unlock)
858+
con1.commit()
859+
860+
# Step 7: wait for con2
861+
t.join()
862+
863+
rows=con1.execute("""
864+
select *, tableoid::regclass::text
865+
from ins_test
866+
order by val asc
867+
""")
868+
869+
# check number of rows in table
870+
self.assertEqual(len(rows),2)
871+
872+
# check values that have been inserted
873+
self.assertEqual(int(rows[0][0]),3)
874+
self.assertEqual(int(rows[1][0]),6)
875+
876+
# check partitions that were chosen for insert
877+
self.assertEqual(str(rows[0][1]),'ins_test_1')
878+
self.assertEqual(str(rows[1][1]),'ins_test_11')
879+
880+
# Stop instance and finish work
881+
node.stop()
882+
node.cleanup()
883+
809884
deftest_pg_dump(self):
810885
"""
811886
Test using dump and restore of partitioned table through pg_dump and pg_restore tools.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp