11using Microsoft . Extensions . DependencyInjection ;
22using System ;
3+ using System . Data . Common ;
34using System . Threading . Tasks ;
5+ using System . Transactions ;
6+ using Dapper ;
7+ using Grpc . Core ;
8+ using MySqlConnector ;
49using Xunit ;
510
611namespace Dtmgrpc . IntegrationTests
@@ -27,5 +32,100 @@ public async Task Submit_Should_Succeed()
2732var status = await ITTestHelper . GetTranStatus ( gid ) ;
2833Assert . Equal ( "succeed" , status ) ;
2934}
35+
36+ [ Fact ]
37+ public async Task DoAndSubmit_Should_DbTrans_Exception ( )
38+ {
39+ var provider = ITTestHelper . AddDtmGrpc ( ) ;
40+ var transFactory = provider . GetRequiredService < IDtmTransFactory > ( ) ;
41+
42+ var gid = "msgTestGid" + Guid . NewGuid ( ) . ToString ( ) ;
43+ var msg = transFactory . NewMsgGrpc ( gid ) ;
44+ var req = ITTestHelper . GenBusiReq ( false , false ) ;
45+ var busiGrpc = ITTestHelper . BuisgRPCUrl ;
46+
47+ msg . Add ( busiGrpc + "/busi.Busi/TransIn" , req ) ;
48+ // do TransOut local, then TransIn with DTM.
49+ await Assert . ThrowsAsync < System . InvalidOperationException > ( async ( ) =>
50+ {
51+ // System.InvalidOperationException: A TransactionScope must be disposed on the same thread that it was created.
52+ //
53+ // System.InvalidOperationException
54+ // A TransactionScope must be disposed on the same thread that it was created.
55+ // at Dtmgrpc.MsgGrpc.DoAndSubmit(String queryPrepared, Func`2 busiCall, CancellationToken cancellationToken) in /home/yunjin/Data/projects/github/dtm-labs/client-csharp/src/Dtmgrpc/Msg/MsgGrpc.cs:line 110
56+
57+ await msg . DoAndSubmit ( busiGrpc + "/busi.Busi/QueryPreparedMySqlReal" , async branchBarrier=>
58+ {
59+ MySqlConnection conn = getBarrierMySqlConnection ( ) ;
60+ await branchBarrier . Call ( conn , ( ) =>
61+ {
62+ Task task = this . LocalAdjustBalance ( conn , TransOutUID , - req . Amount , "SUCCESS" ) ;
63+ return task ;
64+ } ,
65+ TransactionScopeOption . Required ,
66+ IsolationLevel . ReadCommitted
67+ // , default TransactionScopeAsyncFlowOption.Suppress
68+ ) ;
69+ } ) ;
70+ } ) ;
71+
72+ await Task . Delay ( 4000 ) ;
73+ var status = await ITTestHelper . GetTranStatus ( gid ) ;
74+ // The exception did not affect the local transaction commit
75+ Assert . Equal ( "succeed" , status ) ;
76+ }
77+
78+ [ Fact ]
79+ public async Task DoAndSubmit_Should_Succeed ( )
80+ {
81+ var provider = ITTestHelper . AddDtmGrpc ( ) ;
82+ var transFactory = provider . GetRequiredService < IDtmTransFactory > ( ) ;
83+
84+ var gid = "msgTestGid" + Guid . NewGuid ( ) . ToString ( ) ;
85+ var msg = transFactory . NewMsgGrpc ( gid ) ;
86+ var req = ITTestHelper . GenBusiReq ( false , false ) ;
87+ var busiGrpc = ITTestHelper . BuisgRPCUrl ;
88+
89+ msg . Add ( busiGrpc + "/busi.Busi/TransIn" , req ) ;
90+ // do TransOut local, then TransIn with DTM.
91+
92+ await msg . DoAndSubmit ( busiGrpc + "/busi.Busi/QueryPreparedMySqlReal" , async branchBarrier=>
93+ {
94+ MySqlConnection conn = getBarrierMySqlConnection ( ) ;
95+ await branchBarrier . Call ( conn , ( ) =>
96+ {
97+ Task task = this . LocalAdjustBalance ( conn , TransOutUID , - req . Amount , "SUCCESS" ) ;
98+ return task ;
99+ } ,
100+ TransactionScopeOption . Required ,
101+ IsolationLevel . ReadCommitted ,
102+ TransactionScopeAsyncFlowOption . Enabled ) ;
103+ } ) ;
104+
105+ await Task . Delay ( 2000 ) ;
106+ var status = await ITTestHelper . GetTranStatus ( gid ) ;
107+ Assert . Equal ( "succeed" , status ) ;
108+ }
109+
110+ private static readonly int TransOutUID = 1 ;
111+
112+ private static readonly int TransInUID = 2 ;
113+
114+ private MySqlConnection getBarrierMySqlConnection ( ) => new ( "Server=localhost;port=3306;User ID=root;Password=123456;Database=dtm_barrier" ) ;
115+
116+ private async Task LocalAdjustBalance ( DbConnection conn , int uid , long amount , string result )
117+ {
118+ // _logger.LogInformation("AdjustBalanceLocal uid={uid}, amount={amount}, result={result}", uid, amount, result);
119+
120+ if ( result . Equals ( "FAILURE" ) )
121+ {
122+ throw new RpcException ( new Status ( StatusCode . Aborted , "FAILURE" ) ) ;
123+ }
124+
125+ await conn . ExecuteAsync (
126+ sql : "update dtm_busi.user_account set balance = balance + @balance where user_id = @user_id" ,
127+ param : new { balance = amount , user_id = uid }
128+ ) ;
129+ }
30130}
31131}