3838#include "multimaster.h"
3939#include "pglogical_relid_map.h"
4040
41- static bool MtmIsFilteredTxn ;
42- static int MtmTransactionRecords ;
41+ static int MtmTransactionRecords ;
42+ static TransactionId MtmCurrentXid ;
43+ static bool DDLInProress = false;
4344
4445static void pglogical_write_rel (StringInfo out ,PGLogicalOutputData * data ,Relation rel );
4546
@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7475const char * relname ;
7576uint8 relnamelen ;
7677Oid relid ;
77- if (MtmIsFilteredTxn ) {
78+
79+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ) {
80+ MTM_LOG1 ("%d: pglogical_write_message filtered" ,MyProcPid );
7881return ;
7982}
80-
83+
84+ if (DDLInProress ) {
85+ MTM_LOG1 ("%d: pglogical_write_message filtered DDLInProress" ,MyProcPid );
86+ return ;
87+ }
88+
8189relid = RelationGetRelid (rel );
8290pq_sendbyte (out ,'R' );/* sending RELATION */
8391pq_sendint (out ,relid ,sizeof relid );/* use Oid as relation identifier */
@@ -107,30 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107115{
108116bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
109117csn_t csn = MtmTransactionSnapshot (txn -> xid );
118+
119+ MtmCurrentXid = txn -> xid ;
120+
110121MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111122MyProcPid ,txn -> xid ,MtmReplicationNodeId ,csn ,isRecovery ,txn -> restart_decoding_lsn ,txn -> first_lsn ,txn -> end_lsn ,MyReplicationSlot -> data .confirmed_flush );
112-
113- if (!isRecovery && csn == INVALID_CSN ) {
114- MtmIsFilteredTxn = true;
115- MTM_LOG3 ("%d: pglogical_write_begin XID=%d filtered" ,MyProcPid ,txn -> xid );
116- }else {
117- MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" ,MyProcPid ,txn -> xid );
118- MtmIsFilteredTxn = false;
119- pq_sendbyte (out ,'B' );/* BEGIN */
120- pq_sendint (out ,MtmNodeId ,4 );
121- pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
122- pq_sendint64 (out ,csn );
123- MtmTransactionRecords = 0 ;
124- }
123+
124+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" ,MyProcPid ,txn -> xid );
125+ pq_sendbyte (out ,'B' );/* BEGIN */
126+ pq_sendint (out ,MtmNodeId ,4 );
127+ pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
128+ pq_sendint64 (out ,csn );
129+ MtmTransactionRecords = 0 ;
125130}
126131
127132static void
128133pglogical_write_message (StringInfo out ,
129134const char * prefix ,Size sz ,const char * message )
130135{
131- if (* prefix == 'L' ) {
136+ if (* prefix == 'L' )
137+ {
132138MTM_LOG1 ("Send deadlock message to node %d" ,MtmReplicationNodeId );
133139}
140+ else if (* prefix == 'G' )
141+ {
142+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN )
143+ {
144+ MTM_LOG1 ("%d: pglogical_write_message filtered" ,MyProcPid );
145+ return ;
146+ }
147+ DDLInProress = true;
148+ }
149+ else if (* prefix == 'E' )
150+ {
151+ DDLInProress = false;
152+ }
153+
134154pq_sendbyte (out ,* prefix );
135155pq_sendint (out ,sz ,4 );
136156pq_sendbytes (out ,message ,sz );
@@ -163,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163183Assert (flags != PGLOGICAL_COMMIT_PREPARED || txn -> xid < 1000 || MtmTransactionRecords != 1 );
164184
165185if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
166- if (MtmIsFilteredTxn ) {
167- Assert (MtmTransactionRecords == 0 );
168- return ;
169- }
186+ // if (MtmIsFilteredTxn) {
187+ // Assert(MtmTransactionRecords == 0);
188+ // return;
189+ // }
170190}else {
171191csn_t csn = MtmTransactionSnapshot (txn -> xid );
172192bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
@@ -236,11 +256,20 @@ static void
236256pglogical_write_insert (StringInfo out ,PGLogicalOutputData * data ,
237257Relation rel ,HeapTuple newtuple )
238258{
239- if (!MtmIsFilteredTxn ) {
240- MtmTransactionRecords += 1 ;
241- pq_sendbyte (out ,'I' );/* action INSERT */
242- pglogical_write_tuple (out ,data ,rel ,newtuple );
259+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
260+ MTM_LOG1 ("%d: pglogical_write_insert filtered" ,MyProcPid );
261+ return ;
262+ }
263+
264+ if (DDLInProress ) {
265+ MTM_LOG1 ("%d: pglogical_write_insert filtered DDLInProress" ,MyProcPid );
266+ return ;
243267}
268+
269+ MtmTransactionRecords += 1 ;
270+ pq_sendbyte (out ,'I' );/* action INSERT */
271+ pglogical_write_tuple (out ,data ,rel ,newtuple );
272+
244273}
245274
246275/*
@@ -250,23 +279,30 @@ static void
250279pglogical_write_update (StringInfo out ,PGLogicalOutputData * data ,
251280Relation rel ,HeapTuple oldtuple ,HeapTuple newtuple )
252281{
253- if (!MtmIsFilteredTxn ) {
254- MtmTransactionRecords += 1 ;
282+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
283+ MTM_LOG1 ("%d: pglogical_write_update filtered" ,MyProcPid );
284+ return ;
285+ }
255286
256- MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
287+ if (DDLInProress ) {
288+ MTM_LOG1 ("%d: pglogical_write_update filtered DDLInProress" ,MyProcPid );
289+ return ;
290+ }
257291
292+ MtmTransactionRecords += 1 ;
258293
259- pq_sendbyte (out ,'U' );/* action UPDATE */
260- /* FIXME support whole tuple (O tuple type) */
261- if (oldtuple != NULL )
262- {
263- pq_sendbyte (out ,'K' );/* old key follows */
264- pglogical_write_tuple (out ,data ,rel ,oldtuple );
265- }
266-
267- pq_sendbyte (out ,'N' );/* new tuple follows */
268- pglogical_write_tuple (out ,data ,rel ,newtuple );
294+ MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
295+
296+ pq_sendbyte (out ,'U' );/* action UPDATE */
297+ /* FIXME support whole tuple (O tuple type) */
298+ if (oldtuple != NULL )
299+ {
300+ pq_sendbyte (out ,'K' );/* old key follows */
301+ pglogical_write_tuple (out ,data ,rel ,oldtuple );
269302}
303+
304+ pq_sendbyte (out ,'N' );/* new tuple follows */
305+ pglogical_write_tuple (out ,data ,rel ,newtuple );
270306}
271307
272308/*
@@ -276,11 +312,19 @@ static void
276312pglogical_write_delete (StringInfo out ,PGLogicalOutputData * data ,
277313Relation rel ,HeapTuple oldtuple )
278314{
279- if (!MtmIsFilteredTxn ) {
280- MtmTransactionRecords += 1 ;
281- pq_sendbyte (out ,'D' );/* action DELETE */
282- pglogical_write_tuple (out ,data ,rel ,oldtuple );
315+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
316+ MTM_LOG1 ("%d: pglogical_write_delete filtered" ,MyProcPid );
317+ return ;
283318}
319+
320+ if (DDLInProress ) {
321+ MTM_LOG1 ("%d: pglogical_write_delete filtered DDLInProress" ,MyProcPid );
322+ return ;
323+ }
324+
325+ MtmTransactionRecords += 1 ;
326+ pq_sendbyte (out ,'D' );/* action DELETE */
327+ pglogical_write_tuple (out ,data ,rel ,oldtuple );
284328}
285329
286330/*
@@ -305,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
305349int i ;
306350uint16 nliveatts = 0 ;
307351
352+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
353+ MTM_LOG1 ("%d: pglogical_write_tuple filtered" ,MyProcPid );
354+ return ;
355+ }
356+
357+ if (DDLInProress ) {
358+ MTM_LOG1 ("%d: pglogical_write_tuple filtered DDLInProress" ,MyProcPid );
359+ return ;
360+ }
361+
308362desc = RelationGetDescr (rel );
309363
310364pq_sendbyte (out ,'T' );/* sending TUPLE */