Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5433e7a

Browse files
committed
filter XLOG records produced by DLL statementsfixes#16
1 parente2424b9 commit5433e7a

File tree

3 files changed

+110
-60
lines changed

3 files changed

+110
-60
lines changed

‎multimaster.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3746,6 +3746,11 @@ static bool MtmProcessDDLCommand(char const* queryString)
37463746
return false;
37473747
}
37483748

3749+
staticvoidMtmFinishDDLCommand()
3750+
{
3751+
LogLogicalMessage("E","",1, true);
3752+
}
3753+
37493754
voidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize)
37503755
{
37513756
intallocated;
@@ -3767,6 +3772,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
37673772
DestReceiver*dest,char*completionTag)
37683773
{
37693774
boolskipCommand= false;
3775+
boolexecuted= false;
37703776

37713777
MTM_LOG3("%d: Process utility statement %s",MyProcPid,queryString);
37723778
switch (nodeTag(parsetree))
@@ -3911,6 +3917,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39113917
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
39123918
MtmUtilityProcessedInXid=GetCurrentTransactionId();
39133919
MtmProcessDDLCommand(queryString);
3920+
executed= true;
39143921
}
39153922
}
39163923

@@ -3932,6 +3939,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39323939
MtmTx.snapshot=INVALID_CSN;
39333940
}
39343941

3942+
if (executed)
3943+
{
3944+
MtmFinishDDLCommand();
3945+
}
39353946
}
39363947

39373948

@@ -3966,12 +3977,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
39663977
}
39673978
}
39683979

3969-
// if (MyXactAccessedRel)
3970-
// {
3971-
// MTM_LOG1("MtmTx.containsDML = true");
3972-
// MtmTx.containsDML = true;
3973-
// }
3974-
39753980
if (PreviousExecutorFinishHook!=NULL)
39763981
{
39773982
PreviousExecutorFinishHook(queryDesc);

‎pglogical_apply.c

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62-
staticboolinside_tx= false;
63-
6462
staticRelationread_rel(StringInfos,LOCKMODEmode);
6563
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
6664
staticEState*create_rel_estate(Relationrel);
@@ -353,8 +351,6 @@ process_remote_begin(StringInfo s)
353351
StartTransactionCommand();
354352
MtmJoinTransaction(&gtid,snapshot);
355353

356-
inside_tx= true;
357-
358354
return true;
359355
}
360356

@@ -364,11 +360,6 @@ process_remote_transactional_message(StringInfo s)
364360
intrc;
365361
intmessageSize=pq_getmsgint(s,4);
366362
charconst*stmt=pq_getmsgbytes(s,messageSize);
367-
if (!inside_tx)
368-
{
369-
MTM_LOG1("%d: Ignoring utility statement %s",MyProcPid,stmt);
370-
return;
371-
}
372363

373364
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
374365
SPI_connect();
@@ -667,7 +658,6 @@ process_remote_commit(StringInfo in)
667658
if (flags&PGLOGICAL_CAUGHT_UP) {
668659
MtmRecoveryCompleted();
669660
}
670-
inside_tx= false;
671661
}
672662

