39
39
#include "pglogical_relid_map.h"
40
40
41
41
static bool MtmIsFilteredTxn ;
42
+ static int MtmTransactionRecords ;
42
43
43
44
static void pglogical_write_rel (StringInfo out ,PGLogicalOutputData * data ,Relation rel );
44
45
@@ -106,7 +107,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
106
107
{
107
108
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
108
109
csn_t csn = MtmTransactionSnapshot (txn -> xid );
109
- MTM_LOG2 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d" ,MyProcPid ,txn -> xid ,MtmReplicationNodeId ,csn ,isRecovery );
110
+ MTM_LOG1 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111
+ MyProcPid ,txn -> xid ,MtmReplicationNodeId ,csn ,isRecovery ,txn -> restart_decoding_lsn ,txn -> first_lsn ,txn -> end_lsn ,MyReplicationSlot -> data .confirmed_flush );
110
112
111
113
if (csn == INVALID_CSN && !isRecovery ) {
112
114
MtmIsFilteredTxn = true;
@@ -116,6 +118,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116
118
pq_sendint (out ,isRecovery ?InvalidTransactionId :txn -> xid ,4 );
117
119
pq_sendint64 (out ,csn );
118
120
MtmIsFilteredTxn = false;
121
+ MtmTransactionRecords = 0 ;
119
122
}
120
123
}
121
124
@@ -137,6 +140,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
137
140
{
138
141
uint8 flags = 0 ;
139
142
143
+ MTM_LOG1 ("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
144
+ MyProcPid ,txn -> xid ,MtmReplicationNodeId ,txn -> restart_decoding_lsn ,txn -> first_lsn ,txn -> end_lsn ,MyReplicationSlot -> data .confirmed_flush );
145
+
146
+
140
147
if (txn -> xact_action == XLOG_XACT_COMMIT )
141
148
flags = PGLOGICAL_COMMIT ;
142
149
else if (txn -> xact_action == XLOG_XACT_PREPARE )
@@ -150,6 +157,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150
157
151
158
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
152
159
if (MtmIsFilteredTxn ) {
160
+ Assert (MtmTransactionRecords == 0 );
153
161
return ;
154
162
}
155
163
}else {
@@ -161,6 +169,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161
169
*/
162
170
if (csn == INVALID_CSN && !isRecovery )
163
171
{
172
+ Assert (MtmTransactionRecords == 0 );
164
173
return ;
165
174
}
166
175
if (MtmRecoveryCaughtUp (MtmReplicationNodeId ,txn -> end_lsn )) {
@@ -176,18 +185,23 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
176
185
pq_sendbyte (out ,flags );
177
186
pq_sendbyte (out ,MtmNodeId );
178
187
188
+ Assert (txn -> xact_action != XLOG_XACT_PREPARE || txn -> xid < 1000 || MtmTransactionRecords >=2 );
189
+ pq_sendint (out ,MtmTransactionRecords ,4 );
190
+
179
191
/* send fixed fields */
180
192
pq_sendint64 (out ,commit_lsn );
181
193
pq_sendint64 (out ,txn -> end_lsn );
182
194
pq_sendint64 (out ,txn -> commit_time );
183
195
184
196
if (txn -> xact_action == XLOG_XACT_COMMIT_PREPARED ) {
197
+ Assert (MtmTransactionRecords == 0 );
185
198
pq_sendint64 (out ,MtmGetTransactionCSN (txn -> xid ));
186
199
}
187
200
if (txn -> xact_action != XLOG_XACT_COMMIT ) {
188
201
pq_sendstring (out ,txn -> gid );
189
202
}
190
203
204
+ MtmTransactionRecords = 0 ;
191
205
MTM_TXTRACE (txn ,"pglogical_write_commit Finish" );
192
206
}
193
207
@@ -199,6 +213,7 @@ pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
199
213
Relation rel ,HeapTuple newtuple )
200
214
{
201
215
if (!MtmIsFilteredTxn ) {
216
+ MtmTransactionRecords += 1 ;
202
217
pq_sendbyte (out ,'I' );/* action INSERT */
203
218
pglogical_write_tuple (out ,data ,rel ,newtuple );
204
219
}
@@ -212,6 +227,11 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
212
227
Relation rel ,HeapTuple oldtuple ,HeapTuple newtuple )
213
228
{
214
229
if (!MtmIsFilteredTxn ) {
230
+ MtmTransactionRecords += 1 ;
231
+
232
+ MTM_LOG1 ("%d: pglogical_write_update confirmed_flush=%lx" ,MyProcPid ,MyReplicationSlot -> data .confirmed_flush );
233
+
234
+
215
235
pq_sendbyte (out ,'U' );/* action UPDATE */
216
236
/* FIXME support whole tuple (O tuple type) */
217
237
if (oldtuple != NULL )
@@ -233,6 +253,7 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
233
253
Relation rel ,HeapTuple oldtuple )
234
254
{
235
255
if (!MtmIsFilteredTxn ) {
256
+ MtmTransactionRecords += 1 ;
236
257
pq_sendbyte (out ,'D' );/* action DELETE */
237
258
pglogical_write_tuple (out ,data ,rel ,oldtuple );
238
259
}