38
38
#include "multimaster.h"
39
39
#include "pglogical_relid_map.h"
40
40
41
- static bool MtmIsFilteredTxn ;
42
- static int MtmTransactionRecords ;
41
+ static int MtmTransactionRecords ;
42
+ static TransactionId MtmCurrentXid ;
43
+ static bool DDLInProress = false;
43
44
44
45
static void pglogical_write_rel (StringInfo out ,PGLogicalOutputData * data ,Relation rel );
45
46
@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
74
75
const char * relname ;
75
76
uint8 relnamelen ;
76
77
Oid relid ;
77
- if (MtmIsFilteredTxn ) {
78
+
79
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ) {
80
+ MTM_LOG1 ("%d: pglogical_write_message filtered" ,MyProcPid );
78
81
return ;
79
82
}
80
-
83
+
84
+ if (DDLInProress ) {
85
+ MTM_LOG1 ("%d: pglogical_write_message filtered DDLInProress" ,MyProcPid );
86
+ return ;
87
+ }
88
+
81
89
relid = RelationGetRelid (rel );
82
90
pq_sendbyte (out ,'R' );/* sending RELATION */
83
91
pq_sendint (out ,relid ,sizeof relid );/* use Oid as relation identifier */
@@ -107,36 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107
115
{
108
116
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
109
117
csn_t csn = MtmTransactionSnapshot (txn -> xid );
118
+
119
+ MtmCurrentXid = txn -> xid ;
120
+
110
121
MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111
122
MyProcPid ,txn -> xid ,MtmReplicationNodeId ,csn ,isRecovery ,txn -> restart_decoding_lsn ,txn -> first_lsn ,txn -> end_lsn ,MyReplicationSlot -> data .confirmed_flush );
112
-
113
- if (!isRecovery && csn == INVALID_CSN ) {
114
- MtmIsFilteredTxn = true;
115
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d filtered" ,MyProcPid ,txn -> xid );
116
- }else {
117
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" ,MyProcPid ,txn -> xid );
118
- MtmIsFilteredTxn = false;
119
- pq_sendbyte (out ,'B' );/* BEGIN */
120
- pq_sendint (out ,MtmNodeId ,4 );
121
- pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
122
- pq_sendint64 (out ,csn );
123
- MtmTransactionRecords = 0 ;
124
- }
123
+
124
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" ,MyProcPid ,txn -> xid );
125
+ pq_sendbyte (out ,'B' );/* BEGIN */
126
+ pq_sendint (out ,MtmNodeId ,4 );
127
+ pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
128
+ pq_sendint64 (out ,csn );
129
+ MtmTransactionRecords = 0 ;
125
130
}
126
131
127
132
static void
128
133
pglogical_write_message (StringInfo out ,
129
134
const char * prefix ,Size sz ,const char * message )
130
135
{
131
- if (* prefix == 'L' ) {
136
+ if (* prefix == 'L' )
137
+ {
132
138
MTM_LOG1 ("Send deadlock message to node %d" ,MtmReplicationNodeId );
133
- }else {
134
- if (MtmIsFilteredTxn )
139
+ }
140
+ else if (* prefix == 'G' )
141
+ {
142
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN )
135
143
{
136
- MTM_LOG3 ("%d: pglogical_write_message filtered" ,MyProcPid );
144
+ MTM_LOG1 ("%d: pglogical_write_message filtered" ,MyProcPid );
137
145
return ;
138
146
}
147
+ DDLInProress = true;
139
148
}
149
+ else if (* prefix == 'E' )
150
+ {
151
+ DDLInProress = false;
152
+ }
153
+
140
154
pq_sendbyte (out ,* prefix );
141
155
pq_sendint (out ,sz ,4 );
142
156
pq_sendbytes (out ,message ,sz );
@@ -169,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
169
183
Assert (flags != PGLOGICAL_COMMIT_PREPARED || txn -> xid < 1000 || MtmTransactionRecords != 1 );
170
184
171
185
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
172
- if (MtmIsFilteredTxn ) {
173
- Assert (MtmTransactionRecords == 0 );
174
- return ;
175
- }
186
+ // if (MtmIsFilteredTxn) {
187
+ // Assert(MtmTransactionRecords == 0);
188
+ // return;
189
+ // }
176
190
}else {
177
191
csn_t csn = MtmTransactionSnapshot (txn -> xid );
178
192
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
@@ -242,11 +256,20 @@ static void
242
256
pglogical_write_insert (StringInfo out ,PGLogicalOutputData * data ,
243
257
Relation rel ,HeapTuple newtuple )
244
258
{
245
- if (!MtmIsFilteredTxn ) {
246
- MtmTransactionRecords += 1 ;
247
- pq_sendbyte (out ,'I' );/* action INSERT */
248
- pglogical_write_tuple (out ,data ,rel ,newtuple );
259
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
260
+ MTM_LOG1 ("%d: pglogical_write_insert filtered" ,MyProcPid );
261
+ return ;
262
+ }
263
+
264
+ if (DDLInProress ) {
265
+ MTM_LOG1 ("%d: pglogical_write_insert filtered DDLInProress" ,MyProcPid );
266
+ return ;
249
267
}
268
+
269
+ MtmTransactionRecords += 1 ;
270
+ pq_sendbyte (out ,'I' );/* action INSERT */
271
+ pglogical_write_tuple (out ,data ,rel ,newtuple );
272
+
250
273
}
251
274
252
275
/*
@@ -256,23 +279,30 @@ static void
256
279
pglogical_write_update (StringInfo out ,PGLogicalOutputData * data ,
257
280
Relation rel ,HeapTuple oldtuple ,HeapTuple newtuple )
258
281
{
259
- if (!MtmIsFilteredTxn ) {
260
- MtmTransactionRecords += 1 ;
282
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
283
+ MTM_LOG1 ("%d: pglogical_write_update filtered" ,MyProcPid );
284
+ return ;
285
+ }
261
286
262
- MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
287
+ if (DDLInProress ) {
288
+ MTM_LOG1 ("%d: pglogical_write_update filtered DDLInProress" ,MyProcPid );
289
+ return ;
290
+ }
263
291
292
+ MtmTransactionRecords += 1 ;
264
293
265
- pq_sendbyte (out ,'U' );/* action UPDATE */
266
- /* FIXME support whole tuple (O tuple type) */
267
- if (oldtuple != NULL )
268
- {
269
- pq_sendbyte (out ,'K' );/* old key follows */
270
- pglogical_write_tuple (out ,data ,rel ,oldtuple );
271
- }
272
-
273
- pq_sendbyte (out ,'N' );/* new tuple follows */
274
- pglogical_write_tuple (out ,data ,rel ,newtuple );
294
+ MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
295
+
296
+ pq_sendbyte (out ,'U' );/* action UPDATE */
297
+ /* FIXME support whole tuple (O tuple type) */
298
+ if (oldtuple != NULL )
299
+ {
300
+ pq_sendbyte (out ,'K' );/* old key follows */
301
+ pglogical_write_tuple (out ,data ,rel ,oldtuple );
275
302
}
303
+
304
+ pq_sendbyte (out ,'N' );/* new tuple follows */
305
+ pglogical_write_tuple (out ,data ,rel ,newtuple );
276
306
}
277
307
278
308
/*
@@ -282,11 +312,19 @@ static void
282
312
pglogical_write_delete (StringInfo out ,PGLogicalOutputData * data ,
283
313
Relation rel ,HeapTuple oldtuple )
284
314
{
285
- if (!MtmIsFilteredTxn ) {
286
- MtmTransactionRecords += 1 ;
287
- pq_sendbyte (out ,'D' );/* action DELETE */
288
- pglogical_write_tuple (out ,data ,rel ,oldtuple );
315
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
316
+ MTM_LOG1 ("%d: pglogical_write_delete filtered" ,MyProcPid );
317
+ return ;
318
+ }
319
+
320
+ if (DDLInProress ) {
321
+ MTM_LOG1 ("%d: pglogical_write_delete filtered DDLInProress" ,MyProcPid );
322
+ return ;
289
323
}
324
+
325
+ MtmTransactionRecords += 1 ;
326
+ pq_sendbyte (out ,'D' );/* action DELETE */
327
+ pglogical_write_tuple (out ,data ,rel ,oldtuple );
290
328
}
291
329
292
330
/*
@@ -311,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
311
349
int i ;
312
350
uint16 nliveatts = 0 ;
313
351
352
+ if (MtmTransactionSnapshot (MtmCurrentXid )== INVALID_CSN ){
353
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered" ,MyProcPid );
354
+ return ;
355
+ }
356
+
357
+ if (DDLInProress ) {
358
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered DDLInProress" ,MyProcPid );
359
+ return ;
360
+ }
361
+
314
362
desc = RelationGetDescr (rel );
315
363
316
364
pq_sendbyte (out ,'T' );/* sending TUPLE */