@@ -522,16 +522,18 @@ pglogical_receiver_main(Datum main_arg)
522522
523523if (rc > hdr_len )
524524{
525+ int msg_len = rc - hdr_len ;
525526stmt = copybuf + hdr_len ;
526527if (mode == REPLMODE_RECOVERED ) {
528+ /* Ingore all incompleted transactions from recovered node */
527529if (stmt [0 ]!= 'B' ) {
528530output_written_lsn = Max (walEnd ,output_written_lsn );
529531continue ;
530532}
531533mode = REPLMODE_OPEN_EXISTED ;
532534}
533535MTM_LOG3 ("Receive message %c from node %d" ,stmt [0 ],nodeId );
534- if (buf .used >=MtmTransSpillThreshold * MB ) {
536+ if (buf .used + msg_len + 1 >=MtmTransSpillThreshold * MB ) {
535537if (spill_file < 0 ) {
536538int file_id ;
537539spill_file = MtmCreateSpillFile (nodeId ,& file_id );
@@ -548,15 +550,15 @@ pglogical_receiver_main(Datum main_arg)
548550if (stmt [0 ]== 'Z' || (stmt [0 ]== 'M' && (stmt [1 ]== 'L' || stmt [1 ]== 'A' || stmt [1 ]== 'C' ))) {
549551MTM_LOG3 ("Process '%c' message from %d" ,stmt [1 ],nodeId );
550552if (stmt [0 ]== 'M' && stmt [1 ]== 'C' ) {/* concurrent DDL should be executed by parallel workers */
551- MtmExecute (stmt ,rc - hdr_len );
553+ MtmExecute (stmt ,msg_len );
552554}else {
553- MtmExecutor (stmt ,rc - hdr_len );/* all other messages can be processed by receiver itself */
555+ MtmExecutor (stmt ,msg_len );/* all other messages can be processed by receiver itself */
554556}
555557}else {
556- ByteBufferAppend (& buf ,stmt ,rc - hdr_len );
558+ ByteBufferAppend (& buf ,stmt ,msg_len );
557559if (stmt [0 ]== 'C' )/* commit */
558560{
559- if (!MtmFilterTransaction (stmt ,rc - hdr_len ))
561+ if (!MtmFilterTransaction (stmt ,msg_len ))
560562{
561563if (spill_file >=0 ) {
562564ByteBufferAppend (& buf ,")" ,1 );
@@ -568,7 +570,7 @@ pglogical_receiver_main(Datum main_arg)
568570spill_file = -1 ;
569571resetStringInfo (& spill_info );
570572}else {
571- if (MtmPreserveCommitOrder && buf .used == rc - hdr_len ) {
573+ if (MtmPreserveCommitOrder && buf .used == msg_len ) {
572574/* Perform commit-prepared and rollback-prepared requested directly in receiver */
573575timestamp_t stop ,start = MtmGetSystemTime ();
574576MtmExecutor (buf .data ,buf .used );