@@ -59,7 +59,7 @@ typedef struct TupleData
59
59
bool changed [MaxTupleAttributeNumber ];
60
60
}TupleData ;
61
61
62
- static int MtmTransactionRecords ;
62
+ static bool inside_tx = false ;
63
63
64
64
static Relation read_rel (StringInfo s ,LOCKMODE mode );
65
65
static void read_tuple_parts (StringInfo s ,Relation rel ,TupleData * tup );
@@ -528,6 +528,8 @@ MtmEndSession(void)
528
528
if (replorigin_session_origin != InvalidRepOriginId ) {
529
529
MTM_LOG2 ("%d: Begin reset replorigin session for node %d: %d, progress %lx" ,MyProcPid ,MtmReplicationNodeId ,replorigin_session_origin ,replorigin_session_get_progress (false));
530
530
replorigin_session_origin = InvalidRepOriginId ;
531
+ replorigin_session_origin_lsn = InvalidXLogRecPtr ;
532
+ replorigin_session_origin_timestamp = 0 ;
531
533
replorigin_session_reset ();
532
534
if (unlock ) {
533
535
MtmUnlockNode (MtmReplicationNodeId );
@@ -539,42 +541,25 @@ MtmEndSession(void)
539
541
static void
540
542
process_remote_commit (StringInfo in )
541
543
{
542
- int i ;
543
544
uint8 flags ;
544
545
csn_t csn ;
545
546
const char * gid = NULL ;
546
547
XLogRecPtr end_lsn ;
547
548
XLogRecPtr origin_lsn ;
548
- RepOriginId originId ;
549
- int n_records ;
549
+ int origin_node ;
550
550
/* read flags */
551
551
flags = pq_getmsgbyte (in );
552
552
MtmReplicationNodeId = pq_getmsgbyte (in );
553
553
554
- n_records = pq_getmsgint (in ,4 );
555
- if (MtmTransactionRecords != n_records ) {
556
- elog (ERROR ,"Transaction %d flags %d contains %d records instead of %d" ,MtmGetCurrentTransactionId (),flags ,MtmTransactionRecords ,n_records );
557
- }
558
-
559
554
/* read fields */
560
555
replorigin_session_origin_lsn = pq_getmsgint64 (in );/* commit_lsn */
561
556
end_lsn = pq_getmsgint64 (in );/* end_lsn */
562
557
replorigin_session_origin_timestamp = pq_getmsgint64 (in );/* commit_time */
563
558
564
- originId = ( RepOriginId ) pq_getmsgint ( in , 2 );
559
+ origin_node = pq_getmsgbyte ( in );
565
560
origin_lsn = pq_getmsgint64 (in );
561
+ Mtm -> nodes [origin_node - 1 ].restartLsn = origin_lsn ;
566
562
567
- if (originId != InvalidRepOriginId ) {
568
- for (i = 0 ;i < Mtm -> nAllNodes ;i ++ ) {
569
- if (Mtm -> nodes [i ].originId == originId ) {
570
- Mtm -> nodes [i ].restartLsn = origin_lsn ;
571
- break ;
572
- }
573
- }
574
- if (i == Mtm -> nAllNodes ) {
575
- elog (WARNING ,"Failed to map origin %d" ,originId );
576
- }
577
- }
578
563
Assert (replorigin_session_origin == InvalidRepOriginId );
579
564
580
565
switch (PGLOGICAL_XACT_EVENT (flags ))
@@ -676,8 +661,6 @@ process_remote_insert(StringInfo s, Relation rel)
676
661
ScanKey * index_keys ;
677
662
int i ;
678
663
679
- MtmTransactionRecords += 1 ;
680
-
681
664
estate = create_rel_estate (rel );
682
665
newslot = ExecInitExtraTupleSlot (estate );
683
666
oldslot = ExecInitExtraTupleSlot (estate );
@@ -776,8 +759,6 @@ process_remote_update(StringInfo s, Relation rel)
776
759
ScanKeyData skey [INDEX_MAX_KEYS ];
777
760
HeapTuple remote_tuple = NULL ;
778
761
779
- MtmTransactionRecords += 1 ;
780
-
781
762
action = pq_getmsgbyte (s );
782
763
783
764
/* old key present, identifying key changed */
@@ -895,8 +876,6 @@ process_remote_delete(StringInfo s, Relation rel)
895
876
ScanKeyData skey [INDEX_MAX_KEYS ];
896
877
bool found_old ;
897
878
898
- MtmTransactionRecords += 1 ;
899
-
900
879
estate = create_rel_estate (rel );
901
880
oldslot = ExecInitExtraTupleSlot (estate );
902
881
ExecSetSlotDescriptor (oldslot ,RelationGetDescr (rel ));
@@ -984,7 +963,6 @@ void MtmExecutor(int id, void* work, size_t size)
984
963
}
985
964
MemoryContextSwitchTo (ApplyContext );
986
965
replorigin_session_origin = InvalidRepOriginId ;
987
- MtmTransactionRecords = 0 ;
988
966
PG_TRY ();
989
967
{
990
968
while (true) {