@@ -465,20 +465,37 @@ read_rel(StringInfo s, LOCKMODE mode)
465
465
}
466
466
467
467
static void
468
- MtmSetCurrentSession (int nodeId )
468
+ MtmBeginSession (int nodeId )
469
469
{
470
+ #if 0
470
471
char slot_name [MULTIMASTER_MAX_SLOT_NAME_SIZE ];
471
472
sprintf (slot_name ,MULTIMASTER_SLOT_PATTERN ,nodeId );
473
+ Assert (replorigin_session_origin == InvalidRepOriginId );
472
474
replorigin_session_origin = replorigin_by_name (slot_name , false);
475
+ MTM_INFO ("%d: Begin setup replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
473
476
replorigin_session_setup (replorigin_session_origin );
477
+ MTM_INFO ("%d: End setup replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
478
+ #endif
479
+ }
480
+
481
+ static void
482
+ MtmEndSession (void )
483
+ {
484
+ if (replorigin_session_origin != InvalidRepOriginId ) {
485
+ MTM_INFO ("%d: Begin reset replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
486
+ replorigin_session_origin = InvalidRepOriginId ;
487
+ replorigin_session_reset ();
488
+ MTM_INFO ("%d: End reset replorigin session: %d\n" ,MyProcPid ,replorigin_session_origin );
489
+ }
474
490
}
475
491
476
492
static void
477
493
process_remote_commit (StringInfo in )
478
494
{
479
- uint8 flags ;
480
- uint8 nodeId ;
481
- const char * gid = NULL ;
495
+ uint8 flags ;
496
+ uint8 nodeId ;
497
+ csn_t csn ;
498
+ const char * gid = NULL ;
482
499
483
500
/* read flags */
484
501
flags = pq_getmsgbyte (in );
@@ -489,14 +506,16 @@ process_remote_commit(StringInfo in)
489
506
pq_getmsgint64 (in );/* end_lsn */
490
507
replorigin_session_origin_timestamp = pq_getmsgint64 (in );/* commit_time */
491
508
509
+ Assert (replorigin_session_origin == InvalidRepOriginId );
510
+
492
511
switch (PGLOGICAL_XACT_EVENT (flags ))
493
512
{
494
513
case PGLOGICAL_COMMIT :
495
514
{
496
515
MTM_TRACE ("%d: PGLOGICAL_COMMIT commit\n" ,MyProcPid );
497
516
if (IsTransactionState ()) {
498
517
Assert (TransactionIdIsValid (MtmGetCurrentTransactionId ()));
499
- MtmSetCurrentSession (nodeId );
518
+ MtmBeginSession (nodeId );
500
519
CommitTransactionCommand ();
501
520
}
502
521
break ;
@@ -510,7 +529,7 @@ process_remote_commit(StringInfo in)
510
529
BeginTransactionBlock ();
511
530
CommitTransactionCommand ();
512
531
StartTransactionCommand ();
513
- MtmSetCurrentSession (nodeId );
532
+ MtmBeginSession (nodeId );
514
533
/* PREPARE itself */
515
534
MtmSetCurrentTransactionGID (gid );
516
535
PrepareTransactionBlock (gid );
@@ -520,10 +539,12 @@ process_remote_commit(StringInfo in)
520
539
case PGLOGICAL_COMMIT_PREPARED :
521
540
{
522
541
Assert (!TransactionIdIsValid (MtmGetCurrentTransactionId ()));
542
+ csn = pq_getmsgint64 (in );
523
543
gid = pq_getmsgstring (in );
524
544
MTM_TRACE ("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n" ,MyProcPid ,csn ,gid );
525
545
StartTransactionCommand ();
526
- MtmSetCurrentSession (nodeId );
546
+ MtmBeginSession (nodeId );
547
+ MtmSetCurrentTransactionCSN (csn );
527
548
MtmSetCurrentTransactionGID (gid );
528
549
FinishPreparedTransaction (gid , true);
529
550
CommitTransactionCommand ();
@@ -545,8 +566,7 @@ process_remote_commit(StringInfo in)
545
566
default :
546
567
Assert (false);
547
568
}
548
- replorigin_session_reset ();
549
- replorigin_session_origin = InvalidRepOriginId ;
569
+ MtmEndSession ();
550
570
}
551
571
552
572
static void
@@ -859,10 +879,10 @@ void MtmExecutor(int id, void* work, size_t size)
859
879
{
860
880
StringInfoData s ;
861
881
Relation rel = NULL ;
862
- initStringInfo (& s );
863
882
s .data = work ;
864
883
s .len = size ;
865
884
s .maxlen = -1 ;
885
+ s .cursor = 0 ;
866
886
867
887
if (ApplyContext == NULL ) {
868
888
ApplyContext = AllocSetContextCreate (TopMemoryContext ,
@@ -910,12 +930,10 @@ void MtmExecutor(int id, void* work, size_t size)
910
930
}
911
931
PG_CATCH ();
912
932
{
913
- if (replorigin_session_origin != InvalidRepOriginId ) {
914
- replorigin_session_reset ();
915
- }
916
933
EmitErrorReport ();
917
934
FlushErrorState ();
918
935
MTM_TRACE ("%d: REMOTE begin abort transaction %d\n" ,MyProcPid ,MtmGetCurrentTransactionId ());
936
+ MtmEndSession ();
919
937
AbortCurrentTransaction ();
920
938
MTM_TRACE ("%d: REMOTE end abort transaction %d\n" ,MyProcPid ,MtmGetCurrentTransactionId ());
921
939
}