38
38
#include "multimaster.h"
39
39
#include "pglogical_relid_map.h"
40
40
41
- static bool MtmIsFilteredTxn ;
42
- static int MtmTransactionRecords ;
41
+ static int MtmTransactionRecords ;
42
+ static TransactionId MtmCurrentXid ;
43
+ static bool DDLInProress = false;
43
44
44
45
static void pglogical_write_rel (StringInfo out ,PGLogicalOutputData * data ,Relation rel );
45
46
@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
74
75
const char * relname ;
75
76
uint8 relnamelen ;
76
77
Oid relid ;
77
- if (MtmIsFilteredTxn ) {
78
+
79
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ) {
80
+ MTM_LOG1 ("%d: pglogical_write_message filtered" ,MyProcPid );
78
81
return ;
79
82
}
80
-
83
+
84
+ if (DDLInProress ) {
85
+ MTM_LOG1 ("%d: pglogical_write_message filtered DDLInProress" ,MyProcPid );
86
+ return ;
87
+ }
88
+
81
89
relid = RelationGetRelid (rel );
82
90
pq_sendbyte (out ,'R' );/* sending RELATION */
83
91
pq_sendint (out ,relid ,sizeof relid );/* use Oid as relation identifier */
@@ -107,30 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107
115
{
108
116
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
109
117
csn_t csn = MtmTransactionSnapshot (txn -> xid );
118
+
119
+ MtmCurrentXid = txn -> xid ;
120
+
110
121
MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111
122
MyProcPid ,txn -> xid ,MtmReplicationNodeId ,csn ,isRecovery ,txn -> restart_decoding_lsn ,txn -> first_lsn ,txn -> end_lsn ,MyReplicationSlot -> data .confirmed_flush );
112
-
113
- if (!isRecovery && csn == INVALID_CSN ) {
114
- MtmIsFilteredTxn = true;
115
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d filtered" ,MyProcPid ,txn -> xid );
116
- }else {
117
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" ,MyProcPid ,txn -> xid );
118
- MtmIsFilteredTxn = false;
119
- pq_sendbyte (out ,'B' );/* BEGIN */
120
- pq_sendint (out ,MtmNodeId ,4 );
121
- pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
122
- pq_sendint64 (out ,csn );
123
- MtmTransactionRecords = 0 ;
124
- }
123
+
124
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" ,MyProcPid ,txn -> xid );
125
+ pq_sendbyte (out ,'B' );/* BEGIN */
126
+ pq_sendint (out ,MtmNodeId ,4 );
127
+ pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
128
+ pq_sendint64 (out ,csn );
129
+ MtmTransactionRecords = 0 ;
125
130
}
126
131
127
132
static void
128
133
pglogical_write_message (StringInfo out ,
129
134
const char * prefix ,Size sz ,const char * message )
130
135
{
131
- if (* prefix == 'L' ) {
136
+ if (* prefix == 'L' )
137
+ {
132
138
MTM_LOG1 ("Send deadlock message to node %d" ,MtmReplicationNodeId );
133
139
}
140
+ else if (* prefix == 'G' )
141
+ {
142
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN )
143
+ {
144
+ MTM_LOG1 ("%d: pglogical_write_message filtered" ,MyProcPid );
145
+ return ;
146
+ }
147
+ DDLInProress = true;
148
+ }
149
+ else if (* prefix == 'E' )
150
+ {
151
+ DDLInProress = false;
152
+ }
153
+
134
154
pq_sendbyte (out ,* prefix );
135
155
pq_sendint (out ,sz ,4 );
136
156
pq_sendbytes (out ,message ,sz );
@@ -163,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163
183
Assert (flags != PGLOGICAL_COMMIT_PREPARED || txn -> xid < 1000 || MtmTransactionRecords != 1 );
164
184
165
185
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
166
- if (MtmIsFilteredTxn ) {
167
- Assert (MtmTransactionRecords == 0 );
168
- return ;
169
- }
186
+ // if (MtmIsFilteredTxn) {
187
+ // Assert(MtmTransactionRecords == 0);
188
+ // return;
189
+ // }
170
190
}else {
171
191
csn_t csn = MtmTransactionSnapshot (txn -> xid );
172
192
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
@@ -236,11 +256,20 @@ static void
236
256
pglogical_write_insert (StringInfo out ,PGLogicalOutputData * data ,
237
257
Relation rel ,HeapTuple newtuple )
238
258
{
239
- if (!MtmIsFilteredTxn ) {
240
- MtmTransactionRecords += 1 ;
241
- pq_sendbyte (out ,'I' );/* action INSERT */
242
- pglogical_write_tuple (out ,data ,rel ,newtuple );
259
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
260
+ MTM_LOG1 ("%d: pglogical_write_insert filtered" ,MyProcPid );
261
+ return ;
262
+ }
263
+
264
+ if (DDLInProress ) {
265
+ MTM_LOG1 ("%d: pglogical_write_insert filtered DDLInProress" ,MyProcPid );
266
+ return ;
243
267
}
268
+
269
+ MtmTransactionRecords += 1 ;
270
+ pq_sendbyte (out ,'I' );/* action INSERT */
271
+ pglogical_write_tuple (out ,data ,rel ,newtuple );
272
+
244
273
}
245
274
246
275
/*
@@ -250,23 +279,30 @@ static void
250
279
pglogical_write_update (StringInfo out ,PGLogicalOutputData * data ,
251
280
Relation rel ,HeapTuple oldtuple ,HeapTuple newtuple )
252
281
{
253
- if (!MtmIsFilteredTxn ) {
254
- MtmTransactionRecords += 1 ;
282
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
283
+ MTM_LOG1 ("%d: pglogical_write_update filtered" ,MyProcPid );
284
+ return ;
285
+ }
255
286
256
- MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
287
+ if (DDLInProress ) {
288
+ MTM_LOG1 ("%d: pglogical_write_update filtered DDLInProress" ,MyProcPid );
289
+ return ;
290
+ }
257
291
292
+ MtmTransactionRecords += 1 ;
258
293
259
- pq_sendbyte (out ,'U' );/* action UPDATE */
260
- /* FIXME support whole tuple (O tuple type) */
261
- if (oldtuple != NULL )
262
- {
263
- pq_sendbyte (out ,'K' );/* old key follows */
264
- pglogical_write_tuple (out ,data ,rel ,oldtuple );
265
- }
266
-
267
- pq_sendbyte (out ,'N' );/* new tuple follows */
268
- pglogical_write_tuple (out ,data ,rel ,newtuple );
294
+ MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
295
+
296
+ pq_sendbyte (out ,'U' );/* action UPDATE */
297
+ /* FIXME support whole tuple (O tuple type) */
298
+ if (oldtuple != NULL )
299
+ {
300
+ pq_sendbyte (out ,'K' );/* old key follows */
301
+ pglogical_write_tuple (out ,data ,rel ,oldtuple );
269
302
}
303
+
304
+ pq_sendbyte (out ,'N' );/* new tuple follows */
305
+ pglogical_write_tuple (out ,data ,rel ,newtuple );
270
306
}
271
307
272
308
/*
@@ -276,11 +312,19 @@ static void
276
312
pglogical_write_delete (StringInfo out ,PGLogicalOutputData * data ,
277
313
Relation rel ,HeapTuple oldtuple )
278
314
{
279
- if (!MtmIsFilteredTxn ) {
280
- MtmTransactionRecords += 1 ;
281
- pq_sendbyte (out ,'D' );/* action DELETE */
282
- pglogical_write_tuple (out ,data ,rel ,oldtuple );
315
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
316
+ MTM_LOG1 ("%d: pglogical_write_delete filtered" ,MyProcPid );
317
+ return ;
283
318
}
319
+
320
+ if (DDLInProress ) {
321
+ MTM_LOG1 ("%d: pglogical_write_delete filtered DDLInProress" ,MyProcPid );
322
+ return ;
323
+ }
324
+
325
+ MtmTransactionRecords += 1 ;
326
+ pq_sendbyte (out ,'D' );/* action DELETE */
327
+ pglogical_write_tuple (out ,data ,rel ,oldtuple );
284
328
}
285
329
286
330
/*
@@ -305,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
305
349
int i ;
306
350
uint16 nliveatts = 0 ;
307
351
352
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
353
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered" ,MyProcPid );
354
+ return ;
355
+ }
356
+
357
+ if (DDLInProress ) {
358
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered DDLInProress" ,MyProcPid );
359
+ return ;
360
+ }
361
+
308
362
desc = RelationGetDescr (rel );
309
363
310
364
pq_sendbyte (out ,'T' );/* sending TUPLE */