56
56
#include "catalog/indexing.h"
57
57
#include "catalog/namespace.h"
58
58
#include "pglogical_output/hooks.h"
59
+ #include "parser/analyze.h"
60
+ #include "parser/parse_relation.h"
59
61
60
62
#include "multimaster.h"
61
63
#include "ddd.h"
@@ -148,6 +150,7 @@ static void MtmShmemStartup(void);
148
150
static BgwPool * MtmPoolConstructor (void );
149
151
static bool MtmRunUtilityStmt (PGconn * conn ,char const * sql ,char * * errmsg );
150
152
static void MtmBroadcastUtilityStmt (char const * sql ,bool ignoreError );
153
+ static bool MtmProcessDDLCommand (char const * queryString );
151
154
152
155
MtmState * Mtm ;
153
156
@@ -176,7 +179,8 @@ static TransactionManager MtmTM = {
176
179
MtmGetTransactionStateSize ,
177
180
MtmSerializeTransactionState ,
178
181
MtmDeserializeTransactionState ,
179
- MtmInitializeSequence
182
+ // MtmInitializeSequence
183
+ PgInitializeSequence
180
184
};
181
185
182
186
char const * const MtmNodeStatusMnem []=
@@ -208,6 +212,8 @@ int MtmHeartbeatRecvTimeout;
208
212
bool MtmUseRaftable ;
209
213
bool MtmUseDtm ;
210
214
215
+ // static int reset_wrokers = 0;
216
+
211
217
static char * MtmConnStrs ;
212
218
static int MtmQueueSize ;
213
219
static int MtmWorkers ;
@@ -229,8 +235,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
229
235
ProcessUtilityContext context ,ParamListInfo params ,
230
236
DestReceiver * dest ,char * completionTag );
231
237
232
- static StringInfo MtmGUCBuffer ;
233
- static bool MtmGUCBufferAllocated = false;
238
+ // static StringInfoMtmGUCBuffer;
239
+ // static boolMtmGUCBufferAllocated = false;
234
240
235
241
/*
236
242
* -------------------------------------------
@@ -615,7 +621,7 @@ MtmXactCallback(XactEvent event, void *arg)
615
621
{
616
622
switch (event )
617
623
{
618
- case XACT_EVENT_START :
624
+ case XACT_EVENT_START :
619
625
MtmBeginTransaction (& MtmTx );
620
626
break ;
621
627
case XACT_EVENT_PRE_PREPARE :
@@ -1150,8 +1156,8 @@ void MtmHandleApplyError(void)
1150
1156
case ERRCODE_OUT_OF_MEMORY :
1151
1157
elog (WARNING ,"Node is excluded from cluster because of non-recoverable error %d, %s, pid=%u" ,
1152
1158
edata -> sqlerrcode ,edata -> message ,getpid ());
1153
- MtmSwitchClusterMode (MTM_OUT_OF_SERVICE );
1154
- kill (PostmasterPid ,SIGQUIT );
1159
+ // MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
1160
+ // kill(PostmasterPid, SIGQUIT);
1155
1161
break ;
1156
1162
}
1157
1163
FreeErrorData (edata );
@@ -2913,13 +2919,13 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2913
2919
{
2914
2920
if (conns [i ])
2915
2921
{
2916
- if (MtmGUCBufferAllocated && !MtmRunUtilityStmt (conns [i ],MtmGUCBuffer -> data ,& utility_errmsg )&& !ignoreError )
2917
- {
2918
- errorMsg = "Failed to set GUC variables at node %d" ;
2919
- elog (WARNING ,"%s" ,utility_errmsg );
2920
- failedNode = i ;
2921
- break ;
2922
- }
2922
+ // if (MtmGUCBufferAllocated && !MtmRunUtilityStmt(conns[i], MtmGUCBuffer->data, &utility_errmsg) && !ignoreError)
2923
+ // {
2924
+ // errorMsg = "Failed to set GUC variables at node %d";
2925
+ // elog(WARNING, "%s", utility_errmsg);
2926
+ // failedNode = i;
2927
+ // break;
2928
+ // }
2923
2929
if (!MtmRunUtilityStmt (conns [i ],"BEGIN TRANSACTION" ,& utility_errmsg )&& !ignoreError )
2924
2930
{
2925
2931
errorMsg = "Failed to start transaction at node %d" ;
@@ -2983,7 +2989,7 @@ static bool MtmProcessDDLCommand(char const* queryString)
2983
2989
bool nulls [Natts_mtm_ddl_log ];
2984
2990
TimestampTz ts = GetCurrentTimestamp ();
2985
2991
2986
- rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME ,MULTIMASTER_DDL_TABLE ,-1 );
2992
+ rv = makeRangeVar ("public" ,MULTIMASTER_DDL_TABLE ,-1 );
2987
2993
rel = heap_openrv_extended (rv ,RowExclusiveLock , true);
2988
2994
2989
2995
if (rel == NULL ) {
@@ -3120,18 +3126,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3120
3126
break ;
3121
3127
case T_DiscardStmt :
3122
3128
{
3123
- // DiscardStmt *stmt = (DiscardStmt *) parsetree;
3124
- // skipCommand = stmt->target == DISCARD_TEMP;
3129
+ DiscardStmt * stmt = (DiscardStmt * )parsetree ;
3130
+ skipCommand = stmt -> target == DISCARD_TEMP ;
3125
3131
3126
- skipCommand = true;
3132
+ // skipCommand = true;
3127
3133
3128
- if (MtmGUCBufferAllocated )
3129
- {
3130
- // XXX: move allocation somewhere to backend startup and check
3131
- // where buffer is empty in send routines.
3132
- MtmGUCBufferAllocated = false;
3133
- pfree (MtmGUCBuffer );
3134
- }
3134
+ // if (MtmGUCBufferAllocated)
3135
+ // {
3136
+ // // XXX: move allocation somewhere to backend startup and check
3137
+ // // where buffer is empty in send routines.
3138
+ // MtmGUCBufferAllocated = false;
3139
+ // pfree(MtmGUCBuffer);
3140
+ // }
3135
3141
3136
3142
}
3137
3143
break ;
@@ -3143,22 +3149,31 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3143
3149
3144
3150
/* Prevent SET TRANSACTION from replication */
3145
3151
if (stmt -> kind == VAR_SET_MULTI )
3146
- break ;
3152
+ // break;
3153
+ skipCommand = true;
3147
3154
3148
- if (!MtmGUCBufferAllocated )
3149
- {
3150
- MemoryContext oldcontext ;
3155
+ // if (!MtmGUCBufferAllocated)
3156
+ // {
3157
+ // MemoryContext oldcontext;
3151
3158
3152
- oldcontext = MemoryContextSwitchTo (TopMemoryContext );
3153
- MtmGUCBuffer = makeStringInfo ();
3154
- MemoryContextSwitchTo (oldcontext );
3155
- MtmGUCBufferAllocated = true;
3156
- }
3159
+ // oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3160
+ // MtmGUCBuffer = makeStringInfo();
3161
+ // MemoryContextSwitchTo(oldcontext);
3162
+ // MtmGUCBufferAllocated = true;
3163
+ // }
3157
3164
3158
- appendStringInfoString (MtmGUCBuffer ,queryString );
3165
+ // appendStringInfoString(MtmGUCBuffer, queryString);
3159
3166
3160
3167
// sometimes there is no ';' char at the end.
3161
- appendStringInfoString (MtmGUCBuffer ,";" );
3168
+ // appendStringInfoString(MtmGUCBuffer, ";");
3169
+ }
3170
+ break ;
3171
+ case T_CreateTableAsStmt :
3172
+ {
3173
+ /* Do not replicate temp tables */
3174
+ CreateTableAsStmt * stmt = (CreateTableAsStmt * )parsetree ;
3175
+ skipCommand = stmt -> into -> rel -> relpersistence == RELPERSISTENCE_TEMP ||
3176
+ (stmt -> into -> rel -> schemaname && strcmp (stmt -> into -> rel -> schemaname ,"pg_temp" )== 0 );
3162
3177
}
3163
3178
break ;
3164
3179
case T_CreateStmt :
@@ -3169,6 +3184,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3169
3184
(stmt -> relation -> schemaname && strcmp (stmt -> relation -> schemaname ,"pg_temp" )== 0 );
3170
3185
}
3171
3186
break ;
3187
+ case T_ViewStmt :
3188
+ {
3189
+ ViewStmt * stmt = (ViewStmt * )parsetree ;
3190
+ Query * viewParse ;
3191
+
3192
+ viewParse = parse_analyze ((Node * )copyObject (stmt -> query ),
3193
+ queryString ,NULL ,0 );
3194
+ skipCommand = isQueryUsingTempRelation (viewParse );
3195
+ // ||
3196
+ // (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
3197
+ }
3198
+ break ;
3172
3199
case T_IndexStmt :
3173
3200
{
3174
3201
Oid relid ;
@@ -3207,6 +3234,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
3207
3234
heap_close (rel ,ShareLock );
3208
3235
}
3209
3236
}
3237
+ else if (stmt -> removeType == OBJECT_INDEX )
3238
+ {
3239
+ RangeVar * rv = makeRangeVarFromNameList (
3240
+ (List * )lfirst (list_head (stmt -> objects )));
3241
+ Oid relid = RelnameGetRelid (rv -> relname );
3242
+
3243
+ if (OidIsValid (relid ))
3244
+ {
3245
+ Relation irel = index_open (relid ,ShareLock );
3246
+ skipCommand = irel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
3247
+ index_close (irel ,ShareLock );
3248
+ }
3249
+ }
3210
3250
}
3211
3251
break ;
3212
3252
case T_CopyStmt :