@@ -79,6 +79,26 @@ static void send_startup_message(LogicalDecodingContext *ctx,
7979
8080static bool startup_message_sent = false;
8181
82+ #define OUTPUT_BUFFER_SIZE (16*1024*1024)
83+
84+ static void MtmOutputPluginWrite (LogicalDecodingContext * ctx ,bool last_write ,bool flush )
85+ {
86+ if (flush ) {
87+ OutputPluginWrite (ctx ,last_write );
88+ }
89+ }
90+
91+ static void MtmOutputPluginPrepareWrite (LogicalDecodingContext * ctx ,bool last_write ,bool flush )
92+ {
93+ if (!ctx -> prepared_write ) {
94+ OutputPluginPrepareWrite (ctx ,last_write );
95+ }else if (flush || ctx -> out -> len > OUTPUT_BUFFER_SIZE ) {
96+ OutputPluginWrite (ctx , false);
97+ OutputPluginPrepareWrite (ctx ,last_write );
98+ }
99+ }
100+
101+
82102/* specify output plugin callbacks */
83103void
84104_PG_output_plugin_init (OutputPluginCallbacks * cb )
@@ -388,16 +408,16 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
388408send_replication_origin &=txn -> origin_id != InvalidRepOriginId ;
389409
390410if (data -> api ) {
391- OutputPluginPrepareWrite (ctx , !send_replication_origin );
411+ MtmOutputPluginPrepareWrite (ctx , !send_replication_origin , true );
392412data -> api -> write_begin (ctx -> out ,data ,txn );
393413
394414if (send_replication_origin )
395415{
396416char * origin ;
397417
398418/* Message boundary */
399- OutputPluginWrite (ctx , false);
400- OutputPluginPrepareWrite (ctx , true);
419+ MtmOutputPluginWrite (ctx , false , false);
420+ MtmOutputPluginPrepareWrite (ctx , true, false );
401421
402422/*
403423 * XXX: which behaviour we want here?
@@ -412,7 +432,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
412432replorigin_by_oid (txn -> origin_id , true,& origin ))
413433data -> api -> write_origin (ctx -> out ,origin ,txn -> origin_lsn );
414434}
415- OutputPluginWrite (ctx , true);
435+ MtmOutputPluginWrite (ctx , true, false );
416436}
417437}
418438
@@ -422,9 +442,9 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
422442PGLogicalOutputData * data = (PGLogicalOutputData * )ctx -> output_plugin_private ;
423443
424444if (data -> api ) {
425- OutputPluginPrepareWrite (ctx , true);
445+ MtmOutputPluginPrepareWrite (ctx , true , true);
426446data -> api -> write_caughtup (ctx -> out ,data ,ctx -> reader -> EndRecPtr );
427- OutputPluginWrite (ctx , true);
447+ MtmOutputPluginWrite (ctx , true , true);
428448}
429449}
430450
@@ -439,9 +459,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
439459PGLogicalOutputData * data = (PGLogicalOutputData * )ctx -> output_plugin_private ;
440460
441461if (data -> api ) {
442- OutputPluginPrepareWrite (ctx , true);
462+ MtmOutputPluginPrepareWrite (ctx , true , true);
443463data -> api -> write_commit (ctx -> out ,data ,txn ,commit_lsn );
444- OutputPluginWrite (ctx , true);
464+ MtmOutputPluginWrite (ctx , true , true);
445465}
446466}
447467
@@ -462,38 +482,38 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
462482/* TODO: add caching (send only if changed) */
463483if (data -> api -> write_rel )
464484{
465- OutputPluginPrepareWrite (ctx , false);
485+ MtmOutputPluginPrepareWrite (ctx , false , false);
466486data -> api -> write_rel (ctx -> out ,data ,relation );
467- OutputPluginWrite (ctx , false);
487+ MtmOutputPluginWrite (ctx , false , false);
468488}
469489
470490/* Send the data */
471491switch (change -> action )
472492{
473493case REORDER_BUFFER_CHANGE_INSERT :
474- OutputPluginPrepareWrite (ctx , true);
494+ MtmOutputPluginPrepareWrite (ctx , true, false );
475495data -> api -> write_insert (ctx -> out ,data ,relation ,
476496& change -> data .tp .newtuple -> tuple );
477- OutputPluginWrite (ctx , true);
497+ MtmOutputPluginWrite (ctx , true, false );
478498break ;
479499case REORDER_BUFFER_CHANGE_UPDATE :
480500{
481501HeapTuple oldtuple = change -> data .tp .oldtuple ?
482502& change -> data .tp .oldtuple -> tuple :NULL ;
483503
484- OutputPluginPrepareWrite (ctx , true);
504+ MtmOutputPluginPrepareWrite (ctx , true, false );
485505data -> api -> write_update (ctx -> out ,data ,relation ,oldtuple ,
486506& change -> data .tp .newtuple -> tuple );
487- OutputPluginWrite (ctx , true);
507+ MtmOutputPluginWrite (ctx , true, false );
488508break ;
489509}
490510case REORDER_BUFFER_CHANGE_DELETE :
491511if (change -> data .tp .oldtuple )
492512{
493- OutputPluginPrepareWrite (ctx , true);
513+ MtmOutputPluginPrepareWrite (ctx , true, false );
494514data -> api -> write_delete (ctx -> out ,data ,relation ,
495515& change -> data .tp .oldtuple -> tuple );
496- OutputPluginWrite (ctx , true);
516+ MtmOutputPluginWrite (ctx , true, false );
497517}
498518else
499519elog (DEBUG1 ,"didn't send DELETE change because of missing oldtuple" );
@@ -536,9 +556,9 @@ pg_decode_message(LogicalDecodingContext *ctx,
536556{
537557PGLogicalOutputData * data = (PGLogicalOutputData * )ctx -> output_plugin_private ;
538558
539- OutputPluginPrepareWrite (ctx , true);
559+ MtmOutputPluginPrepareWrite (ctx , true, ! transactional );
540560data -> api -> write_message (ctx -> out ,prefix ,sz ,message );
541- OutputPluginWrite (ctx , true);
561+ MtmOutputPluginWrite (ctx , true, ! transactional );
542562}
543563
544564static void
@@ -559,9 +579,9 @@ send_startup_message(LogicalDecodingContext *ctx,
559579 */
560580
561581if (data -> api ) {
562- OutputPluginPrepareWrite (ctx ,last_message );
582+ MtmOutputPluginPrepareWrite (ctx ,last_message , true );
563583data -> api -> write_startup_message (ctx -> out ,msg );
564- OutputPluginWrite (ctx ,last_message );
584+ MtmOutputPluginWrite (ctx ,last_message , true );
565585}
566586
567587pfree (msg );