@@ -474,8 +474,9 @@ static void
474
474
apply_handle_commit (StringInfo s )
475
475
{
476
476
LogicalRepCommitData commit_data ;
477
+ uint8 flags = 0 ;
477
478
478
- logicalrep_read_commit (s ,& commit_data );
479
+ logicalrep_read_commit (s ,& commit_data , & flags );
479
480
480
481
Assert (commit_data .commit_lsn == remote_final_lsn );
481
482
@@ -489,7 +490,11 @@ apply_handle_commit(StringInfo s)
489
490
replorigin_session_origin_lsn = commit_data .end_lsn ;
490
491
replorigin_session_origin_timestamp = commit_data .committime ;
491
492
492
- CommitTransactionCommand ();
493
+ if (flags & LOGICALREP_IS_COMMIT )
494
+ CommitTransactionCommand ();
495
+ else if (flags & LOGICALREP_IS_ABORT )
496
+ AbortCurrentTransaction ();
497
+
493
498
pgstat_report_stat (false);
494
499
495
500
store_flush_position (commit_data .end_lsn );
@@ -509,6 +514,121 @@ apply_handle_commit(StringInfo s)
509
514
pgstat_report_activity (STATE_IDLE ,NULL );
510
515
}
511
516
517
+ static void
518
+ apply_handle_prepare_txn (LogicalRepCommitData * commit_data )
519
+ {
520
+ Assert (commit_data -> commit_lsn == remote_final_lsn );
521
+ /* The synchronization worker runs in single transaction. */
522
+ if (IsTransactionState ()&& !am_tablesync_worker ())
523
+ {
524
+ /* End the earlier transaction and start a new one */
525
+ BeginTransactionBlock ();
526
+ CommitTransactionCommand ();
527
+ StartTransactionCommand ();
528
+ /*
529
+ * Update origin state so we can restart streaming from correct
530
+ * position in case of crash.
531
+ */
532
+ replorigin_session_origin_lsn = commit_data -> end_lsn ;
533
+ replorigin_session_origin_timestamp = commit_data -> committime ;
534
+
535
+ PrepareTransactionBlock (commit_data -> gid );
536
+ CommitTransactionCommand ();
537
+ pgstat_report_stat (false);
538
+
539
+ store_flush_position (commit_data -> end_lsn );
540
+ }
541
+ else
542
+ {
543
+ /* Process any invalidation messages that might have accumulated. */
544
+ AcceptInvalidationMessages ();
545
+ maybe_reread_subscription ();
546
+ }
547
+
548
+ in_remote_transaction = false;
549
+
550
+ /* Process any tables that are being synchronized in parallel. */
551
+ process_syncing_tables (commit_data -> end_lsn );
552
+
553
+ pgstat_report_activity (STATE_IDLE ,NULL );
554
+ }
555
+
556
+ static void
557
+ apply_handle_commit_prepared_txn (LogicalRepCommitData * commit_data )
558
+ {
559
+ /* there is no transaction when COMMIT PREPARED is called */
560
+ ensure_transaction ();
561
+
562
+ /*
563
+ * Update origin state so we can restart streaming from correct
564
+ * position in case of crash.
565
+ */
566
+ replorigin_session_origin_lsn = commit_data -> end_lsn ;
567
+ replorigin_session_origin_timestamp = commit_data -> committime ;
568
+
569
+ FinishPreparedTransaction (commit_data -> gid , true);
570
+ CommitTransactionCommand ();
571
+ pgstat_report_stat (false);
572
+
573
+ store_flush_position (commit_data -> end_lsn );
574
+ in_remote_transaction = false;
575
+
576
+ /* Process any tables that are being synchronized in parallel. */
577
+ process_syncing_tables (commit_data -> end_lsn );
578
+
579
+ pgstat_report_activity (STATE_IDLE ,NULL );
580
+ }
581
+
582
+ static void
583
+ apply_handle_rollback_prepared_txn (LogicalRepCommitData * commit_data )
584
+ {
585
+ /* there is no transaction when ABORT/ROLLBACK PREPARED is called */
586
+ ensure_transaction ();
587
+
588
+ /*
589
+ * Update origin state so we can restart streaming from correct
590
+ * position in case of crash.
591
+ */
592
+ replorigin_session_origin_lsn = commit_data -> end_lsn ;
593
+ replorigin_session_origin_timestamp = commit_data -> committime ;
594
+
595
+ /* FIXME: it is ok if xact is absent */
596
+ FinishPreparedTransaction (commit_data -> gid , false);
597
+ CommitTransactionCommand ();
598
+ pgstat_report_stat (false);
599
+
600
+ store_flush_position (commit_data -> end_lsn );
601
+ in_remote_transaction = false;
602
+
603
+ /* Process any tables that are being synchronized in parallel. */
604
+ process_syncing_tables (commit_data -> end_lsn );
605
+
606
+ pgstat_report_activity (STATE_IDLE ,NULL );
607
+ }
608
+
609
+ /*
610
+ * Handle PREPARE message.
611
+ */
612
+ static void
613
+ apply_handle_prepare (StringInfo s )
614
+ {
615
+ LogicalRepCommitData commit_data ;
616
+ uint8 flags = 0 ;
617
+
618
+ logicalrep_read_prepare (s ,& commit_data ,& flags );
619
+
620
+ if (flags & LOGICALREP_IS_PREPARE )
621
+ apply_handle_prepare_txn (& commit_data );
622
+ else if (flags & LOGICALREP_IS_COMMIT_PREPARED )
623
+ apply_handle_commit_prepared_txn (& commit_data );
624
+ else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED )
625
+ apply_handle_rollback_prepared_txn (& commit_data );
626
+ else
627
+ ereport (ERROR ,
628
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
629
+ errmsg ("wrong [commit|rollback] prepare message" )));
630
+ }
631
+
512
632
/*
513
633
* Handle ORIGIN message.
514
634
*
@@ -969,10 +1089,14 @@ apply_dispatch(StringInfo s)
969
1089
case 'B' :
970
1090
apply_handle_begin (s );
971
1091
break ;
972
- /* COMMIT */
1092
+ /* COMMIT|ABORT */
973
1093
case 'C' :
974
1094
apply_handle_commit (s );
975
1095
break ;
1096
+ /* [COMMIT|ROLLBACK] PREPARE */
1097
+ case 'P' :
1098
+ apply_handle_prepare (s );
1099
+ break ;
976
1100
/* INSERT */
977
1101
case 'I' :
978
1102
apply_handle_insert (s );