22#include "utils/lsyscache.h"
33#include "utils/typcache.h"
44#include "utils/array.h"
5+ #include "utils/snapmgr.h"
56#include "access/nbtree.h"
67#include "access/xact.h"
78#include "catalog/pg_type.h"
89#include "executor/spi.h"
10+ #include "storage/lmgr.h"
911
1012
1113/* declarations */
1214PG_FUNCTION_INFO_V1 (on_partitions_created );
1315PG_FUNCTION_INFO_V1 (on_partitions_updated );
1416PG_FUNCTION_INFO_V1 (on_partitions_removed );
15- PG_FUNCTION_INFO_V1 (find_range_partition );
1617PG_FUNCTION_INFO_V1 (find_or_create_range_partition );
1718PG_FUNCTION_INFO_V1 (get_range_by_idx );
1819PG_FUNCTION_INFO_V1 (get_partition_range );
@@ -71,54 +72,11 @@ on_partitions_removed(PG_FUNCTION_ARGS)
7172}
7273
7374/*
74- * Returns partition oid for specified parent relid and value
75+ * Returns partition oid for specified parent relid and value.
76+ * In case when partition isn't exist try to create one.
7577 */
76- // Datum
77- // find_range_partition(PG_FUNCTION_ARGS)
78- // {
79- // intrelid = DatumGetInt32(PG_GETARG_DATUM(0));
80- // Datumvalue = PG_GETARG_DATUM(1);
81- // Oidvalue_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
82- // intpos;
83- // boolfound;
84- // RangeRelation*rangerel;
85- // RangeEntry*ranges;
86- // TypeCacheEntry*tce;
87- // PartRelationInfo *prel;
88- // Oid cmp_proc_oid;
89- // FmgrInfo cmp_func;
90-
91- // tce = lookup_type_cache(value_type,
92- // TYPECACHE_EQ_OPR | TYPECACHE_LT_OPR | TYPECACHE_GT_OPR |
93- // TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO);
94-
95- // prel = (PartRelationInfo *)
96- // hash_search(relations, (const void *) &relid, HASH_FIND, NULL);
97-
98- // cmp_proc_oid = get_opfamily_proc(tce->btree_opf,
99- // value_type,
100- // prel->atttype,
101- // BTORDER_PROC);
102- // fmgr_info(cmp_proc_oid, &cmp_func);
103-
104- // rangerel = (RangeRelation *)
105- // hash_search(range_restrictions, (const void *) &relid, HASH_FIND, NULL);
106-
107- // if (!rangerel)
108- // PG_RETURN_NULL();
109-
110- // ranges = dsm_array_get_pointer(&rangerel->ranges);
111- // pos = range_binary_search(rangerel, &cmp_func, value, &found);
112-
113- // if (found)
114- // PG_RETURN_OID(ranges[pos].child_oid);
115-
116- // PG_RETURN_NULL();
117- // }
118-
11978Datum
120- find_range_partition (PG_FUNCTION_ARGS )
121- // find_or_create_range_partition(PG_FUNCTION_ARGS)
79+ find_or_create_range_partition (PG_FUNCTION_ARGS )
12280{
12381int relid = DatumGetInt32 (PG_GETARG_DATUM (0 ));
12482Datum value = PG_GETARG_DATUM (1 );
@@ -158,14 +116,30 @@ find_range_partition(PG_FUNCTION_ARGS)
158116PG_RETURN_OID (ranges [pos ].child_oid );
159117else
160118{
161- int ret ;
162- Datum vals [4 ];
163- Oid oids []= {OIDOID ,value_type ,value_type ,value_type };
164- bool nulls []= {false, false, false, false};
119+ int ret ;
120+ Datum vals [4 ];
121+ Oid oids []= {OIDOID ,value_type ,value_type ,value_type };
122+ bool nulls []= {false, false, false, false};
165123RangeEntry * re = & ranges [rangerel -> ranges .length - 1 ];
166- int cmp_upper = FunctionCall2 (& cmp_func ,value ,ranges [rangerel -> ranges .length - 1 ].max );
167- int cmp_lower = FunctionCall2 (& cmp_func ,value ,ranges [0 ].min );
124+ int cmp_upper = FunctionCall2 (& cmp_func ,value ,ranges [rangerel -> ranges .length - 1 ].max );
125+ int cmp_lower = FunctionCall2 (& cmp_func ,value ,ranges [0 ].min );
126+ char * sql ;
127+
128+ /* Lock relation before appending new partitions */
129+ LWLockAcquire (load_config_lock ,LW_EXCLUSIVE );
130+
131+ /*
132+ * Check if someone else has already created partition.
133+ */
134+ ranges = dsm_array_get_pointer (& rangerel -> ranges );
135+ pos = range_binary_search (rangerel ,& cmp_func ,value ,& found );
136+ if (found )
137+ {
138+ LWLockRelease (load_config_lock );
139+ PG_RETURN_OID (ranges [pos ].child_oid );
140+ }
168141
142+ /* Determine nearest range partition */
169143if (cmp_upper > 0 )
170144re = & ranges [rangerel -> ranges .length - 1 ];
171145else if (cmp_lower < 0 )
@@ -176,35 +150,34 @@ find_range_partition(PG_FUNCTION_ARGS)
176150vals [2 ]= re -> max ;
177151vals [3 ]= value ;
178152
179- LWLockAcquire (load_config_lock ,LW_EXCLUSIVE );
180-
181- /* create new partitions */
153+ /* Create new partitions */
182154SPI_connect ();
183- ret = SPI_execute_with_args ("select append_partitions_on_demand_internal($1, $2, $3, $4)" ,
184- 4 ,oids ,vals ,nulls , false,0 );
155+ sql = psprintf ("SELECT %s.append_partitions_on_demand_internal($1, $2, $3, $4)" ,
156+ get_extension_schema ());
157+ ret = SPI_execute_with_args (sql ,4 ,oids ,vals ,nulls , false,0 );
158+ // ret = SPI_execute_with_args("SELECT append_partitions_on_demand_internal($1, $2, $3, $4)",
159+ // 4, oids, vals, nulls, false, 0);
185160if (ret > 0 )
186161{
162+ /* Update relation info */
187163free_dsm_array (& rangerel -> ranges );
188164free_dsm_array (& prel -> children );
189- load_check_constraints (relid );
165+ load_check_constraints (relid , GetCatalogSnapshot ( relid ) );
190166}
191167else
192- {
193168elog (WARNING ,"Attempt to create new partitions failed" );
194- }
195169
196- /* update relation info */
197170SPI_finish ();
198171
172+ /* Release locks */
199173LWLockRelease (load_config_lock );
174+ // pfree(sql);
200175
201- /*repeat binary search */
176+ /*Repeat binary search */
202177ranges = dsm_array_get_pointer (& rangerel -> ranges );
203178pos = range_binary_search (rangerel ,& cmp_func ,value ,& found );
204179if (found )
205- {
206180PG_RETURN_OID (ranges [pos ].child_oid );
207- }
208181}
209182
210183PG_RETURN_NULL ();