Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita7c4878

Browse files
committed
trying to send utility_stmts on PrePrepare
1 parent41c36e3 commita7c4878

File tree

3 files changed

+60
-22
lines changed

3 files changed

+60
-22
lines changed

‎bgwpool.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ static void BgwPoolMainLoop(Datum arg)
2525
void*work;
2626

2727
BackgroundWorkerUnblockSignals();
28-
BackgroundWorkerInitializeConnection(pool->dbname,NULL);
28+
BackgroundWorkerInitializeConnection(pool->dbname,"stas");
2929

3030
while(true) {
3131
PGSemaphoreLock(&pool->available);
@@ -98,7 +98,7 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
9898
worker.bgw_start_time=BgWorkerStart_ConsistentState;
9999
worker.bgw_main=BgwPoolMainLoop;
100100
worker.bgw_restart_time=MULTIMASTER_BGW_RESTART_TIMEOUT;
101-
101+
102102
for (i=0;i<nWorkers;i++) {
103103
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
104104
snprintf(worker.bgw_name,BGW_MAXLEN,"bgw_pool_worker_%d",i+1);

‎multimaster.c

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ char const* const MtmNodeStatusMnem[] =
197197

198198
boolMtmDoReplication;
199199
char*MtmDatabaseName;
200+
char*MtmUtilityStmt=NULL;
200201

201202
intMtmNodes;
202203
intMtmNodeId;
@@ -213,8 +214,6 @@ int MtmHeartbeatRecvTimeout;
213214
boolMtmUseRaftable;
214215
boolMtmUseDtm;
215216

216-
// static int reset_wrokers = 0;
217-
218217
staticchar*MtmConnStrs;
219218
staticintMtmQueueSize;
220219
staticintMtmWorkers;
@@ -693,6 +692,10 @@ static const char* const isoLevelStr[] =
693692
staticvoid
694693
MtmBeginTransaction(MtmCurrentTrans*x)
695694
{
695+
if (MtmUtilityStmt)
696+
pfree(MtmUtilityStmt);
697+
MtmUtilityStmt=NULL;
698+
696699
if (x->snapshot==INVALID_CSN) {
697700
TransactionIdxmin= (Mtm->gcCount >=MtmGcPeriod) ?PgGetOldestXmin(NULL, false) :InvalidTransactionId;/* Get oldest xmin outside critical section */
698701

@@ -3087,7 +3090,14 @@ MtmGenerateGid(char* gid)
30873090

30883091
staticboolMtmTwoPhaseCommit(MtmCurrentTrans*x)
30893092
{
3090-
if (x->isDistributed&&x->containsDML) {
3093+
if (MtmUtilityStmt&& !MyXactAccessedTempRel)
3094+
{
3095+
MtmProcessDDLCommand(MtmUtilityStmt);
3096+
pfree(MtmUtilityStmt);
3097+
MtmUtilityStmt=NULL;
3098+
}
3099+
3100+
if (!x->isReplicated&& (x->isDistributed&&x->containsDML)) {
30913101
MtmGenerateGid(x->gid);
30923102
if (!x->isTransactionBlock) {
30933103
BeginTransactionBlock();
@@ -3118,6 +3128,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31183128
DestReceiver*dest,char*completionTag)
31193129
{
31203130
boolskipCommand= false;
3131+
3132+
// skipCommand = MyXactAccessedTempRel;
3133+
31213134
MTM_LOG3("%d: Process utility statement %s",MyProcPid,queryString);
31223135
switch (nodeTag(parsetree))
31233136
{
@@ -3198,12 +3211,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31983211
}
31993212
break;
32003213
caseT_CreateTableAsStmt:
3201-
{
3202-
/* Do not replicate temp tables */
3203-
CreateTableAsStmt*stmt= (CreateTableAsStmt*)parsetree;
3204-
skipCommand=stmt->into->rel->relpersistence==RELPERSISTENCE_TEMP||
3205-
(stmt->into->rel->schemaname&&strcmp(stmt->into->rel->schemaname,"pg_temp")==0);
3206-
}
3214+
//{
3215+
///* Do not replicate temp tables */
3216+
//CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
3217+
//skipCommand = stmt->into->rel->relpersistence == RELPERSISTENCE_TEMP ||
3218+
//(stmt->into->rel->schemaname && strcmp(stmt->into->rel->schemaname, "pg_temp") == 0);
3219+
//}
32073220
break;
32083221
caseT_CreateStmt:
32093222
{
@@ -3306,11 +3319,26 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
33063319
skipCommand= false;
33073320
break;
33083321
}
3309-
if (!skipCommand&& !MtmTx.isReplicated&&context==PROCESS_UTILITY_TOPLEVEL) {
3310-
if (MtmProcessDDLCommand(queryString)) {
3311-
return;
3322+
if (context==PROCESS_UTILITY_TOPLEVEL)
3323+
{
3324+
if (!skipCommand&& !MtmTx.isReplicated) {
3325+
// if (MtmProcessDDLCommand(queryString)) {
3326+
// return;
3327+
// }
3328+
3329+
MemoryContextoldcontext;
3330+
3331+
if (MtmUtilityStmt)
3332+
pfree(MtmUtilityStmt);
3333+
3334+
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
3335+
MtmUtilityStmt=palloc(strlen(queryString)+1);
3336+
MemoryContextSwitchTo(oldcontext);
3337+
3338+
strncpy(MtmUtilityStmt,queryString,strlen(queryString)+1);
33123339
}
33133340
}
3341+
33143342
if (PreviousProcessUtilityHook!=NULL)
33153343
{
33163344
PreviousProcessUtilityHook(parsetree,queryString,context,

‎pglogical_receiver.c

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -468,14 +468,24 @@ pglogical_receiver_main(Datum main_arg)
468468
{
469469
stmt=copybuf+hdr_len;
470470

471-
if (buf.used >=MtmTransSpillThreshold*MB) {
472-
if (spill_file<0) {
473-
intfile_id;
474-
spill_file=MtmCreateSpillFile(nodeId,&file_id);
475-
pq_sendbyte(&spill_info,'F');
476-
pq_sendint(&spill_info,nodeId,4);
477-
pq_sendint(&spill_info,file_id,4);
478-
}
471+
if (buf.used >=MtmTransSpillThreshold*MB) {
472+
if (spill_file<0) {
473+
intfile_id;
474+
spill_file=MtmCreateSpillFile(nodeId,&file_id);
475+
pq_sendbyte(&spill_info,'F');
476+
pq_sendint(&spill_info,nodeId,4);
477+
pq_sendint(&spill_info,file_id,4);
478+
}
479+
ByteBufferAppend(&buf,")",1);
480+
pq_sendbyte(&spill_info,'(');
481+
pq_sendint(&spill_info,buf.used,4);
482+
MtmSpillToFile(spill_file,buf.data,buf.used);
483+
ByteBufferReset(&buf);
484+
}
485+
ByteBufferAppend(&buf,stmt,rc-hdr_len);
486+
if (stmt[0]=='C')/* commit|prepare */
487+
{
488+
if (spill_file >=0) {
479489
ByteBufferAppend(&buf,")",1);
480490
pq_sendbyte(&spill_info,'(');
481491
pq_sendint(&spill_info,buf.used,4);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp