@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108
108
PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
109
109
PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
110
110
PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
111
+ PG_FUNCTION_INFO_V1 (mtm_get_cluster_info );
111
112
PG_FUNCTION_INFO_V1 (mtm_make_table_local );
112
113
PG_FUNCTION_INFO_V1 (mtm_dump_lock_graph );
113
114
@@ -1609,7 +1610,7 @@ _PG_init(void)
1609
1610
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes" ,
1610
1611
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
1611
1612
& Mtm2PCMinTimeout ,
1612
- 10000 ,
1613
+ 100000 , /* 100 seconds */
1613
1614
0 ,
1614
1615
INT_MAX ,
1615
1616
PGC_BACKEND ,
@@ -1624,7 +1625,7 @@ _PG_init(void)
1624
1625
"Percent of prepare time for maximal time of second phase of two-pahse commit" ,
1625
1626
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
1626
1627
& Mtm2PCPrepareRatio ,
1627
- 100 ,
1628
+ 1000 , /* 10 times */
1628
1629
0 ,
1629
1630
INT_MAX ,
1630
1631
PGC_BACKEND ,
@@ -2178,10 +2179,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
2178
2179
typedef struct
2179
2180
{
2180
2181
int nodeId ;
2181
- char * connStrPtr ;
2182
2182
TupleDesc desc ;
2183
- Datum values [8 ];
2184
- bool nulls [8 ];
2183
+ Datum values [Natts_mtm_nodes_state ];
2184
+ bool nulls [Natts_mtm_nodes_state ];
2185
2185
}MtmGetNodeStateCtx ;
2186
2186
2187
2187
Datum
@@ -2190,7 +2190,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2190
2190
FuncCallContext * funcctx ;
2191
2191
MtmGetNodeStateCtx * usrfctx ;
2192
2192
MemoryContext oldcontext ;
2193
- char * p ;
2194
2193
int64 lag ;
2195
2194
bool is_first_call = SRF_IS_FIRSTCALL ();
2196
2195
@@ -2200,7 +2199,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2200
2199
usrfctx = (MtmGetNodeStateCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
2201
2200
get_call_result_type (fcinfo ,NULL ,& usrfctx -> desc );
2202
2201
usrfctx -> nodeId = 1 ;
2203
- usrfctx -> connStrPtr = pstrdup (MtmConnStrs );
2204
2202
memset (usrfctx -> nulls , false,sizeof (usrfctx -> nulls ));
2205
2203
funcctx -> user_fctx = usrfctx ;
2206
2204
MemoryContextSwitchTo (oldcontext );
@@ -2219,23 +2217,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
2219
2217
usrfctx -> nulls [4 ]= lag < 0 ;
2220
2218
usrfctx -> values [5 ]= Int64GetDatum (Mtm -> transCount ?Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount :0 );
2221
2219
usrfctx -> values [6 ]= TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2222
- p = strchr (usrfctx -> connStrPtr ,',' );
2223
- if (p != NULL ) {
2224
- * p ++ = '\0' ;
2225
- }
2226
- usrfctx -> values [7 ]= CStringGetTextDatum (usrfctx -> connStrPtr );
2227
- usrfctx -> connStrPtr = p ;
2220
+ usrfctx -> values [7 ]= CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2228
2221
usrfctx -> nodeId += 1 ;
2229
2222
2230
2223
SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (heap_form_tuple (usrfctx -> desc ,usrfctx -> values ,usrfctx -> nulls )));
2231
2224
}
2232
2225
2226
+
2233
2227
Datum
2234
2228
mtm_get_cluster_state (PG_FUNCTION_ARGS )
2235
2229
{
2236
2230
TupleDesc desc ;
2237
- Datum values [10 ];
2238
- bool nulls [10 ]= {false};
2231
+ Datum values [Natts_mtm_cluster_state ];
2232
+ bool nulls [Natts_mtm_cluster_state ]= {false};
2239
2233
get_call_result_type (fcinfo ,NULL ,& desc );
2240
2234
2241
2235
values [0 ]= CStringGetTextDatum (MtmNodeStatusMnem [Mtm -> status ]);
@@ -2244,16 +2238,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
2244
2238
values [3 ]= Int64GetDatum (Mtm -> nodeLockerMask );
2245
2239
values [4 ]= Int32GetDatum (Mtm -> nNodes );
2246
2240
values [5 ]= Int32GetDatum ((int )Mtm -> pool .active );
2247
- values [6 ]= Int64GetDatum ( BgwPoolGetQueueSize ( & Mtm -> pool ) );
2248
- values [7 ]= Int64GetDatum (Mtm -> transCount );
2249
- values [8 ]= Int64GetDatum (Mtm -> timeShift );
2250
- values [9 ]= Int32GetDatum (Mtm -> recoverySlot );
2251
- nulls [ 9 ]= Mtm -> recoverySlot == 0 ;
2241
+ values [6 ]= Int32GetDatum (( int ) Mtm -> pool . pending );
2242
+ values [7 ]= Int64GetDatum (BgwPoolGetQueueSize ( & Mtm -> pool ) );
2243
+ values [8 ]= Int64GetDatum (Mtm -> transCount );
2244
+ values [9 ]= Int64GetDatum (Mtm -> timeShift );
2245
+ values [ 10 ]= Int32GetDatum ( Mtm -> recoverySlot ) ;
2252
2246
2253
2247
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc ,values ,nulls )));
2254
2248
}
2255
2249
2256
2250
2251
+ typedef struct
2252
+ {
2253
+ int nodeId ;
2254
+ }MtmGetClusterInfoCtx ;
2255
+
2256
+
2257
+ Datum
2258
+ mtm_get_cluster_info (PG_FUNCTION_ARGS )
2259
+ {
2260
+
2261
+ FuncCallContext * funcctx ;
2262
+ MtmGetClusterInfoCtx * usrfctx ;
2263
+ MemoryContext oldcontext ;
2264
+ TupleDesc desc ;
2265
+ bool is_first_call = SRF_IS_FIRSTCALL ();
2266
+ int i ;
2267
+ PGconn * conn ;
2268
+ PGresult * result ;
2269
+ char * values [Natts_mtm_cluster_state ];
2270
+ HeapTuple tuple ;
2271
+
2272
+ if (is_first_call ) {
2273
+ funcctx = SRF_FIRSTCALL_INIT ();
2274
+ oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
2275
+ usrfctx = (MtmGetClusterInfoCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
2276
+ get_call_result_type (fcinfo ,NULL ,& desc );
2277
+ funcctx -> attinmeta = TupleDescGetAttInMetadata (desc );
2278
+ usrfctx -> nodeId = 1 ;
2279
+ funcctx -> user_fctx = usrfctx ;
2280
+ MemoryContextSwitchTo (oldcontext );
2281
+ }
2282
+ funcctx = SRF_PERCALL_SETUP ();
2283
+ usrfctx = (MtmGetClusterInfoCtx * )funcctx -> user_fctx ;
2284
+ if (usrfctx -> nodeId > MtmNodes ) {
2285
+ SRF_RETURN_DONE (funcctx );
2286
+ }
2287
+ conn = PQconnectdb (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2288
+ if (PQstatus (conn )!= CONNECTION_OK ) {
2289
+ elog (ERROR ,"Failed to establish connection '%s' to node %d" ,Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr ,usrfctx -> nodeId );
2290
+ }
2291
+ result = PQexec (conn ,"select * from mtm.get_cluster_state()" );
2292
+
2293
+ if (PQresultStatus (result )!= PGRES_TUPLES_OK || PQntuples (result )!= 1 ) {
2294
+ elog (ERROR ,"Failed to receive data from %d" ,usrfctx -> nodeId );
2295
+ }
2296
+
2297
+ for (i = 0 ;i < Natts_mtm_cluster_state ;i ++ ) {
2298
+ values [i ]= PQgetvalue (result ,0 ,i );
2299
+ }
2300
+ tuple = BuildTupleFromCStrings (funcctx -> attinmeta ,values );
2301
+ PQclear (result );
2302
+ PQfinish (conn );
2303
+ usrfctx -> nodeId += 1 ;
2304
+ SRF_RETURN_NEXT (funcctx ,HeapTupleGetDatum (tuple ));
2305
+ }
2306
+
2307
+
2257
2308
Datum mtm_make_table_local (PG_FUNCTION_ARGS )
2258
2309
{
2259
2310
Oid reloid = PG_GETARG_OID (1 );