Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfc033ef

Browse files
knizhnikkelvich
authored andcommitted
Support concurrent messages: VACUUM and CREATE INDEX CONCURRENTLY
1 parent85f9100 commitfc033ef

File tree

4 files changed

+93
-27
lines changed

4 files changed

+93
-27
lines changed

‎multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bo
159159

160160
MtmState*Mtm;
161161

162+
VacuumStmt*MtmVacuumStmt;
163+
IndexStmt*MtmIndexStmt;
164+
MemoryContextMtmApplyContext;
165+
162166
HTAB*MtmXid2State;
163167
HTAB*MtmGid2State;
164168
staticHTAB*MtmLocalTables;
@@ -3874,8 +3878,8 @@ void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
38743878
}
38753879

38763880
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
3877-
ProcessUtilityContextcontext,ParamListInfoparams,
3878-
DestReceiver*dest,char*completionTag)
3881+
ProcessUtilityContextcontext,ParamListInfoparams,
3882+
DestReceiver*dest,char*completionTag)
38793883
{
38803884
boolskipCommand= false;
38813885
boolexecuted= false;
@@ -3936,11 +3940,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39363940
break;
39373941

39383942
caseT_VacuumStmt:
3939-
context=PROCESS_UTILITY_TOPLEVEL;
3940-
MtmProcessDDLCommand(queryString, false, true);
3941-
MtmTx.isDistributed= false;
3942-
skipCommand= true;
3943-
break;
3943+
if (context==PROCESS_UTILITY_TOPLEVEL) {
3944+
MtmProcessDDLCommand(queryString, false, true);
3945+
MtmTx.isDistributed= false;
3946+
skipCommand= true;
3947+
break;
3948+
}else {
3949+
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
3950+
Assert(oldContext!=MtmApplyContext);
3951+
MtmVacuumStmt= (VacuumStmt*)copyObject(parsetree);
3952+
MemoryContextSwitchTo(oldContext);
3953+
return;
3954+
}
39443955

39453956
caseT_CreateDomainStmt:
39463957
/* Detect temp tables access */
@@ -4031,11 +4042,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
40314042
caseT_IndexStmt:
40324043
{
40334044
IndexStmt*indexStmt= (IndexStmt*)parsetree;
4034-
if (indexStmt->concurrent&& !IsTransactionBlock()&& !MtmTx.isReplicated)
4045+
if (indexStmt->concurrent)
40354046
{
4036-
skipCommand= true;
4037-
MtmProcessDDLCommand(queryString, false, false);
4038-
MtmTx.isDistributed= false;
4047+
if (context==PROCESS_UTILITY_TOPLEVEL) {
4048+
MtmProcessDDLCommand(queryString, false, true);
4049+
MtmTx.isDistributed= false;
4050+
skipCommand= true;
4051+
}else {
4052+
MemoryContextoldContext=MemoryContextSwitchTo(MtmApplyContext);
4053+
Assert(oldContext!=MtmApplyContext);
4054+
MtmIndexStmt= (IndexStmt*)copyObject(parsetree);
4055+
MemoryContextSwitchTo(oldContext);
4056+
return;
4057+
}
40394058
}
40404059
}
40414060
break;

‎multimaster.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include"access/clog.h"
99
#include"pglogical_output/hooks.h"
10+
#include"commands/vacuum.h"
1011
#include"libpq-fe.h"
1112

1213
#defineDEBUG_LEVEL 0
@@ -301,6 +302,9 @@ extern bool MtmUseDtm;
301302
externboolMtmPreserveCommitOrder;
302303
externHTAB*MtmXid2State;
303304
externHTAB*MtmGid2State;
305+
externVacuumStmt*MtmVacuumStmt;
306+
externIndexStmt*MtmIndexStmt;
307+
externMemoryContextMtmApplyContext;
304308

305309
externvoidMtmArbiterInitialize(void);
306310
externvoidMtmStartReceivers(void);

‎pglogical_apply.c

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
#include"catalog/pg_type.h"
1919

2020
#include"executor/spi.h"
21+
#include"commands/vacuum.h"
22+
#include"commands/defrem.h"
23+
#include"parser/parse_utilcmd.h"
2124

2225
#include"libpq/pqformat.h"
2326

@@ -70,7 +73,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7073
staticvoidUserTableUpdateIndexes(EState*estate,TupleTableSlot*slot);
7174

7275
staticboolprocess_remote_begin(StringInfos);
73-
staticvoidprocess_remote_message(StringInfos);
76+
staticboolprocess_remote_message(StringInfos);
7477
staticvoidprocess_remote_commit(StringInfos);
7578
staticvoidprocess_remote_insert(StringInfos,Relationrel);
7679
staticvoidprocess_remote_update(StringInfos,Relationrel);
@@ -353,20 +356,22 @@ process_remote_begin(StringInfo s)
353356
return true;
354357
}
355358

356-
staticvoid
359+
staticbool
357360
process_remote_message(StringInfos)
358361
{
359362
charaction=pq_getmsgbyte(s);
360363
intmessageSize=pq_getmsgint(s,4);
361364
charconst*messageBody=pq_getmsgbytes(s,messageSize);
362-
365+
boolstandalone= false;
366+
363367
switch (action)
364368
{
365369
case'C':
366370
{
367371
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);
368372
SetCurrentStatementStartTimestamp();
369373
StartTransactionCommand();
374+
standalone= true;
370375
/* intentional falldown to the next case */
371376
}
372377
case'D':
@@ -376,21 +381,59 @@ process_remote_message(StringInfo s)
376381
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,messageBody);
377382
SPI_connect();
378383
ActivePortal->sourceText=messageBody;
384+
MtmVacuumStmt=NULL;
385+
MtmIndexStmt=NULL;
379386
rc=SPI_execute(messageBody, false,0);
380387
SPI_finish();
381-
if (rc<0)
388+
if (rc<0) {
382389
elog(ERROR,"Failed to execute utility statement %s",messageBody);
390+
}else {
391+
if (MtmVacuumStmt!=NULL) {
392+
ExecVacuum(MtmVacuumStmt,1);
393+
}elseif (MtmIndexStmt!=NULL) {
394+
MemoryContextsaveCtx=TopTransactionContext;
395+
Oidrelid;
396+
397+
TopTransactionContext=MtmApplyContext;
398+
relid=RangeVarGetRelidExtended(MtmIndexStmt->relation,ShareUpdateExclusiveLock,
399+
false, false,
400+
NULL,
401+
NULL);
402+
403+
/* Run parse analysis ... */
404+
MtmIndexStmt=transformIndexStmt(relid,MtmIndexStmt,messageBody);
405+
406+
PushActiveSnapshot(GetTransactionSnapshot());
407+
408+
DefineIndex(relid,/* OID of heap relation */
409+
MtmIndexStmt,
410+
InvalidOid,/* no predefined OID */
411+
false,/* is_alter_table */
412+
true,/* check_rights */
413+
false,/* skip_build */
414+
false);/* quiet */
415+
416+
TopTransactionContext=saveCtx;
417+
418+
if (ActiveSnapshotSet())
419+
PopActiveSnapshot();
420+
421+
}
422+
}
423+
if (standalone) {
424+
CommitTransactionCommand();
425+
}
383426
break;
384427
}
385428
case'L':
386429
{
387430
MTM_LOG3("%ld: Process deadlock message with size %d from %d",MtmGetSystemTime(),messageSize,MtmReplicationNodeId);
388431
MtmUpdateLockGraph(MtmReplicationNodeId,messageBody,messageSize);
432+
standalone= true;
389433
break;
390434
}
391435
}
392-
393-
436+
returnstandalone;
394437
}
395438

396439
staticvoid
@@ -968,8 +1011,6 @@ process_remote_delete(StringInfo s, Relation rel)
9681011
CommandCounterIncrement();
9691012
}
9701013

971-
staticMemoryContextApplyContext;
972-
9731014
voidMtmExecutor(void*work,size_tsize)
9741015
{
9751016
StringInfoDatas;
@@ -982,14 +1023,14 @@ void MtmExecutor(void* work, size_t size)
9821023
s.maxlen=-1;
9831024
s.cursor=0;
9841025

985-
if (ApplyContext==NULL) {
986-
ApplyContext=AllocSetContextCreate(TopMemoryContext,
1026+
if (MtmApplyContext==NULL) {
1027+
MtmApplyContext=AllocSetContextCreate(TopMemoryContext,
9871028
"MessageContext",
9881029
ALLOCSET_DEFAULT_MINSIZE,
9891030
ALLOCSET_DEFAULT_INITSIZE,
9901031
ALLOCSET_DEFAULT_MAXSIZE);
9911032
}
992-
MemoryContextSwitchTo(ApplyContext);
1033+
MemoryContextSwitchTo(MtmApplyContext);
9931034
replorigin_session_origin=InvalidRepOriginId;
9941035
PG_TRY();
9951036
{
@@ -1058,7 +1099,9 @@ void MtmExecutor(void* work, size_t size)
10581099
}
10591100
case'M':
10601101
{
1061-
process_remote_message(&s);
1102+
if (process_remote_message(&s)) {
1103+
break;
1104+
}
10621105
continue;
10631106
}
10641107
default:
@@ -1069,7 +1112,7 @@ void MtmExecutor(void* work, size_t size)
10691112
}
10701113
PG_CATCH();
10711114
{
1072-
MemoryContextoldcontext=MemoryContextSwitchTo(ApplyContext);
1115+
MemoryContextoldcontext=MemoryContextSwitchTo(MtmApplyContext);
10731116
MtmHandleApplyError();
10741117
MemoryContextSwitchTo(oldcontext);
10751118
EmitErrorReport();
@@ -1083,6 +1126,6 @@ void MtmExecutor(void* work, size_t size)
10831126
if (spill_file >=0) {
10841127
MtmCloseSpillFile(spill_file);
10851128
}
1086-
MemoryContextResetAndDeleteChildren(ApplyContext);
1129+
MemoryContextResetAndDeleteChildren(MtmApplyContext);
10871130
}
10881131

‎pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,8 @@ pglogical_receiver_main(Datum main_arg)
529529
MTM_LOG3("Process deadlock message from %d",nodeId);
530530
MtmExecutor(stmt,rc-hdr_len);
531531
}elseif (stmt[0]=='M'&&stmt[1]=='C') {
532-
MTM_LOG3("Process concurrent DDL message from %d",nodeId);
533-
MtmExecutor(stmt,rc-hdr_len);
532+
MTM_LOG1("Process concurrent DDL message from %d",nodeId);
533+
MtmExecute(stmt,rc-hdr_len);
534534
}else {
535535
ByteBufferAppend(&buf,stmt,rc-hdr_len);
536536
if (stmt[0]=='C')/* commit */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp