@@ -865,10 +865,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
865
865
866
866
void MtmJoinTransaction (GlobalTransactionId * gtid ,csn_t globalSnapshot )
867
867
{
868
- MtmLock (LW_EXCLUSIVE );
869
- MtmSyncClock (globalSnapshot );
870
- MtmUnlock ();
871
-
868
+ if (globalSnapshot != INVALID_CSN ) {
869
+ MtmLock (LW_EXCLUSIVE );
870
+ MtmSyncClock (globalSnapshot );
871
+ MtmUnlock ();
872
+ }else {
873
+ globalSnapshot = MtmTx .snapshot ;
874
+ }
872
875
if (!TransactionIdIsValid (gtid -> xid )) {
873
876
/* In case of recovery InvalidTransactionId is passed */
874
877
Assert (Mtm -> status == MTM_RECOVERY );
@@ -1877,6 +1880,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
1877
1880
}
1878
1881
}
1879
1882
}
1883
+ static void
1884
+ MtmOnProcExit (int code ,Datum arg )
1885
+ {
1886
+ if (MtmReplicationNodeId >=0 ) {
1887
+ elog (WARNING ,"WAL-sender to %d is terminated" ,MtmReplicationNodeId );
1888
+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1889
+ }
1890
+ }
1880
1891
1881
1892
static void
1882
1893
MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
@@ -1923,13 +1934,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1923
1934
elog (NOTICE ,"Node %d start logical replication to node %d in normal mode" ,MtmNodeId ,MtmReplicationNodeId );
1924
1935
}
1925
1936
MtmUnlock ();
1937
+ on_proc_exit (MtmOnProcExit ,0 );
1926
1938
}
1927
1939
1928
1940
static void
1929
1941
MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
1930
1942
{
1931
- elog (WARNING ,"Logical replication to node %d is stopped" ,MtmReplicationNodeId );
1932
- MtmOnNodeDisconnect (MtmReplicationNodeId );
1943
+ if (MtmReplicationNodeId >=0 ) {
1944
+ elog (WARNING ,"Logical replication to node %d is stopped" ,MtmReplicationNodeId );
1945
+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1946
+ MtmReplicationNodeId = -1 ;/* defuse on_proc_exit hook */
1947
+ }
1933
1948
}
1934
1949
1935
1950
static bool
@@ -2159,14 +2174,34 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
2159
2174
2160
2175
* errmsg = palloc0 (errlen );
2161
2176
2162
- /* Strip "ERROR:\t " from beginning and "\n" from end of error string */
2177
+ /* Strip "ERROR: " from beginning and "\n" from end of error string */
2163
2178
strncpy (* errmsg ,errstr + 8 ,errlen - 1 - 8 );
2164
2179
}
2165
2180
2166
2181
PQclear (result );
2167
2182
return ret ;
2168
2183
}
2169
2184
2185
+ static void
2186
+ MtmNoticeReceiver (void * i ,const PGresult * res )
2187
+ {
2188
+ char * notice = PQresultErrorMessage (res );
2189
+ char * stripped_notice ;
2190
+ int len = strlen (notice );
2191
+
2192
+ /* Skip notices from other nodes */
2193
+ if ( (* (int * )i )!= MtmNodeId - 1 )
2194
+ return ;
2195
+
2196
+ stripped_notice = palloc0 (len );
2197
+
2198
+ /* Strip "NOTICE: " from beginning and "\n" from end of error string */
2199
+ strncpy (stripped_notice ,notice + 9 ,len - 1 - 9 );
2200
+
2201
+ elog (NOTICE ,stripped_notice );
2202
+ pfree (stripped_notice );
2203
+ }
2204
+
2170
2205
static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError )
2171
2206
{
2172
2207
int i = 0 ;
@@ -2195,6 +2230,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2195
2230
elog (ERROR ,"Failed to establish connection '%s' to node %d" ,Mtm -> nodes [i ].con .connStr ,failedNode );
2196
2231
}
2197
2232
}
2233
+ PQsetNoticeReceiver (conns [i ],MtmNoticeReceiver ,& i );
2198
2234
}
2199
2235
}
2200
2236
Assert (i == MtmNodes );
@@ -2211,9 +2247,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2211
2247
}
2212
2248
if (!MtmRunUtilityStmt (conns [i ],sql ,& utility_errmsg )&& !ignoreError )
2213
2249
{
2214
- // errorMsg = "Failed to run command at node %d";
2215
- // XXX: add check for our node
2216
- errorMsg = utility_errmsg ;
2250
+ if (i + 1 == MtmNodeId )
2251
+ errorMsg = utility_errmsg ;
2252
+ else
2253
+ errorMsg = "Failed to run command at node %d" ;
2217
2254
2218
2255
failedNode = i ;
2219
2256
break ;
@@ -2418,6 +2455,23 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2418
2455
skipCommand = stmt -> relation -> relpersistence == RELPERSISTENCE_TEMP ;
2419
2456
}
2420
2457
break ;
2458
+ case T_IndexStmt :
2459
+ {
2460
+ Oid relid ;
2461
+ Relation rel ;
2462
+ IndexStmt * stmt = (IndexStmt * )parsetree ;
2463
+ bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL );
2464
+
2465
+ if (stmt -> concurrent )
2466
+ PreventTransactionChain (isTopLevel ,
2467
+ "CREATE INDEX CONCURRENTLY" );
2468
+
2469
+ relid = RelnameGetRelid (stmt -> relation -> relname );
2470
+ rel = heap_open (relid ,ShareLock );
2471
+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2472
+ heap_close (rel ,NoLock );
2473
+ }
2474
+ break ;
2421
2475
default :
2422
2476
skipCommand = false;
2423
2477
break ;