@@ -168,38 +168,34 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168
168
}
169
169
}
170
170
171
- static void pglogical_broadcast_table (StringInfo out ,PGLogicalOutputData * data ,MtmCopyRequest * copy )
171
+ static void pglogical_broadcast_table (StringInfo out ,LogicalDecodingContext * ctx ,MtmCopyRequest * copy )
172
172
{
173
173
if (BIT_CHECK (copy -> targetNodes ,MtmReplicationNodeId - 1 )) {
174
174
HeapScanDesc scandesc ;
175
175
HeapTuple tuple ;
176
176
Relation rel ;
177
177
178
- StartTransactionCommand ();
179
-
180
178
rel = heap_open (copy -> sourceTable ,ShareLock );
181
179
182
- pq_sendbyte (out ,'M' );
183
- pq_sendbyte (out ,'B' );
184
- pq_sendint (out ,sizeof (* copy ),4 );
185
- pq_sendbytes (out , (char * )copy ,sizeof (* copy ));
186
-
187
- pglogical_write_rel (out ,data ,rel );
180
+ pglogical_write_rel (out ,ctx -> output_plugin_private ,rel );
181
+
182
+ pq_sendbyte (out ,'0' );
188
183
189
184
scandesc = heap_beginscan (rel ,GetTransactionSnapshot (),0 ,NULL );
190
185
while ((tuple = heap_getnext (scandesc ,ForwardScanDirection ))!= NULL )
191
186
{
192
- pglogical_write_tuple (out ,data ,rel ,tuple );
187
+ MtmOutputPluginPrepareWrite (ctx , false, false);
188
+ pq_sendbyte (out ,'I' );/* action INSERT */
189
+ pglogical_write_tuple (out ,ctx -> output_plugin_private ,rel ,tuple );
190
+ MtmOutputPluginWrite (ctx , false, false);
193
191
}
194
192
heap_endscan (scandesc );
195
193
heap_close (rel ,ShareLock );
196
-
197
- CommitTransactionCommand ();
198
194
}
199
195
}
200
196
201
197
static void
202
- pglogical_write_message (StringInfo out ,PGLogicalOutputData * data ,
198
+ pglogical_write_message (StringInfo out ,LogicalDecodingContext * ctx ,
203
199
const char * prefix ,Size sz ,const char * message )
204
200
{
205
201
MtmLastRelId = InvalidOid ;
@@ -231,7 +227,7 @@ pglogical_write_message(StringInfo out, PGLogicalOutputData *data,
231
227
*/
232
228
return ;
233
229
case 'B' :
234
- pglogical_broadcast_table (out ,data , (MtmCopyRequest * )message );
230
+ pglogical_broadcast_table (out ,ctx , (MtmCopyRequest * )message );
235
231
return ;
236
232
}
237
233
pq_sendbyte (out ,'M' );