673663
staticvoid
@@ -992,7 +982,7 @@ void MtmExecutor(void* work, size_t size)
992982
{
993983
while (true) {
994984
charaction=pq_getmsgbyte(&s);
995-
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
985+
MTM_LOG1("%d: REMOTE process action %c",MyProcPid,action);
996986
#if0
997987
if (Mtm->status==MTM_RECOVERY) {
998988
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);
@@ -1054,6 +1044,7 @@ void MtmExecutor(void* work, size_t size)
10541044
continue;
10551045
}
10561046
case'G':
1047+
case'E':
10571048
{
10581049
process_remote_transactional_message(&s);
10591050
continue;

‎pglogical_proto.c

Lines changed: 97 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838
#include"multimaster.h"
3939
#include"pglogical_relid_map.h"
4040

41-
staticboolMtmIsFilteredTxn;
42-
staticintMtmTransactionRecords;
41+
staticintMtmTransactionRecords;
42+
staticTransactionIdMtmCurrentXid;
43+
staticboolDDLInProress= false;
4344

4445
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4546

@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7475
constchar*relname;
7576
uint8relnamelen;
7677
Oidrelid;
77-
if (MtmIsFilteredTxn) {
78+
79+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN) {
80+
MTM_LOG1("%d: pglogical_write_message filtered",MyProcPid);
7881
return;
7982
}
80-
83+
84+
if (DDLInProress) {
85+
MTM_LOG1("%d: pglogical_write_message filtered DDLInProress",MyProcPid);
86+
return;
87+
}
88+
8189
relid=RelationGetRelid(rel);
8290
pq_sendbyte(out,'R');/* sending RELATION */
8391
pq_sendint(out,relid,sizeofrelid);/* use Oid as relation identifier */
@@ -107,30 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107115
{
108116
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
109117
csn_tcsn=MtmTransactionSnapshot(txn->xid);
118+
119+
MtmCurrentXid=txn->xid;
120+
110121
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111122
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
112-
113-
if (!isRecovery&&csn==INVALID_CSN) {
114-
MtmIsFilteredTxn= true;
115-
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered",MyProcPid,txn->xid);
116-
}else {
117-
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
118-
MtmIsFilteredTxn= false;
119-
pq_sendbyte(out,'B');/* BEGIN */
120-
pq_sendint(out,MtmNodeId,4);
121-
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
122-
pq_sendint64(out,csn);
123-
MtmTransactionRecords=0;
124-
}
123+
124+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
125+
pq_sendbyte(out,'B');/* BEGIN */
126+
pq_sendint(out,MtmNodeId,4);
127+
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
128+
pq_sendint64(out,csn);
129+
MtmTransactionRecords=0;
125130
}
126131

127132
staticvoid
128133
pglogical_write_message(StringInfoout,
129134
constchar*prefix,Sizesz,constchar*message)
130135
{
131-
if (*prefix=='L') {
136+
if (*prefix=='L')
137+
{
132138
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
133139
}
140+
elseif (*prefix=='G')
141+
{
142+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
143+
{
144+
MTM_LOG1("%d: pglogical_write_message filtered",MyProcPid);
145+
return;
146+
}
147+
DDLInProress= true;
148+
}
149+
elseif (*prefix=='E')
150+
{
151+
DDLInProress= false;
152+
}
153+
134154
pq_sendbyte(out,*prefix);
135155
pq_sendint(out,sz,4);
136156
pq_sendbytes(out,message,sz);
@@ -163,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163183
Assert(flags!=PGLOGICAL_COMMIT_PREPARED||txn->xid<1000||MtmTransactionRecords!=1);
164184

165185
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
166-
if (MtmIsFilteredTxn) {
167-
Assert(MtmTransactionRecords==0);
168-
return;
169-
}
186+
//if (MtmIsFilteredTxn) {
187+
//Assert(MtmTransactionRecords == 0);
188+
//return;
189+
//}
170190
}else {
171191
csn_tcsn=MtmTransactionSnapshot(txn->xid);
172192
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
@@ -236,11 +256,20 @@ static void
236256
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
237257
Relationrel,HeapTuplenewtuple)
238258
{
239-
if (!MtmIsFilteredTxn) {
240-
MtmTransactionRecords+=1;
241-
pq_sendbyte(out,'I');/* action INSERT */
242-
pglogical_write_tuple(out,data,rel,newtuple);
259+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
260+
MTM_LOG1("%d: pglogical_write_insert filtered",MyProcPid);
261+
return;
262+
}
263+
264+
if (DDLInProress) {
265+
MTM_LOG1("%d: pglogical_write_insert filtered DDLInProress",MyProcPid);
266+
return;
243267
}
268+
269+
MtmTransactionRecords+=1;
270+
pq_sendbyte(out,'I');/* action INSERT */
271+
pglogical_write_tuple(out,data,rel,newtuple);
272+
244273
}
245274

246275
/*
@@ -250,23 +279,30 @@ static void
250279
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
251280
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
252281
{
253-
if (!MtmIsFilteredTxn) {
254-
MtmTransactionRecords+=1;
282+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
283+
MTM_LOG1("%d: pglogical_write_update filtered",MyProcPid);
284+
return;
285+
}
255286

256-
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
287+
if (DDLInProress) {
288+
MTM_LOG1("%d: pglogical_write_update filtered DDLInProress",MyProcPid);
289+
return;
290+
}
257291

292+
MtmTransactionRecords+=1;
258293

259-
pq_sendbyte(out,'U');/* action UPDATE */
260-
/* FIXME support whole tuple (O tuple type) */
261-
if (oldtuple!=NULL)
262-
{
263-
pq_sendbyte(out,'K');/* old key follows */
264-
pglogical_write_tuple(out,data,rel,oldtuple);
265-
}
266-
267-
pq_sendbyte(out,'N');/* new tuple follows */
268-
pglogical_write_tuple(out,data,rel,newtuple);
294+
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
295+
296+
pq_sendbyte(out,'U');/* action UPDATE */
297+
/* FIXME support whole tuple (O tuple type) */
298+
if (oldtuple!=NULL)
299+
{
300+
pq_sendbyte(out,'K');/* old key follows */
301+
pglogical_write_tuple(out,data,rel,oldtuple);
269302
}
303+
304+
pq_sendbyte(out,'N');/* new tuple follows */
305+
pglogical_write_tuple(out,data,rel,newtuple);
270306
}
271307

272308
/*
@@ -276,11 +312,19 @@ static void
276312
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
277313
Relationrel,HeapTupleoldtuple)
278314
{
279-
if (!MtmIsFilteredTxn) {
280-
MtmTransactionRecords+=1;
281-
pq_sendbyte(out,'D');/* action DELETE */
282-
pglogical_write_tuple(out,data,rel,oldtuple);
315+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
316+
MTM_LOG1("%d: pglogical_write_delete filtered",MyProcPid);
317+
return;
283318
}
319+
320+
if (DDLInProress) {
321+
MTM_LOG1("%d: pglogical_write_delete filtered DDLInProress",MyProcPid);
322+
return;
323+
}
324+
325+
MtmTransactionRecords+=1;
326+
pq_sendbyte(out,'D');/* action DELETE */
327+
pglogical_write_tuple(out,data,rel,oldtuple);
284328
}
285329

286330
/*
@@ -305,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
305349
inti;
306350
uint16nliveatts=0;
307351

352+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
353+
MTM_LOG1("%d: pglogical_write_tuple filtered",MyProcPid);
354+
return;
355+
}
356+
357+
if (DDLInProress) {
358+
MTM_LOG1("%d: pglogical_write_tuple filtered DDLInProress",MyProcPid);
359+
return;
360+
}
361+
308362
desc=RelationGetDescr(rel);
309363

310364
pq_sendbyte(out,'T');/* sending TUPLE */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp