Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit86d25e9

Browse files
committed
check for temp tables access in the end of MtmProcessUtility()
1 parent3754bb9 commit86d25e9

File tree

3 files changed

+76
-28
lines changed

3 files changed

+76
-28
lines changed

‎multimaster.c

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ bool MtmUseRaftable;
209209
boolMtmUseDtm;
210210
boolMtmVolksWagenMode;
211211

212+
TransactionIdMtmUtilityProcessedInXid;
213+
212214
staticchar*MtmConnStrs;
213215
staticintMtmQueueSize;
214216
staticintMtmWorkers;
@@ -687,7 +689,12 @@ MtmXactCallback(XactEvent event, void *arg)
687689
staticbool
688690
MtmIsUserTransaction()
689691
{
690-
return !IsAutoVacuumLauncherProcess()&&IsNormalProcessingMode()&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
692+
return !IsAutoVacuumLauncherProcess()&&
693+
IsNormalProcessingMode()&&
694+
MtmDoReplication&&
695+
!am_walsender&&
696+
!IsBackgroundWorker&&
697+
!IsAutoVacuumWorkerProcess();
691698
}
692699

693700
void
@@ -699,7 +706,6 @@ MtmResetTransaction()
699706
x->gtid.xid=InvalidTransactionId;
700707
x->isDistributed= false;
701708
x->isPrepared= false;
702-
x->isPrepared= false;
703709
x->status=TRANSACTION_STATUS_UNKNOWN;
704710
}
705711

@@ -3334,20 +3340,20 @@ MtmGenerateGid(char* gid)
33343340

33353341
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x)
33363342
{
3337-
if (MyXactAccessedTempRel)
3338-
{
3339-
/*
3340-
* XXX: this tx anyway goes to subscribers later, but without
3341-
* surrounding begin/commit. Now it will be filtered out on receiver side.
3342-
* Probably there is more clever way to do that.
3343-
*/
3344-
x->isDistributed= false;
3345-
if (!MtmVolksWagenMode)
3346-
elog(NOTICE,"MTM: Transaction was not replicated as it accesed temporary relation");
3347-
return false;
3348-
}
3349-
3350-
if (!x->isReplicated&&(x->isDistributed&&x->containsDML)) {
3343+
//if (MyXactAccessedTempRel)
3344+
//{
3345+
///*
3346+
// * XXX: this tx anyway goes to subscribers later, but without
3347+
// * surrounding begin/commit. Now it will be filtered out on receiver side.
3348+
// * Probably there is more clever way to do that.
3349+
// */
3350+
//x->isDistributed = false;
3351+
//if (!MtmVolksWagenMode)
3352+
//elog(NOTICE, "MTM: Transaction was not replicated as it accesed temporary relation");
3353+
//return false;
3354+
//}
3355+
3356+
if (!x->isReplicated&&x->isDistributed&&x->containsDML) {
33513357
MtmGenerateGid(x->gid);
33523358
if (!x->isTransactionBlock) {
33533359
BeginTransactionBlock();
@@ -3357,7 +3363,8 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
33573363
}
33583364
if (!PrepareTransactionBlock(x->gid))
33593365
{
3360-
elog(WARNING,"Failed to prepare transaction %s",x->gid);
3366+
if (!MtmVolksWagenMode)
3367+
elog(WARNING,"Failed to prepare transaction %s",x->gid);
33613368
/* ??? Should we do explicit rollback */
33623369
}else {
33633370
CommitTransactionCommand();
@@ -3550,6 +3557,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
35503557
switch (stmt->kind)
35513558
{
35523559
caseTRANS_STMT_BEGIN:
3560+
caseTRANS_STMT_START:
35533561
MtmTx.isTransactionBlock= true;
35543562
break;
35553563
caseTRANS_STMT_COMMIT:
@@ -3586,16 +3594,34 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
35863594
caseT_LoadStmt:
35873595
caseT_ClusterStmt:
35883596
caseT_VacuumStmt:
3589-
caseT_ExplainStmt:
35903597
caseT_VariableShowStmt:
35913598
caseT_ReassignOwnedStmt:
35923599
caseT_LockStmt:
35933600
caseT_CheckPointStmt:
35943601
caseT_ReindexStmt:
3595-
caseT_RefreshMatViewStmt:
35963602
skipCommand= true;
35973603
break;
35983604

3605+
caseT_ExplainStmt:
3606+
/*
3607+
* EXPLAIN ANALYZE can create side-effects.
3608+
* Better to catch that by some general mechanism of detecting
3609+
* catalog and heap writes.
3610+
*/
3611+
{
3612+
ExplainStmt*stmt= (ExplainStmt*)parsetree;
3613+
ListCell*lc;
3614+
3615+
skipCommand= true;
3616+
foreach(lc,stmt->options)
3617+
{
3618+
DefElem*opt= (DefElem*)lfirst(lc);
3619+
if (strcmp(opt->defname,"analyze")==0)
3620+
skipCommand= false;
3621+
}
3622+
}
3623+
break;
3624+
35993625
/* Save GUC context for consequent DDL execution */
36003626
caseT_DiscardStmt:
36013627
{
@@ -3656,12 +3682,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
36563682
break;
36573683
}
36583684

3659-
if (context==PROCESS_UTILITY_TOPLEVEL)// || context == PROCESS_UTILITY_QUERY)
3685+
/* XXX: dirty. Clear on new tx */
3686+
if (!skipCommand&& (context==PROCESS_UTILITY_TOPLEVEL||MtmUtilityProcessedInXid!=GetCurrentTransactionId()))
3687+
MtmUtilityProcessedInXid=InvalidTransactionId;
3688+
3689+
if (context==PROCESS_UTILITY_TOPLEVEL||context==PROCESS_UTILITY_QUERY)
36603690
{
3661-
if (!skipCommand&& !MtmTx.isReplicated) {
3662-
if (MtmProcessDDLCommand(queryString)) {
3663-
return;
3664-
}
3691+
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
3692+
MtmUtilityProcessedInXid=GetCurrentTransactionId();
3693+
MtmProcessDDLCommand(queryString);
36653694
}
36663695
}
36673696

@@ -3675,12 +3704,22 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
36753704
standard_ProcessUtility(parsetree,queryString,context,
36763705
params,dest,completionTag);
36773706
}
3707+
3708+
if (MyXactAccessedTempRel)
3709+
{
3710+
MTM_LOG1("Xact accessed temp table, stopping replication");
3711+
MtmTx.isDistributed= false;/* Skip */
3712+
}
3713+
36783714
}
36793715

36803716

36813717
staticvoid
36823718
MtmExecutorFinish(QueryDesc*queryDesc)
36833719
{
3720+
/*
3721+
* If tx didn't wrote to XLOG then there is nothing to commit on other nodes.
3722+
*/
36843723
if (MtmDoReplication) {
36853724
CmdTypeoperation=queryDesc->operation;
36863725
EState*estate=queryDesc->estate;
@@ -3698,12 +3737,20 @@ MtmExecutorFinish(QueryDesc *queryDesc)
36983737
continue;
36993738
}
37003739
}
3740+
MTM_LOG1("MtmTx.containsDML = true // WAL");
37013741
MtmTx.containsDML= true;
37023742
break;
37033743
}
37043744
}
37053745
}
37063746
}
3747+
3748+
// if (MyXactAccessedRel)
3749+
// {
3750+
// MTM_LOG1("MtmTx.containsDML = true");
3751+
// MtmTx.containsDML = true;
3752+
// }
3753+
37073754
if (PreviousExecutorFinishHook!=NULL)
37083755
{
37093756
PreviousExecutorFinishHook(queryDesc);

‎pglogical_apply.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -686,9 +686,9 @@ process_remote_insert(StringInfo s, Relation rel)
686686
ExecStoreTuple(tup,newslot,InvalidBuffer, true);
687687
}
688688

689-
if (rel->rd_rel->relkind!=RELKIND_RELATION)
690-
elog(ERROR,"unexpected relkind '%c' rel \"%s\"",
691-
rel->rd_rel->relkind,RelationGetRelationName(rel));
689+
//if (rel->rd_rel->relkind != RELKIND_RELATION) // RELKIND_MATVIEW
690+
//elog(ERROR, "unexpected relkind '%c' rel \"%s\"",
691+
// rel->rd_rel->relkind, RelationGetRelationName(rel));
692692

693693
/* debug output */
694694
#ifdefVERBOSE_INSERT
@@ -978,7 +978,7 @@ void MtmExecutor(int id, void* work, size_t size)
978978
{
979979
while (true) {
980980
charaction=pq_getmsgbyte(&s);
981-
MTM_LOG2("%d: REMOTE process action %c",MyProcPid,action);
981+
MTM_LOG1("%d: REMOTE process action %c",MyProcPid,action);
982982
#if0
983983
if (Mtm->status==MTM_RECOVERY) {
984984
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);

‎pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
112112

113113
if (!isRecovery&&csn==INVALID_CSN) {
114114
MtmIsFilteredTxn= true;
115+
MTM_LOG1("MtmIsFilteredTxn");
115116
}else {
116117
pq_sendbyte(out,'B');/* BEGIN */
117118
pq_sendint(out,MtmNodeId,4);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp