11
11
* ------------------------------------------------------------------------
12
12
*/
13
13
14
- #include "utility_stmt_hooking.h"
15
14
#include "init.h"
15
+ #include "utility_stmt_hooking.h"
16
16
#include "partition_filter.h"
17
17
#include "relation_info.h"
18
18
19
19
#include "access/htup_details.h"
20
20
#include "access/sysattr.h"
21
21
#include "access/xact.h"
22
22
#include "catalog/namespace.h"
23
- #include "catalog/pg_attribute.h"
24
23
#include "commands/copy.h"
25
24
#include "commands/trigger.h"
26
25
#include "commands/tablecmds.h"
27
- #include "executor/executor.h"
28
26
#include "foreign/fdwapi.h"
29
27
#include "miscadmin.h"
30
- #include "nodes/makefuncs.h"
31
28
#include "utils/builtins.h"
32
29
#include "utils/lsyscache.h"
33
30
#include "utils/memutils.h"
34
- #include "utils/rel.h"
35
31
#include "utils/rls.h"
36
32
37
33
#include "libpq/libpq.h"
74
70
is_pathman_related_copy (Node * parsetree )
75
71
{
76
72
CopyStmt * copy_stmt = (CopyStmt * )parsetree ;
77
- Oid partitioned_table ;
73
+ Oid parent_relid ;
78
74
79
75
Assert (IsPathmanReady ());
80
76
@@ -93,14 +89,14 @@ is_pathman_related_copy(Node *parsetree)
93
89
return false;
94
90
95
91
/* Get partition's Oid while locking it */
96
- partitioned_table = RangeVarGetRelid (copy_stmt -> relation ,
97
- (copy_stmt -> is_from ?
98
- RowExclusiveLock :
99
- AccessShareLock ),
100
- false);
92
+ parent_relid = RangeVarGetRelid (copy_stmt -> relation ,
93
+ (copy_stmt -> is_from ?
94
+ RowExclusiveLock :
95
+ AccessShareLock ),
96
+ false);
101
97
102
98
/* Check that relation is partitioned */
103
- if (get_pathman_relation_info (partitioned_table ))
99
+ if (get_pathman_relation_info (parent_relid ))
104
100
{
105
101
ListCell * lc ;
106
102
@@ -121,7 +117,7 @@ is_pathman_related_copy(Node *parsetree)
121
117
elog (ERROR ,"COPY is not supported for partitioned tables on Windows" );
122
118
#else
123
119
elog (DEBUG1 ,"Overriding default behavior for COPY [%u]" ,
124
- partitioned_table );
120
+ parent_relid );
125
121
#endif
126
122
127
123
return true;
@@ -130,6 +126,57 @@ is_pathman_related_copy(Node *parsetree)
130
126
return false;
131
127
}
132
128
129
+ /*
130
+ * Is pg_pathman supposed to handle this table rename stmt?
131
+ */
132
+ bool
133
+ is_pathman_related_table_rename (Node * parsetree ,
134
+ Oid * partition_relid_out ,/* ret value */
135
+ AttrNumber * partitioned_col_out )/* ret value */
136
+ {
137
+ RenameStmt * rename_stmt = (RenameStmt * )parsetree ;
138
+ Oid partition_relid ,
139
+ parent_relid ;
140
+ const PartRelationInfo * prel ;
141
+ PartParentSearch parent_search ;
142
+
143
+ Assert (IsPathmanReady ());
144
+
145
+ /* Set default values */
146
+ if (partition_relid_out )* partition_relid_out = InvalidOid ;
147
+ if (partitioned_col_out )* partitioned_col_out = InvalidAttrNumber ;
148
+
149
+ if (!IsA (parsetree ,RenameStmt ))
150
+ return false;
151
+
152
+ /* Are we going to rename some table? */
153
+ if (rename_stmt -> renameType != OBJECT_TABLE )
154
+ return false;
155
+
156
+ /* Assume it's a partition, fetch its Oid */
157
+ partition_relid = RangeVarGetRelid (rename_stmt -> relation ,
158
+ AccessShareLock ,
159
+ false);
160
+
161
+ /* Try fetching parent of this table */
162
+ parent_relid = get_parent_of_partition (partition_relid ,& parent_search );
163
+ if (parent_search != PPS_ENTRY_PART_PARENT )
164
+ return false;
165
+
166
+ /* Is parent partitioned? */
167
+ if ((prel = get_pathman_relation_info (parent_relid ))!= NULL )
168
+ {
169
+ /* Return 'partition_relid' and 'prel->attnum' */
170
+ if (partition_relid_out )* partition_relid_out = partition_relid ;
171
+ if (partitioned_col_out )* partitioned_col_out = prel -> attnum ;
172
+
173
+ return true;
174
+ }
175
+
176
+ return false;
177
+ }
178
+
179
+
133
180
/*
134
181
* CopyGetAttnums - build an integer list of attnums to be copied
135
182
*
@@ -238,6 +285,7 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
238
285
"psql's \\copy command also works for anyone." )));
239
286
}
240
287
288
+ /* Check that we have a relation */
241
289
if (stmt -> relation )
242
290
{
243
291
TupleDesc tupDesc ;
@@ -363,13 +411,9 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
363
411
rel = NULL ;
364
412
}
365
413
}
366
- else
367
- {
368
- Assert (stmt -> query );
369
414
370
- query = stmt -> query ;
371
- rel = NULL ;
372
- }
415
+ /* This should never happen (see is_pathman_related_copy()) */
416
+ else elog (ERROR ,"error in function \"%s\"" ,CppAsString (PathmanDoCopy ));
373
417
374
418
/* COPY ... FROM ... */
375
419
if (is_from )
@@ -626,45 +670,35 @@ prepare_rri_fdw_for_copy(EState *estate,
626
670
}
627
671
628
672
/*
629
- * Rename check constraint oftable if it's a partition
673
+ * RenameRANGE\HASH check constraint ofa partition on table rename event.
630
674
*/
631
675
void
632
- PathmanRenameConstraint (const RenameStmt * stmt )
676
+ PathmanRenameConstraint (Oid partition_relid ,/* cached partition Oid */
677
+ AttrNumber partitioned_col ,/* partitioned column */
678
+ const RenameStmt * part_rename_stmt )/* partition rename stmt */
633
679
{
634
- Oid partition_relid ,
635
- parent_relid ;
636
- char * old_constraint_name ,
637
- * new_constraint_name ;
638
- RenameStmt * rename_stmt ;
639
- const PartRelationInfo * prel ;
640
-
641
- partition_relid = RangeVarGetRelid (stmt -> relation ,AccessShareLock , false);
642
- parent_relid = get_rel_parent (partition_relid );
643
-
644
- /* Skip if there's no parent */
645
- if (!OidIsValid (parent_relid ))return ;
646
-
647
- /* Fetch partitioning data */
648
- prel = get_pathman_relation_info (parent_relid );
649
-
650
- /* Skip if this table is not partitioned */
651
- if (!prel )return ;
680
+ char * old_constraint_name ,
681
+ * new_constraint_name ;
682
+ RenameStmt rename_stmt ;
652
683
653
684
/* Generate old constraint name */
654
- old_constraint_name = build_check_constraint_name_relid_internal (partition_relid ,
655
- prel -> attnum );
685
+ old_constraint_name =
686
+ build_check_constraint_name_relid_internal (partition_relid ,
687
+ partitioned_col );
656
688
657
689
/* Generate new constraint name */
658
- new_constraint_name = build_check_constraint_name_relname_internal (stmt -> newname ,
659
- prel -> attnum );
690
+ new_constraint_name =
691
+ build_check_constraint_name_relname_internal (part_rename_stmt -> newname ,
692
+ partitioned_col );
660
693
661
694
/* Build check constraint RENAME statement */
662
- rename_stmt = makeNode (RenameStmt );
663
- rename_stmt -> renameType = OBJECT_TABCONSTRAINT ;
664
- rename_stmt -> relation = stmt -> relation ;
665
- rename_stmt -> subname = old_constraint_name ;
666
- rename_stmt -> newname = new_constraint_name ;
667
- rename_stmt -> missing_ok = false;
668
-
669
- RenameConstraint (rename_stmt );
695
+ memset ((void * )& rename_stmt ,0 ,sizeof (RenameStmt ));
696
+ NodeSetTag (& rename_stmt ,T_RenameStmt );
697
+ rename_stmt .renameType = OBJECT_TABCONSTRAINT ;
698
+ rename_stmt .relation = part_rename_stmt -> relation ;
699
+ rename_stmt .subname = old_constraint_name ;
700
+ rename_stmt .newname = new_constraint_name ;
701
+ rename_stmt .missing_ok = false;
702
+
703
+ RenameConstraint (& rename_stmt );
670
704
}