48
48
#include "replication/slot.h"
49
49
#include "port/atomics.h"
50
50
#include "tcop/utility.h"
51
+ #include "nodes/makefuncs.h"
52
+ #include "access/htup_details.h"
53
+ #include "catalog/indexing.h"
51
54
52
55
#include "multimaster.h"
53
56
@@ -863,7 +866,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
863
866
dtm -> nNodes -= 1 ;
864
867
if (!IsTransactionBlock ())
865
868
{
866
- MtmBroadcastUtilityStmt (psprintf ("selectmtm_drop_node (%d,%s)" ,nodeId ,dropSlot ?"true" :"false" ), true);
869
+ MtmBroadcastUtilityStmt (psprintf ("selectmultimaster.drop_node (%d,%s)" ,nodeId ,dropSlot ?"true" :"false" ), true);
867
870
}
868
871
if (dropSlot )
869
872
{
@@ -878,7 +881,7 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
878
881
{
879
882
PG_RETURN_INT64 (dtmTx .snapshot );
880
883
}
881
-
884
+
882
885
/*
883
886
* Execute statement with specified parameters and check its result
884
887
*/
@@ -924,7 +927,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
924
927
failedNode = i ;
925
928
do {
926
929
PQfinish (conns [i ]);
927
- }while (-- i >=0 );
930
+ }while (-- i >=0 );
928
931
elog (ERROR ,"Failed to establish connection '%s' to node %d" ,conn_str ,failedNode );
929
932
}
930
933
}
@@ -933,7 +936,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
933
936
i += 1 ;
934
937
}
935
938
Assert (i == MtmNodes );
936
-
939
+
937
940
for (i = 0 ;i < MtmNodes ;i ++ )
938
941
{
939
942
if (conns [i ])
@@ -970,7 +973,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
970
973
failedNode = i ;
971
974
}
972
975
}
973
- }
976
+ }
974
977
for (i = 0 ;i < MtmNodes ;i ++ )
975
978
{
976
979
if (conns [i ])
@@ -984,6 +987,48 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
984
987
}
985
988
}
986
989
990
+ static void MtmProcessDDLCommand (char const * queryString )
991
+ {
992
+ RangeVar * rv ;
993
+ Relation rel ;
994
+ TupleDesc tupDesc ;
995
+ HeapTuple tup ;
996
+ Datum values [Natts_mtm_ddl_log ];
997
+ bool nulls [Natts_mtm_ddl_log ];
998
+ TimestampTz ts = GetCurrentTimestamp ();
999
+
1000
+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME ,MULTIMASTER_DDL_TABLE ,-1 );
1001
+ rel = heap_openrv_extended (rv ,RowExclusiveLock , true);
1002
+
1003
+ if (rel == NULL ) {
1004
+ return ;
1005
+ }
1006
+
1007
+ tupDesc = RelationGetDescr (rel );
1008
+
1009
+ /* Form a tuple. */
1010
+ memset (nulls , false,sizeof (nulls ));
1011
+
1012
+ values [Anum_mtm_ddl_log_issued - 1 ]= TimestampTzGetDatum (ts );
1013
+ values [Anum_mtm_ddl_log_query - 1 ]= CStringGetTextDatum (queryString );
1014
+
1015
+ tup = heap_form_tuple (tupDesc ,values ,nulls );
1016
+
1017
+ /* Insert the tuple to the catalog. */
1018
+ simple_heap_insert (rel ,tup );
1019
+
1020
+ /* Update the indexes. */
1021
+ CatalogUpdateIndexes (rel ,tup );
1022
+
1023
+ /* Cleanup. */
1024
+ heap_freetuple (tup );
1025
+ heap_close (rel ,RowExclusiveLock );
1026
+
1027
+ elog (WARNING ,"Replicate command: '%s'" ,queryString );
1028
+
1029
+ dtmTx .containsDML = true;
1030
+ }
1031
+
987
1032
static void MtmProcessUtility (Node * parsetree ,const char * queryString ,
988
1033
ProcessUtilityContext context ,ParamListInfo params ,
989
1034
DestReceiver * dest ,char * completionTag )
@@ -1011,22 +1056,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
1011
1056
skipCommand = false;
1012
1057
break ;
1013
1058
}
1014
- if (skipCommand || IsTransactionBlock ()) {
1015
- if (PreviousProcessUtilityHook != NULL )
1016
- {
1017
- PreviousProcessUtilityHook (parsetree ,queryString ,context ,
1018
- params ,dest ,completionTag );
1019
- }
1020
- else
1021
- {
1022
- standard_ProcessUtility (parsetree ,queryString ,context ,
1023
- params ,dest ,completionTag );
1024
- }
1025
- if (!skipCommand ) {
1026
- dtmTx .isDistributed = false;
1027
- }
1028
- }else {
1029
- MtmBroadcastUtilityStmt (queryString , false);
1059
+ if (!skipCommand && !dtmTx .isReplicated ) {
1060
+ MtmProcessDDLCommand (queryString );
1061
+ }
1062
+ if (PreviousProcessUtilityHook != NULL )
1063
+ {
1064
+ PreviousProcessUtilityHook (parsetree ,queryString ,context ,
1065
+ params ,dest ,completionTag );
1066
+ }
1067
+ else
1068
+ {
1069
+ standard_ProcessUtility (parsetree ,queryString ,context ,
1070
+ params ,dest ,completionTag );
1030
1071
}
1031
1072
}
1032
1073