2222#include "access/nbtree.h"
2323#include "access/htup_details.h"
2424#include "access/xact.h"
25+ #include "access/sysattr.h"
2526#include "catalog/indexing.h"
2627#include "catalog/pg_trigger.h"
2728#include "catalog/pg_type.h"
2829#include "commands/tablespace.h"
2930#include "commands/trigger.h"
31+ #include "executor/spi.h"
3032#include "funcapi.h"
3133#include "miscadmin.h"
3234#include "utils/builtins.h"
@@ -111,7 +113,10 @@ typedef struct
111113static void on_partitions_created_internal (Oid partitioned_table ,bool add_callbacks );
112114static void on_partitions_updated_internal (Oid partitioned_table ,bool add_callbacks );
113115static void on_partitions_removed_internal (Oid partitioned_table ,bool add_callbacks );
114-
116+ static void delete_tuple (Relation rel ,Datum ctid );
117+ static void insert_tuple (Relation rel ,HeapTuple tup );
118+ static void make_arg_list (StringInfoData * buf ,HeapTuple tup ,TupleDesc tupdesc ,
119+ int * nargs ,Oid * * argtypes ,Datum * * args ,char * * nulls );
115120
116121/*
117122 * ----------------------------
@@ -1116,16 +1121,19 @@ update_trigger_func(PG_FUNCTION_ARGS)
11161121Datum key ;
11171122bool isnull ;
11181123TupleConversionMap * conversion_map ;
1124+ Datum ctid ;
11191125
1126+ Relation source_rel ;
11201127TupleDesc source_tupdesc ;
1121- HeapTuple source_tuple ;
1128+ HeapTuple old_tuple ;
1129+ HeapTuple new_tuple ;
11221130Oid source_relid ;
11231131AttrNumber source_key ;
11241132
11251133Relation target_rel ;
11261134TupleDesc target_tupdesc ;
1127- HeapTuple target_tuple ;
11281135Oid target_relid ;
1136+ HeapTuple target_tuple ;
11291137
11301138/* This function can only be invoked as a trigger */
11311139if (!CALLED_AS_TRIGGER (fcinfo ))
@@ -1135,8 +1143,10 @@ update_trigger_func(PG_FUNCTION_ARGS)
11351143if (!TRIGGER_FIRED_BY_UPDATE (trigdata -> tg_event ))
11361144elog (ERROR ,"This function must only be used as UPDATE trigger" );
11371145
1146+ source_rel = trigdata -> tg_relation ;
11381147source_relid = trigdata -> tg_relation -> rd_id ;
1139- source_tuple = trigdata -> tg_newtuple ;
1148+ old_tuple = trigdata -> tg_trigtuple ;
1149+ new_tuple = trigdata -> tg_newtuple ;
11401150source_tupdesc = trigdata -> tg_relation -> rd_att ;
11411151
11421152/* Find parent relation and partitioning info */
@@ -1154,40 +1164,143 @@ update_trigger_func(PG_FUNCTION_ARGS)
11541164 */
11551165key_name = get_attname (parent ,prel -> attnum );
11561166source_key = get_attnum (source_relid ,key_name );
1157- key = heap_getattr (source_tuple ,source_key ,source_tupdesc ,& isnull );
1167+ // target_key = get_attnum(target_relid, key_name);
1168+ key = heap_getattr (new_tuple ,source_key ,source_tupdesc ,& isnull );
11581169
11591170/* Find partition it should go into */
11601171target_relid = get_partition_for_key (prel ,key );
11611172
11621173/* If target partition is the same then do nothing */
11631174if (target_relid == source_relid )
1164- return PointerGetDatum ( source_tuple );
1175+ PG_RETURN_POINTER ( new_tuple );
11651176
1177+ /* TODO: probably should be another lock level */
11661178target_rel = heap_open (target_relid ,RowExclusiveLock );
11671179target_tupdesc = target_rel -> rd_att ;
11681180
1181+ /* Read tuple id */
1182+ ctid = heap_getsysattr (old_tuple ,
1183+ SelfItemPointerAttributeNumber ,
1184+ source_tupdesc ,
1185+ & isnull );
1186+
11691187/*
11701188 * Else if it's a different partition then build a TupleConversionMap
11711189 * between original partition and new one. And then do a convertation
11721190 */
11731191conversion_map = convert_tuples_by_name (source_tupdesc ,
11741192target_tupdesc ,
11751193"Failed to convert tuple" );
1176- target_tuple = do_convert_tuple (source_tuple ,conversion_map );
1194+ target_tuple = do_convert_tuple (new_tuple ,conversion_map );
1195+
1196+
1197+ if (SPI_connect ()!= SPI_OK_CONNECT )
1198+ elog (ERROR ,"SPI_connect failed" );
11771199
11781200/*
11791201 * To make an UPDATE on a tuple in case when the tuple should be moved from
11801202 * one partition to another we need to perform two actions. First, remove
11811203 * old tuple from original partition and then insert updated version
11821204 * of tuple to the target partition
11831205 */
1184- simple_heap_delete (trigdata -> tg_relation ,& trigdata -> tg_trigtuple -> t_self );
1185- simple_heap_insert (target_rel ,target_tuple );
1206+ delete_tuple (source_rel ,ctid );
1207+ insert_tuple (target_rel ,target_tuple );
1208+
1209+ if (SPI_finish ()!= SPI_OK_FINISH )
1210+ elog (ERROR ,"SPI_finish failed" );
11861211
11871212heap_close (target_rel ,RowExclusiveLock );
1213+
11881214PG_RETURN_VOID ();
11891215}
11901216
1217+ /*
1218+ * Delete record from rel. Caller is responsible for SPI environment setup
1219+ */
1220+ static void
1221+ delete_tuple (Relation rel ,Datum ctid )
1222+ {
1223+ char * query ;
1224+ Datum args [1 ]= {ctid };
1225+ Oid argtypes [1 ]= {TIDOID };
1226+ char nulls [1 ]= {' ' };
1227+ int spi_result ;
1228+
1229+ query = psprintf ("DELETE FROM %s.%s WHERE ctid = $1" ,
1230+ quote_identifier (get_namespace_name (RelationGetNamespace (rel ))),
1231+ quote_identifier (RelationGetRelationName (rel )));
1232+ spi_result = SPI_execute_with_args (query ,1 ,argtypes ,args ,nulls , false,1 );
1233+
1234+ /* Check result */
1235+ if (spi_result != SPI_OK_DELETE )
1236+ elog (ERROR ,"SPI_execute_with_args returned %d" ,spi_result );
1237+ }
1238+
1239+ /*
1240+ * Insert a new tuple to the rel. Caller is responsible for SPI environment
1241+ * setup
1242+ */
1243+ static void
1244+ insert_tuple (Relation rel ,HeapTuple tup )
1245+ {
1246+ TupleDesc tupdesc = rel -> rd_att ;
1247+ StringInfoData querybuf ;
1248+ Datum * args ;
1249+ Oid * argtypes ;
1250+ char * nulls ;
1251+ int nargs ;
1252+ const char * namespace ;
1253+ const char * relname ;
1254+ int spi_result ;
1255+
1256+ namespace = quote_identifier (get_namespace_name (RelationGetNamespace (rel )));
1257+ relname = quote_identifier (RelationGetRelationName (rel ));
1258+
1259+ initStringInfo (& querybuf );
1260+ appendStringInfo (& querybuf ,"INSERT INTO " );
1261+ appendStringInfo (& querybuf ,"%s.%s" ,namespace ,relname );
1262+ appendStringInfo (& querybuf ," VALUES (" );
1263+ make_arg_list (& querybuf ,tup ,tupdesc ,& nargs ,& argtypes ,& args ,& nulls );
1264+ appendStringInfo (& querybuf ,")" );
1265+
1266+ spi_result = SPI_execute_with_args (querybuf .data ,nargs ,argtypes ,
1267+ args ,nulls , false,0 );
1268+
1269+ /* Check result */
1270+ if (spi_result != SPI_OK_INSERT )
1271+ elog (ERROR ,"SPI_execute_with_args returned %d" ,spi_result );
1272+ }
1273+
1274+ static void
1275+ make_arg_list (StringInfoData * buf ,HeapTuple tup ,TupleDesc tupdesc ,
1276+ int * nargs ,Oid * * argtypes ,Datum * * args ,char * * nulls )
1277+ {
1278+ int i ;
1279+ bool isnull ;
1280+
1281+ * nargs = tupdesc -> natts ;
1282+ * args = palloc (sizeof (Datum )* tupdesc -> natts );
1283+ * argtypes = palloc (sizeof (Oid )* tupdesc -> natts );
1284+ * nulls = palloc (sizeof (char )* tupdesc -> natts );
1285+
1286+ for (i = 0 ;i < tupdesc -> natts ;i ++ )
1287+ {
1288+ /* Skip dropped columns */
1289+ if (tupdesc -> attrs [i ]-> attisdropped )
1290+ continue ;
1291+
1292+ * args [i ]= heap_getattr (tup ,i + 1 ,tupdesc ,& isnull );
1293+ * nulls [i ]= isnull ?'n' :' ' ;
1294+ * argtypes [i ]= tupdesc -> attrs [i ]-> atttypid ;
1295+
1296+ /* Add comma separator (except the first time) */
1297+ if (i != 0 )
1298+ appendStringInfo (buf ,"," );
1299+
1300+ /* Add parameter */
1301+ appendStringInfo (buf ,"$%i" ,i + 1 );
1302+ }
1303+ }
11911304
11921305/*
11931306 * Returns Oid of partition corresponding to partitioning key value. Throws