@@ -465,20 +465,37 @@ read_rel(StringInfo s, LOCKMODE mode)
465465}
466466
467467static void
468- MtmSetCurrentSession (int nodeId )
468+ MtmBeginSession (int nodeId )
469469{
470+ #if 0
470471char slot_name [MULTIMASTER_MAX_SLOT_NAME_SIZE ];
471472sprintf (slot_name ,MULTIMASTER_SLOT_PATTERN ,nodeId );
473+ Assert (replorigin_session_origin == InvalidRepOriginId );
472474replorigin_session_origin = replorigin_by_name (slot_name , false);
475+ MTM_INFO ("%d: Begin setup replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
473476replorigin_session_setup (replorigin_session_origin );
477+ MTM_INFO ("%d: End setup replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
478+ #endif
479+ }
480+
481+ static void
482+ MtmEndSession (void )
483+ {
484+ if (replorigin_session_origin != InvalidRepOriginId ) {
485+ MTM_INFO ("%d: Begin reset replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
486+ replorigin_session_origin = InvalidRepOriginId ;
487+ replorigin_session_reset ();
488+ MTM_INFO ("%d: End reset replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
489+ }
474490}
475491
476492static void
477493process_remote_commit (StringInfo in )
478494{
479- uint8 flags ;
480- uint8 nodeId ;
481- const char * gid = NULL ;
495+ uint8 flags ;
496+ uint8 nodeId ;
497+ csn_t csn ;
498+ const char * gid = NULL ;
482499
483500/* read flags */
484501flags = pq_getmsgbyte (in );
@@ -489,14 +506,16 @@ process_remote_commit(StringInfo in)
489506pq_getmsgint64 (in );/* end_lsn */
490507replorigin_session_origin_timestamp = pq_getmsgint64 (in );/* commit_time */
491508
509+ Assert (replorigin_session_origin == InvalidRepOriginId );
510+
492511switch (PGLOGICAL_XACT_EVENT (flags ))
493512{
494513case PGLOGICAL_COMMIT :
495514{
496515MTM_TRACE ("%d: PGLOGICAL_COMMIT commit\n" ,MyProcPid );
497516if (IsTransactionState ()) {
498517Assert (TransactionIdIsValid (MtmGetCurrentTransactionId ()));
499- MtmSetCurrentSession (nodeId );
518+ MtmBeginSession (nodeId );
500519CommitTransactionCommand ();
501520}
502521break ;
@@ -510,7 +529,7 @@ process_remote_commit(StringInfo in)
510529BeginTransactionBlock ();
511530CommitTransactionCommand ();
512531StartTransactionCommand ();
513- MtmSetCurrentSession (nodeId );
532+ MtmBeginSession (nodeId );
514533/* PREPARE itself */
515534MtmSetCurrentTransactionGID (gid );
516535PrepareTransactionBlock (gid );
@@ -520,10 +539,12 @@ process_remote_commit(StringInfo in)
520539case PGLOGICAL_COMMIT_PREPARED :
521540{
522541Assert (!TransactionIdIsValid (MtmGetCurrentTransactionId ()));
542+ csn = pq_getmsgint64 (in );
523543gid = pq_getmsgstring (in );
524544MTM_TRACE ("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n" ,MyProcPid ,csn ,gid );
525545StartTransactionCommand ();
526- MtmSetCurrentSession (nodeId );
546+ MtmBeginSession (nodeId );
547+ MtmSetCurrentTransactionCSN (csn );
527548MtmSetCurrentTransactionGID (gid );
528549FinishPreparedTransaction (gid , true);
529550CommitTransactionCommand ();
@@ -545,8 +566,7 @@ process_remote_commit(StringInfo in)
545566default :
546567Assert (false);
547568}
548- replorigin_session_reset ();
549- replorigin_session_origin = InvalidRepOriginId ;
569+ MtmEndSession ();
550570}
551571
552572static void
@@ -859,10 +879,10 @@ void MtmExecutor(int id, void* work, size_t size)
859879{
860880StringInfoData s ;
861881Relation rel = NULL ;
862- initStringInfo (& s );
863882s .data = work ;
864883s .len = size ;
865884s .maxlen = -1 ;
885+ s .cursor = 0 ;
866886
867887if (ApplyContext == NULL ) {
868888ApplyContext = AllocSetContextCreate (TopMemoryContext ,
@@ -910,12 +930,10 @@ void MtmExecutor(int id, void* work, size_t size)
910930 }
911931PG_CATCH ();
912932 {
913- if (replorigin_session_origin != InvalidRepOriginId ) {
914- replorigin_session_reset ();
915- }
916933EmitErrorReport ();
917934FlushErrorState ();
918935MTM_TRACE ("%d: REMOTE begin abort transaction %d\n" ,MyProcPid ,MtmGetCurrentTransactionId ());
936+ MtmEndSession ();
919937AbortCurrentTransaction ();
920938MTM_TRACE ("%d: REMOTE end abort transaction %d\n" ,MyProcPid ,MtmGetCurrentTransactionId ());
921939 }