22
22
#include "access/nbtree.h"
23
23
#include "access/htup_details.h"
24
24
#include "access/xact.h"
25
+ #include "access/sysattr.h"
25
26
#include "catalog/indexing.h"
26
27
#include "catalog/pg_trigger.h"
27
28
#include "catalog/pg_type.h"
28
29
#include "commands/tablespace.h"
29
30
#include "commands/trigger.h"
31
+ #include "executor/spi.h"
30
32
#include "funcapi.h"
31
33
#include "miscadmin.h"
32
34
#include "utils/builtins.h"
@@ -111,7 +113,10 @@ typedef struct
111
113
static void on_partitions_created_internal (Oid partitioned_table ,bool add_callbacks );
112
114
static void on_partitions_updated_internal (Oid partitioned_table ,bool add_callbacks );
113
115
static void on_partitions_removed_internal (Oid partitioned_table ,bool add_callbacks );
114
-
116
+ static void delete_tuple (Relation rel ,Datum ctid );
117
+ static void insert_tuple (Relation rel ,HeapTuple tup );
118
+ static void make_arg_list (StringInfoData * buf ,HeapTuple tup ,TupleDesc tupdesc ,
119
+ int * nargs ,Oid * * argtypes ,Datum * * args ,char * * nulls );
115
120
116
121
/*
117
122
* ----------------------------
@@ -1116,16 +1121,19 @@ update_trigger_func(PG_FUNCTION_ARGS)
1116
1121
Datum key ;
1117
1122
bool isnull ;
1118
1123
TupleConversionMap * conversion_map ;
1124
+ Datum ctid ;
1119
1125
1126
+ Relation source_rel ;
1120
1127
TupleDesc source_tupdesc ;
1121
- HeapTuple source_tuple ;
1128
+ HeapTuple old_tuple ;
1129
+ HeapTuple new_tuple ;
1122
1130
Oid source_relid ;
1123
1131
AttrNumber source_key ;
1124
1132
1125
1133
Relation target_rel ;
1126
1134
TupleDesc target_tupdesc ;
1127
- HeapTuple target_tuple ;
1128
1135
Oid target_relid ;
1136
+ HeapTuple target_tuple ;
1129
1137
1130
1138
/* This function can only be invoked as a trigger */
1131
1139
if (!CALLED_AS_TRIGGER (fcinfo ))
@@ -1135,8 +1143,10 @@ update_trigger_func(PG_FUNCTION_ARGS)
1135
1143
if (!TRIGGER_FIRED_BY_UPDATE (trigdata -> tg_event ))
1136
1144
elog (ERROR ,"This function must only be used as UPDATE trigger" );
1137
1145
1146
+ source_rel = trigdata -> tg_relation ;
1138
1147
source_relid = trigdata -> tg_relation -> rd_id ;
1139
- source_tuple = trigdata -> tg_newtuple ;
1148
+ old_tuple = trigdata -> tg_trigtuple ;
1149
+ new_tuple = trigdata -> tg_newtuple ;
1140
1150
source_tupdesc = trigdata -> tg_relation -> rd_att ;
1141
1151
1142
1152
/* Find parent relation and partitioning info */
@@ -1154,40 +1164,143 @@ update_trigger_func(PG_FUNCTION_ARGS)
1154
1164
*/
1155
1165
key_name = get_attname (parent ,prel -> attnum );
1156
1166
source_key = get_attnum (source_relid ,key_name );
1157
- key = heap_getattr (source_tuple ,source_key ,source_tupdesc ,& isnull );
1167
+ // target_key = get_attnum(target_relid, key_name);
1168
+ key = heap_getattr (new_tuple ,source_key ,source_tupdesc ,& isnull );
1158
1169
1159
1170
/* Find partition it should go into */
1160
1171
target_relid = get_partition_for_key (prel ,key );
1161
1172
1162
1173
/* If target partition is the same then do nothing */
1163
1174
if (target_relid == source_relid )
1164
- return PointerGetDatum ( source_tuple );
1175
+ PG_RETURN_POINTER ( new_tuple );
1165
1176
1177
+ /* TODO: probably should be another lock level */
1166
1178
target_rel = heap_open (target_relid ,RowExclusiveLock );
1167
1179
target_tupdesc = target_rel -> rd_att ;
1168
1180
1181
+ /* Read tuple id */
1182
+ ctid = heap_getsysattr (old_tuple ,
1183
+ SelfItemPointerAttributeNumber ,
1184
+ source_tupdesc ,
1185
+ & isnull );
1186
+
1169
1187
/*
1170
1188
* Else if it's a different partition then build a TupleConversionMap
1171
1189
* between original partition and new one. And then do a convertation
1172
1190
*/
1173
1191
conversion_map = convert_tuples_by_name (source_tupdesc ,
1174
1192
target_tupdesc ,
1175
1193
"Failed to convert tuple" );
1176
- target_tuple = do_convert_tuple (source_tuple ,conversion_map );
1194
+ target_tuple = do_convert_tuple (new_tuple ,conversion_map );
1195
+
1196
+
1197
+ if (SPI_connect ()!= SPI_OK_CONNECT )
1198
+ elog (ERROR ,"SPI_connect failed" );
1177
1199
1178
1200
/*
1179
1201
* To make an UPDATE on a tuple in case when the tuple should be moved from
1180
1202
* one partition to another we need to perform two actions. First, remove
1181
1203
* old tuple from original partition and then insert updated version
1182
1204
* of tuple to the target partition
1183
1205
*/
1184
- simple_heap_delete (trigdata -> tg_relation ,& trigdata -> tg_trigtuple -> t_self );
1185
- simple_heap_insert (target_rel ,target_tuple );
1206
+ delete_tuple (source_rel ,ctid );
1207
+ insert_tuple (target_rel ,target_tuple );
1208
+
1209
+ if (SPI_finish ()!= SPI_OK_FINISH )
1210
+ elog (ERROR ,"SPI_finish failed" );
1186
1211
1187
1212
heap_close (target_rel ,RowExclusiveLock );
1213
+
1188
1214
PG_RETURN_VOID ();
1189
1215
}
1190
1216
1217
+ /*
1218
+ * Delete record from rel. Caller is responsible for SPI environment setup
1219
+ */
1220
+ static void
1221
+ delete_tuple (Relation rel ,Datum ctid )
1222
+ {
1223
+ char * query ;
1224
+ Datum args [1 ]= {ctid };
1225
+ Oid argtypes [1 ]= {TIDOID };
1226
+ char nulls [1 ]= {' ' };
1227
+ int spi_result ;
1228
+
1229
+ query = psprintf ("DELETE FROM %s.%s WHERE ctid = $1" ,
1230
+ quote_identifier (get_namespace_name (RelationGetNamespace (rel ))),
1231
+ quote_identifier (RelationGetRelationName (rel )));
1232
+ spi_result = SPI_execute_with_args (query ,1 ,argtypes ,args ,nulls , false,1 );
1233
+
1234
+ /* Check result */
1235
+ if (spi_result != SPI_OK_DELETE )
1236
+ elog (ERROR ,"SPI_execute_with_args returned %d" ,spi_result );
1237
+ }
1238
+
1239
+ /*
1240
+ * Insert a new tuple to the rel. Caller is responsible for SPI environment
1241
+ * setup
1242
+ */
1243
+ static void
1244
+ insert_tuple (Relation rel ,HeapTuple tup )
1245
+ {
1246
+ TupleDesc tupdesc = rel -> rd_att ;
1247
+ StringInfoData querybuf ;
1248
+ Datum * args ;
1249
+ Oid * argtypes ;
1250
+ char * nulls ;
1251
+ int nargs ;
1252
+ const char * namespace ;
1253
+ const char * relname ;
1254
+ int spi_result ;
1255
+
1256
+ namespace = quote_identifier (get_namespace_name (RelationGetNamespace (rel )));
1257
+ relname = quote_identifier (RelationGetRelationName (rel ));
1258
+
1259
+ initStringInfo (& querybuf );
1260
+ appendStringInfo (& querybuf ,"INSERT INTO " );
1261
+ appendStringInfo (& querybuf ,"%s.%s" ,namespace ,relname );
1262
+ appendStringInfo (& querybuf ," VALUES (" );
1263
+ make_arg_list (& querybuf ,tup ,tupdesc ,& nargs ,& argtypes ,& args ,& nulls );
1264
+ appendStringInfo (& querybuf ,")" );
1265
+
1266
+ spi_result = SPI_execute_with_args (querybuf .data ,nargs ,argtypes ,
1267
+ args ,nulls , false,0 );
1268
+
1269
+ /* Check result */
1270
+ if (spi_result != SPI_OK_INSERT )
1271
+ elog (ERROR ,"SPI_execute_with_args returned %d" ,spi_result );
1272
+ }
1273
+
1274
+ static void
1275
+ make_arg_list (StringInfoData * buf ,HeapTuple tup ,TupleDesc tupdesc ,
1276
+ int * nargs ,Oid * * argtypes ,Datum * * args ,char * * nulls )
1277
+ {
1278
+ int i ;
1279
+ bool isnull ;
1280
+
1281
+ * nargs = tupdesc -> natts ;
1282
+ * args = palloc (sizeof (Datum )* tupdesc -> natts );
1283
+ * argtypes = palloc (sizeof (Oid )* tupdesc -> natts );
1284
+ * nulls = palloc (sizeof (char )* tupdesc -> natts );
1285
+
1286
+ for (i = 0 ;i < tupdesc -> natts ;i ++ )
1287
+ {
1288
+ /* Skip dropped columns */
1289
+ if (tupdesc -> attrs [i ]-> attisdropped )
1290
+ continue ;
1291
+
1292
+ * args [i ]= heap_getattr (tup ,i + 1 ,tupdesc ,& isnull );
1293
+ * nulls [i ]= isnull ?'n' :' ' ;
1294
+ * argtypes [i ]= tupdesc -> attrs [i ]-> atttypid ;
1295
+
1296
+ /* Add comma separator (except the first time) */
1297
+ if (i != 0 )
1298
+ appendStringInfo (buf ,"," );
1299
+
1300
+ /* Add parameter */
1301
+ appendStringInfo (buf ,"$%i" ,i + 1 );
1302
+ }
1303
+ }
1191
1304
1192
1305
/*
1193
1306
* Returns Oid of partition corresponding to partitioning key value. Throws