1
1
2
2
#include "postgres.h"
3
3
4
+ //#include "access/xact.h"
4
5
#include "commands/extension.h"
5
6
#include "executor/execdesc.h"
7
+ #include "executor/executor.h"
6
8
#include "fmgr.h"
9
+ #include "libpq/libpq.h"
10
+ #include "libpq-fe.h"
11
+ #include "nodes/params.h"
7
12
#include "optimizer/planner.h"
8
13
#include "storage/lmgr.h"
9
14
#include "utils/builtins.h"
15
+ #include "utils/guc.h"
16
+ #include "utils/plancache.h"
17
+ #include "utils/snapmgr.h"
10
18
11
19
PG_MODULE_MAGIC ;
12
20
@@ -15,51 +23,142 @@ PG_FUNCTION_INFO_V1(pg_execute_plan);
15
23
void _PG_init (void );
16
24
17
25
static planner_hook_type prev_planner_hook = NULL ;
26
+ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL ;
18
27
19
28
static PlannedStmt * HOOK_Planner_injection (Query * parse ,int cursorOptions ,
20
29
ParamListInfo boundParams );
30
+ static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
31
+ static char * serialize_plan (PlannedStmt * pstmt ,ParamListInfo boundParams ,
32
+ int * size );
33
+ static PGconn * conn = NULL ;
21
34
35
+ int node_number1 = 0 ;
36
+ //#include "utils/guc.h"
22
37
/*
23
38
* Module load/unload callback
24
39
*/
25
40
void
26
41
_PG_init (void )
27
42
{
43
+ elog (LOG ,"_PG_Init" );
44
+ DefineCustomIntVariable ("pargres.node" ,
45
+ "Node number in instances collaboration" ,
46
+ NULL ,
47
+ & node_number1 ,
48
+ 0 ,
49
+ 0 ,
50
+ 1023 ,
51
+ PGC_SIGHUP ,
52
+ GUC_NOT_IN_SAMPLE ,
53
+ NULL ,
54
+ NULL ,
55
+ NULL );
56
+
28
57
/* Planner hook */
29
58
prev_planner_hook = planner_hook ;
30
59
planner_hook = HOOK_Planner_injection ;
60
+
61
+ prev_ExecutorEnd = ExecutorEnd_hook ;
62
+ ExecutorEnd_hook = HOOK_ExecEnd_injection ;
31
63
}
32
64
33
- static PlannedStmt * HOOK_Planner_injection (Query * parse ,int cursorOptions ,
65
+ static void
66
+ HOOK_ExecEnd_injection (QueryDesc * queryDesc )
67
+ {
68
+ PGresult * result ;
69
+
70
+ /* Execute before hook because it destruct memory context of exchange list */
71
+ if (conn )
72
+ while ((result = PQgetResult (conn ))!= NULL )
73
+ Assert (PQresultStatus (result )!= PGRES_FATAL_ERROR );
74
+
75
+ if (prev_ExecutorEnd )
76
+ prev_ExecutorEnd (queryDesc );
77
+ else
78
+ standard_ExecutorEnd (queryDesc );
79
+ }
80
+
81
+ static PlannedStmt *
82
+ HOOK_Planner_injection (Query * parse ,int cursorOptions ,
34
83
ParamListInfo boundParams )
35
84
{
36
- PlannedStmt * stmt ;
37
- char * serialized_plan ;
85
+ PlannedStmt * pstmt ;
86
+
87
+ conn = NULL ;
38
88
39
89
if (prev_planner_hook )
40
- stmt = prev_planner_hook (parse ,cursorOptions ,boundParams );
90
+ pstmt = prev_planner_hook (parse ,cursorOptions ,boundParams );
91
+ else
92
+ pstmt = standard_planner (parse ,cursorOptions ,boundParams );
93
+
94
+ if (node_number1 > 0 )
95
+ return pstmt ;
41
96
else
42
- stmt = standard_planner ( parse , cursorOptions , boundParams );
97
+ printf ( "SEND Query\n" );
43
98
44
99
/* Extension is not initialized. */
45
100
if (OidIsValid (get_extension_oid ("execplan" , true)))
46
101
{
47
- FILE * f = fopen ("/home/andrey/plans.txt" ,"at" );
48
- if (stmt -> paramExecTypes == NIL )
49
- {
50
- elog (LOG ,"commandType: %d\n" ,stmt -> commandType );
51
- }
52
- //Assert(stmt->paramExecTypes != NIL);
53
- serialized_plan = nodeToString (stmt );
54
- //fprintf(f, "\n%s\n", serialized_plan);
55
- fclose (f );
102
+ char conninfo [1024 ];
103
+ char * data ,
104
+ * SQLCommand ;
105
+ int status ,
106
+ data_size ;
107
+
108
+ /* Connect to slave and send it a query plan */
109
+ sprintf (conninfo ,"host=localhost port=5433%c" ,'\0' );
110
+ conn = PQconnectdb (conninfo );
111
+ if (PQstatus (conn )== CONNECTION_BAD )
112
+ elog (LOG ,"Connection error. conninfo: %s" ,conninfo );
113
+
114
+ data = serialize_plan (pstmt ,boundParams ,& data_size );
115
+ SQLCommand = (char * )palloc0 (strlen (data )+ 100 );
116
+ sprintf (SQLCommand ,"SELECT pg_execute_plan('%s')" ,data );
117
+ elog (LOG ,"query: %s" ,SQLCommand );
118
+ status = PQsendQuery (conn ,SQLCommand );
119
+ if (status == 0 )
120
+ elog (ERROR ,"Query sending error: %s" ,PQerrorMessage (conn ));
56
121
}
57
- return stmt ;
122
+ return pstmt ;
58
123
}
59
124
60
- #include "executor/executor.h"
61
- #include "utils/plancache.h"
62
- #include "utils/snapmgr.h"
125
+ #include "utils/fmgrprotos.h"
126
+
127
+ static char *
128
+ serialize_plan (PlannedStmt * pstmt ,ParamListInfo boundParams ,int * size )
129
+ {
130
+ int splan_len ,
131
+ sparams_len ,
132
+ econtainer_len ;
133
+ char * serialized_plan ,
134
+ * container ,
135
+ * start_address ,
136
+ * econtainer ;
137
+
138
+ Assert (size != NULL );
139
+
140
+ serialized_plan = nodeToString (pstmt );
141
+
142
+ /* We use splan_len+1 bytes for include end-of-string symbol. */
143
+ splan_len = strlen (serialized_plan )+ 1 ;
144
+
145
+ sparams_len = EstimateParamListSpace (boundParams );
146
+
147
+ container = (char * )palloc0 (splan_len + sparams_len );
148
+ //elog(LOG, "Serialize sizes: plan: %d params: %d, numParams: %d", splan_len, sparams_len, boundParams->numParams);
149
+ memcpy (container ,serialized_plan ,splan_len );
150
+ start_address = container + splan_len ;
151
+ SerializeParamList (boundParams ,& start_address );
152
+
153
+ econtainer_len = esc_enc_len (container ,splan_len + sparams_len );
154
+ econtainer = (char * )palloc0 (econtainer_len + 1 );
155
+
156
+ Assert (econtainer_len == esc_encode (container ,splan_len + sparams_len ,econtainer ));
157
+ econtainer [econtainer_len ]= '\0' ;
158
+ * size = econtainer_len + 1 ;
159
+ elog (LOG ,"Serialize sizes: econtainer: %d" ,* size );
160
+ return econtainer ;
161
+ }
63
162
64
163
static void
65
164
ScanQueryForLocks (PlannedStmt * pstmt ,bool acquire )
@@ -107,17 +206,34 @@ AcquirePlannerLocks(PlannedStmt *pstmt, bool acquire)
107
206
Datum
108
207
pg_execute_plan (PG_FUNCTION_ARGS )
109
208
{
110
- char * data = TextDatumGetCString (PG_GETARG_DATUM (0 ));
111
- PlannedStmt * pstmt ;
112
- QueryDesc * queryDesc ;
113
- char queryString [5 ]= "NONE" ;
114
- ParamListInfo paramLI = NULL ;
115
-
116
- elog (INFO ,"MESSAGE: %s" ,data );
117
-
209
+ char * data = TextDatumGetCString (PG_GETARG_DATUM (0 ));
210
+ PlannedStmt * pstmt ;
211
+ QueryDesc * queryDesc ;
212
+ char queryString [5 ]= "NONE" ;
213
+ ParamListInfo paramLI = NULL ;
214
+ int dec_tot_len ;
215
+ char * dcontainer ,
216
+ * start_addr ;
217
+
218
+ elog (LOG ,"datalen=%lu\n" ,strlen (data ));
219
+ /* Compute decoded size of bytea data */
220
+ dec_tot_len = esc_dec_len (data ,strlen (data ));
221
+ elog (LOG ,"dec_tot_len=%d datalen=%lu\n" ,dec_tot_len ,strlen (data ));
222
+ dcontainer = (char * )palloc0 (dec_tot_len );
223
+ Assert (dec_tot_len == esc_decode (data ,strlen (data ),dcontainer ));
224
+
225
+ pstmt = (PlannedStmt * )stringToNode ((char * )dcontainer );
226
+ elog (LOG ,"Serialize Plan Size=%lu\n" ,strlen (dcontainer ));
227
+ start_addr = dcontainer + strlen (dcontainer )+ 1 ;
228
+ paramLI = RestoreParamList ((char * * )& start_addr );
229
+ elog (LOG ,"Decoded params. numParams: %d\n" ,paramLI -> numParams );
230
+ //printf("INCOMING: %s\n", data);
231
+ //PG_RETURN_BOOL(true);
118
232
/* Execute query plan. Based on execParallel.c ParallelQueryMain() */
119
- pstmt = (PlannedStmt * )stringToNode (data );
120
- //pstmt->paramExecTypes = NIL;
233
+ //
234
+ //ptr += strlen((const char *) ptr);
235
+ //
236
+
121
237
queryDesc = CreateQueryDesc (pstmt ,queryString ,GetActiveSnapshot (),
122
238
InvalidSnapshot ,None_Receiver ,paramLI ,NULL ,
123
239
0 );