Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit11a14de

Browse files
committed
filter XLOG records produced by DLL statementsfixes#16
1 parent89a8d27 commit11a14de

File tree

4 files changed

+109
-65
lines changed

4 files changed

+109
-65
lines changed

‎contrib/mmts/multimaster.c‎

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3747,6 +3747,11 @@ static bool MtmProcessDDLCommand(char const* queryString)
37473747
return false;
37483748
}
37493749

3750+
staticvoidMtmFinishDDLCommand()
3751+
{
3752+
LogLogicalMessage("E","",1, true);
3753+
}
3754+
37503755
voidMtmUpdateLockGraph(intnodeId,voidconst*messageBody,intmessageSize)
37513756
{
37523757
intallocated;
@@ -3768,6 +3773,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
37683773
DestReceiver*dest,char*completionTag)
37693774
{
37703775
boolskipCommand= false;
3776+
boolexecuted= false;
37713777

37723778
MTM_LOG3("%d: Process utility statement %s",MyProcPid,queryString);
37733779
switch (nodeTag(parsetree))
@@ -3912,6 +3918,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39123918
if (!skipCommand&& !MtmTx.isReplicated&& (MtmUtilityProcessedInXid==InvalidTransactionId)) {
39133919
MtmUtilityProcessedInXid=GetCurrentTransactionId();
39143920
MtmProcessDDLCommand(queryString);
3921+
executed= true;
39153922
}
39163923
}
39173924

@@ -3933,6 +3940,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39333940
MtmTx.snapshot=INVALID_CSN;
39343941
}
39353942

3943+
if (executed)
3944+
{
3945+
MtmFinishDDLCommand();
3946+
}
39363947
}
39373948

39383949

@@ -3967,12 +3978,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
39673978
}
39683979
}
39693980

3970-
// if (MyXactAccessedRel)
3971-
// {
3972-
// MTM_LOG1("MtmTx.containsDML = true");
3973-
// MtmTx.containsDML = true;
3974-
// }
3975-
39763981
if (PreviousExecutorFinishHook!=NULL)
39773982
{
39783983
PreviousExecutorFinishHook(queryDesc);

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62-
staticboolinside_tx= false;
63-
6462
staticRelationread_rel(StringInfos,LOCKMODEmode);
6563
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
6664
staticEState*create_rel_estate(Relationrel);
@@ -353,8 +351,6 @@ process_remote_begin(StringInfo s)
353351
StartTransactionCommand();
354352
MtmJoinTransaction(&gtid,snapshot);
355353

356-
inside_tx= true;
357-
358354
return true;
359355
}
360356

@@ -364,11 +360,6 @@ process_remote_transactional_message(StringInfo s)
364360
intrc;
365361
intmessageSize=pq_getmsgint(s,4);
366362
charconst*stmt=pq_getmsgbytes(s,messageSize);
367-
if (!inside_tx)
368-
{
369-
MTM_LOG1("%d: Ignoring utility statement %s",MyProcPid,stmt);
370-
return;
371-
}
372363

373364
MTM_LOG1("%d: Executing utility statement %s",MyProcPid,stmt);
374365
SPI_connect();
@@ -667,7 +658,6 @@ process_remote_commit(StringInfo in)
667658
if (flags&PGLOGICAL_CAUGHT_UP) {
668659
MtmRecoveryCompleted();
669660
}
670-
inside_tx= false;
671661
}
672662

673663
staticvoid
@@ -992,7 +982,7 @@ void MtmExecutor(void* work, size_t size)
992982
{
993983
while (true) {
994984
charaction=pq_getmsgbyte(&s);
995-
MTM_LOG3("%d: REMOTE process action %c",MyProcPid,action);
985+
MTM_LOG1("%d: REMOTE process action %c",MyProcPid,action);
996986
#if0
997987
if (Mtm->status==MTM_RECOVERY) {
998988
MTM_LOG1("Replay action %c[%x]",action,s.data[s.cursor]);
@@ -1054,6 +1044,7 @@ void MtmExecutor(void* work, size_t size)
10541044
continue;
10551045
}
10561046
case'G':
1047+
case'E':
10571048
{
10581049
process_remote_transactional_message(&s);
10591050
continue;

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 94 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838
#include"multimaster.h"
3939
#include"pglogical_relid_map.h"
4040

41-
staticboolMtmIsFilteredTxn;
42-
staticintMtmTransactionRecords;
41+
staticintMtmTransactionRecords;
42+
staticTransactionIdMtmCurrentXid;
43+
staticboolDDLInProress= false;
4344

4445
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4546

@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7475
constchar*relname;
7576
uint8relnamelen;
7677
Oidrelid;
77-
if (MtmIsFilteredTxn) {
78+
79+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN) {
80+
MTM_LOG1("%d: pglogical_write_message filtered",MyProcPid);
7881
return;
7982
}
80-
83+
84+
if (DDLInProress) {
85+
MTM_LOG1("%d: pglogical_write_message filtered DDLInProress",MyProcPid);
86+
return;
87+
}
88+
8189
relid=RelationGetRelid(rel);
8290
pq_sendbyte(out,'R');/* sending RELATION */
8391
pq_sendint(out,relid,sizeofrelid);/* use Oid as relation identifier */
@@ -107,36 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107115
{
108116
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
109117
csn_tcsn=MtmTransactionSnapshot(txn->xid);
118+
119+
MtmCurrentXid=txn->xid;
120+
110121
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111122
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
112-
113-
if (!isRecovery&&csn==INVALID_CSN) {
114-
MtmIsFilteredTxn= true;
115-
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered",MyProcPid,txn->xid);
116-
}else {
117-
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
118-
MtmIsFilteredTxn= false;
119-
pq_sendbyte(out,'B');/* BEGIN */
120-
pq_sendint(out,MtmNodeId,4);
121-
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
122-
pq_sendint64(out,csn);
123-
MtmTransactionRecords=0;
124-
}
123+
124+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
125+
pq_sendbyte(out,'B');/* BEGIN */
126+
pq_sendint(out,MtmNodeId,4);
127+
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
128+
pq_sendint64(out,csn);
129+
MtmTransactionRecords=0;
125130
}
126131

127132
staticvoid
128133
pglogical_write_message(StringInfoout,
129134
constchar*prefix,Sizesz,constchar*message)
130135
{
131-
if (*prefix=='L') {
136+
if (*prefix=='L')
137+
{
132138
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
133-
}else {
134-
if (MtmIsFilteredTxn)
139+
}
140+
elseif (*prefix=='G')
141+
{
142+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN)
135143
{
136-
MTM_LOG3("%d: pglogical_write_message filtered",MyProcPid);
144+
MTM_LOG1("%d: pglogical_write_message filtered",MyProcPid);
137145
return;
138146
}
147+
DDLInProress= true;
139148
}
149+
elseif (*prefix=='E')
150+
{
151+
DDLInProress= false;
152+
}
153+
140154
pq_sendbyte(out,*prefix);
141155
pq_sendint(out,sz,4);
142156
pq_sendbytes(out,message,sz);
@@ -169,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
169183
Assert(flags!=PGLOGICAL_COMMIT_PREPARED||txn->xid<1000||MtmTransactionRecords!=1);
170184

171185
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
172-
if (MtmIsFilteredTxn) {
173-
Assert(MtmTransactionRecords==0);
174-
return;
175-
}
186+
//if (MtmIsFilteredTxn) {
187+
//Assert(MtmTransactionRecords == 0);
188+
//return;
189+
//}
176190
}else {
177191
csn_tcsn=MtmTransactionSnapshot(txn->xid);
178192
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
@@ -242,11 +256,20 @@ static void
242256
pglogical_write_insert(StringInfoout,PGLogicalOutputData*data,
243257
Relationrel,HeapTuplenewtuple)
244258
{
245-
if (!MtmIsFilteredTxn) {
246-
MtmTransactionRecords+=1;
247-
pq_sendbyte(out,'I');/* action INSERT */
248-
pglogical_write_tuple(out,data,rel,newtuple);
259+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
260+
MTM_LOG1("%d: pglogical_write_insert filtered",MyProcPid);
261+
return;
262+
}
263+
264+
if (DDLInProress) {
265+
MTM_LOG1("%d: pglogical_write_insert filtered DDLInProress",MyProcPid);
266+
return;
249267
}
268+
269+
MtmTransactionRecords+=1;
270+
pq_sendbyte(out,'I');/* action INSERT */
271+
pglogical_write_tuple(out,data,rel,newtuple);
272+
250273
}
251274

252275
/*
@@ -256,23 +279,30 @@ static void
256279
pglogical_write_update(StringInfoout,PGLogicalOutputData*data,
257280
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
258281
{
259-
if (!MtmIsFilteredTxn) {
260-
MtmTransactionRecords+=1;
282+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
283+
MTM_LOG1("%d: pglogical_write_update filtered",MyProcPid);
284+
return;
285+
}
261286

262-
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
287+
if (DDLInProress) {
288+
MTM_LOG1("%d: pglogical_write_update filtered DDLInProress",MyProcPid);
289+
return;
290+
}
263291

292+
MtmTransactionRecords+=1;
264293

265-
pq_sendbyte(out,'U');/* action UPDATE */
266-
/* FIXME support whole tuple (O tuple type) */
267-
if (oldtuple!=NULL)
268-
{
269-
pq_sendbyte(out,'K');/* old key follows */
270-
pglogical_write_tuple(out,data,rel,oldtuple);
271-
}
272-
273-
pq_sendbyte(out,'N');/* new tuple follows */
274-
pglogical_write_tuple(out,data,rel,newtuple);
294+
MTM_LOG3("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
295+
296+
pq_sendbyte(out,'U');/* action UPDATE */
297+
/* FIXME support whole tuple (O tuple type) */
298+
if (oldtuple!=NULL)
299+
{
300+
pq_sendbyte(out,'K');/* old key follows */
301+
pglogical_write_tuple(out,data,rel,oldtuple);
275302
}
303+
304+
pq_sendbyte(out,'N');/* new tuple follows */
305+
pglogical_write_tuple(out,data,rel,newtuple);
276306
}
277307

278308
/*
@@ -282,11 +312,19 @@ static void
282312
pglogical_write_delete(StringInfoout,PGLogicalOutputData*data,
283313
Relationrel,HeapTupleoldtuple)
284314
{
285-
if (!MtmIsFilteredTxn) {
286-
MtmTransactionRecords+=1;
287-
pq_sendbyte(out,'D');/* action DELETE */
288-
pglogical_write_tuple(out,data,rel,oldtuple);
315+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
316+
MTM_LOG1("%d: pglogical_write_delete filtered",MyProcPid);
317+
return;
318+
}
319+
320+
if (DDLInProress) {
321+
MTM_LOG1("%d: pglogical_write_delete filtered DDLInProress",MyProcPid);
322+
return;
289323
}
324+
325+
MtmTransactionRecords+=1;
326+
pq_sendbyte(out,'D');/* action DELETE */
327+
pglogical_write_tuple(out,data,rel,oldtuple);
290328
}
291329

292330
/*
@@ -311,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
311349
inti;
312350
uint16nliveatts=0;
313351

352+
if (MtmTransactionSnapshot(MtmCurrentXid)==INVALID_CSN){
353+
MTM_LOG1("%d: pglogical_write_tuple filtered",MyProcPid);
354+
return;
355+
}
356+
357+
if (DDLInProress) {
358+
MTM_LOG1("%d: pglogical_write_tuple filtered DDLInProress",MyProcPid);
359+
return;
360+
}
361+
314362
desc=RelationGetDescr(rel);
315363

316364
pq_sendbyte(out,'T');/* sending TUPLE */

‎src/test/regress/serial_schedule‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ test: oidjoins
5050
test: type_sanity
5151
test: opr_sanity
5252
test: insert
53-
# test: insert_conflict # issue#17
53+
# test: insert_conflict
5454
test: create_function_1
5555
test: create_type
5656
test: create_table
@@ -108,7 +108,7 @@ test: privileges
108108
test: init_privs
109109
test: security_label
110110
test: collate
111-
#test: matview
111+
test: matview
112112
test: lock
113113
test: replica_identity
114114
# test: rowsecurity # issue#20

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp