Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit684c9f2

Browse files
authored
Feat/workflow (#66)
* init type workflow* more for workflow* more for workflow* fix tests
1 parentbb8c0a9 commit684c9f2

31 files changed

+1639
-61
lines changed

‎build/version.props‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project>
22
<PropertyGroup>
33

4-
<DtmCSharpVersion>1.4.0</DtmCSharpVersion>
4+
<DtmCSharpVersion>1.5.0</DtmCSharpVersion>
55

66
</PropertyGroup>
77
</Project>
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
usingDtmcli.DtmImp;
2+
usingDtmCommon;
3+
usingDtmSample.Dtos;
4+
usingDtmworkflow;
5+
usingMicrosoft.AspNetCore.Mvc;
6+
usingMicrosoft.Extensions.Logging;
7+
usingMicrosoft.Extensions.Options;
8+
usingSystem;
9+
usingSystem.Net.Http;
10+
usingSystem.Net.Http.Headers;
11+
usingSystem.Text;
12+
usingSystem.Text.Json;
13+
usingSystem.Threading;
14+
usingSystem.Threading.Tasks;
15+
16+
namespaceDtmSample.Controllers
17+
{
18+
[ApiController]
19+
[Route("/api")]
20+
publicclassWfTestController:ControllerBase
21+
{
22+
23+
privatereadonlyILogger<WfTestController>_logger;
24+
privatereadonlyWorlflowGlobalTransaction_globalTransaction;
25+
privatereadonlyAppSettings_settings;
26+
27+
publicWfTestController(ILogger<WfTestController>logger,IOptions<AppSettings>optionsAccs,WorlflowGlobalTransactiontransaction)
28+
{
29+
_logger=logger;
30+
_settings=optionsAccs.Value;
31+
_globalTransaction=transaction;
32+
}
33+
34+
[HttpPost("wf-simple")]
35+
publicasyncTask<IActionResult>Simple(CancellationTokencancellationToken)
36+
{
37+
try
38+
{
39+
varwfName=$"wf-simple-{Guid.NewGuid():N}";
40+
41+
_globalTransaction.Register(wfName,async(wf,data)=>
42+
{
43+
varcontent=newByteArrayContent(data);
44+
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
45+
46+
varoutClient=wf.NewBranch().NewRequest();
47+
awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOut",content);
48+
49+
varinClient=wf.NewBranch().NewRequest();
50+
awaitinClient.PostAsync(_settings.BusiUrl+"/TransIn",content);
51+
52+
returnnull;
53+
});
54+
55+
varreq=JsonSerializer.Serialize(newTransRequest("1",-30));
56+
57+
await_globalTransaction.Execute(wfName,Guid.NewGuid().ToString("N"),Encoding.UTF8.GetBytes(req),true);
58+
59+
returnOk(TransResponse.BuildSucceedResponse());
60+
}
61+
catch(Exceptionex)
62+
{
63+
_logger.LogError(ex,"Workflow Error");
64+
returnOk(TransResponse.BuildFailureResponse());
65+
}
66+
}
67+
68+
[HttpPost("wf-saga")]
69+
publicasyncTask<IActionResult>Saga(CancellationTokencancellationToken)
70+
{
71+
try
72+
{
73+
varwfName=$"wf-saga-{Guid.NewGuid():N}";
74+
75+
_globalTransaction.Register(wfName,async(wf,data)=>
76+
{
77+
varcontent=newByteArrayContent(data);
78+
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
79+
80+
WfPhase2FuncoutRbFunc=async bb=>
81+
{
82+
varrbClient=wf.NewRequest();
83+
awaitrbClient.PostAsync(_settings.BusiUrl+"/TransOutRevert",content);
84+
};
85+
varoutClient=wf.NewBranch().OnRollback(outRbFunc).NewRequest();
86+
awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOut",content);
87+
88+
WfPhase2FuncinRbFunc=async bb=>
89+
{
90+
varrbClient=wf.NewRequest();
91+
awaitrbClient.PostAsync(_settings.BusiUrl+"/TransInRevert",content);
92+
};
93+
varinClient=wf.NewBranch().OnRollback(inRbFunc).NewRequest();
94+
awaitinClient.PostAsync(_settings.BusiUrl+"/TransIn",content);
95+
96+
returnnull;
97+
});
98+
99+
varreq=JsonSerializer.Serialize(newTransRequest("1",-30));
100+
101+
await_globalTransaction.Execute(wfName,Guid.NewGuid().ToString("N"),Encoding.UTF8.GetBytes(req),true);
102+
103+
returnOk(TransResponse.BuildSucceedResponse());
104+
}
105+
catch(Exceptionex)
106+
{
107+
_logger.LogError(ex,"Workflow Saga Error");
108+
returnOk(TransResponse.BuildFailureResponse());
109+
}
110+
}
111+
112+
[HttpPost("wf-saga-rollback")]
113+
publicasyncTask<IActionResult>SagaRollBack(CancellationTokencancellationToken)
114+
{
115+
try
116+
{
117+
varwfName=$"wf-saga-rollback-{Guid.NewGuid():N}";
118+
119+
_globalTransaction.Register(wfName,async(wf,data)=>
120+
{
121+
varcontent=newByteArrayContent(data);
122+
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
123+
124+
WfPhase2FuncoutRbFunc=async bb=>
125+
{
126+
varrbClient=wf.NewRequest();
127+
awaitrbClient.PostAsync(_settings.BusiUrl+"/TransOutRevert",content);
128+
};
129+
varoutClient=wf.NewBranch().OnRollback(outRbFunc).NewRequest();
130+
varresp=awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOutError",content);
131+
132+
varex=awaitUtils.RespAsErrorCompatible(resp);
133+
if(ex!=null)throwex;
134+
135+
returnnull;
136+
});
137+
138+
varreq=JsonSerializer.Serialize(newTransRequest("1",-30));
139+
140+
await_globalTransaction.Execute(wfName,Guid.NewGuid().ToString("N"),Encoding.UTF8.GetBytes(req),true);
141+
142+
returnOk(TransResponse.BuildSucceedResponse());
143+
}
144+
catch(Exceptionex)
145+
{
146+
_logger.LogError(ex,"Workflow Saga Error");
147+
returnOk(TransResponse.BuildFailureResponse());
148+
}
149+
}
150+
151+
[HttpPost("wf-tcc")]
152+
publicasyncTask<IActionResult>Tcc(CancellationTokencancellationToken)
153+
{
154+
try
155+
{
156+
varwfName=$"wf-tcc-{Guid.NewGuid():N}";
157+
158+
_globalTransaction.Register(wfName,async(wf,data)=>
159+
{
160+
varcontent=newByteArrayContent(data);
161+
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
162+
163+
WfPhase2FuncoutRbFunc=async bb=>
164+
{
165+
varclient=wf.NewRequest();
166+
awaitclient.PostAsync(_settings.BusiUrl+"/TransOutCancel",content);
167+
};
168+
169+
WfPhase2FuncoutCmFunc=async bb=>
170+
{
171+
varclient=wf.NewRequest();
172+
awaitclient.PostAsync(_settings.BusiUrl+"/TransOutConfirm",content);
173+
};
174+
175+
varoutClient=wf.NewBranch().OnRollback(outRbFunc).OnCommit(outCmFunc).NewRequest();
176+
awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOutTry",content);
177+
178+
WfPhase2FuncinRbFunc=async bb=>
179+
{
180+
varclient=wf.NewRequest();
181+
awaitclient.PostAsync(_settings.BusiUrl+"/TransInCancel",content);
182+
};
183+
184+
WfPhase2FuncinCmFunc=async bb=>
185+
{
186+
varclient=wf.NewRequest();
187+
awaitclient.PostAsync(_settings.BusiUrl+"/TransInConfirm",content);
188+
};
189+
190+
varinClient=wf.NewBranch().OnRollback(inRbFunc).OnCommit(inCmFunc).NewRequest();
191+
awaitinClient.PostAsync(_settings.BusiUrl+"/TransInTry",content);
192+
193+
returnnull;
194+
});
195+
196+
varreq=JsonSerializer.Serialize(newTransRequest("1",-30));
197+
198+
await_globalTransaction.Execute(wfName,Guid.NewGuid().ToString("N"),Encoding.UTF8.GetBytes(req),true);
199+
200+
returnOk(TransResponse.BuildSucceedResponse());
201+
}
202+
catch(Exceptionex)
203+
{
204+
_logger.LogError(ex,"Workflow Tcc Error");
205+
returnOk(TransResponse.BuildFailureResponse());
206+
}
207+
}
208+
209+
[HttpPost("wf-tcc-rollback")]
210+
publicasyncTask<IActionResult>TccRollBack(CancellationTokencancellationToken)
211+
{
212+
try
213+
{
214+
varwfName=$"wf-tcc-rollback-{Guid.NewGuid():N}";
215+
216+
_globalTransaction.Register(wfName,async(wf,data)=>
217+
{
218+
varcontent=newByteArrayContent(data);
219+
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
220+
221+
WfPhase2FuncoutRbFunc=async bb=>
222+
{
223+
varclient=wf.NewRequest();
224+
awaitclient.PostAsync(_settings.BusiUrl+"/TransOutCancel",content);
225+
};
226+
227+
WfPhase2FuncoutCmFunc=async bb=>
228+
{
229+
varclient=wf.NewRequest();
230+
awaitclient.PostAsync(_settings.BusiUrl+"/TransOutConfirm",content);
231+
};
232+
233+
varoutClient=wf.NewBranch().OnRollback(outRbFunc).OnCommit(outCmFunc).NewRequest();
234+
varresp=awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOutTryError",content);
235+
236+
if((int)resp.StatusCode>=400)
237+
{
238+
thrownewDtmFailureException();
239+
}
240+
241+
returnnull;
242+
});
243+
244+
varreq=JsonSerializer.Serialize(newTransRequest("1",-30));
245+
246+
await_globalTransaction.Execute(wfName,Guid.NewGuid().ToString("N"),Encoding.UTF8.GetBytes(req),true);
247+
248+
returnOk(TransResponse.BuildSucceedResponse());
249+
}
250+
catch(Exceptionex)
251+
{
252+
_logger.LogError(ex,"Workflow Tcc Error");
253+
returnOk(TransResponse.BuildFailureResponse());
254+
}
255+
}
256+
}
257+
}

‎samples/DtmSample/DtmSample.csproj‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
<ItemGroup>
1818
<ProjectReferenceInclude="..\..\src\Dtmcli\Dtmcli.csproj" />
1919
<ProjectReferenceInclude="..\..\src\DtmMongoBarrier\DtmMongoBarrier.csproj" />
20+
<ProjectReferenceInclude="..\..\src\Dtmworkflow\Dtmworkflow.csproj" />
2021
</ItemGroup>
2122

2223
</Project>

‎samples/DtmSample/Startup.cs‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
usingDtmcli;
2+
usingDtmworkflow;
23
usingMicrosoft.AspNetCore.Builder;
34
usingMicrosoft.AspNetCore.Hosting;
45
usingMicrosoft.Extensions.Configuration;
@@ -22,11 +23,14 @@ public Startup(IConfiguration configuration)
2223

2324
publicvoidConfigureServices(IServiceCollectionservices)
2425
{
25-
services.AddDtmcli(dtm=>
26+
services.AddDtmWorkflow(dtm=>
2627
{
2728
dtm.DtmUrl=Configuration.GetValue<string>("AppSettings:DtmUrl");
2829
dtm.SqlDbType=Configuration.GetValue<string>("AppSettings:SqlDbType");
2930
dtm.BarrierSqlTableName=Configuration.GetValue<string>("AppSettings:BarrierSqlTableName");
31+
dtm.DtmGrpcUrl=Configuration.GetValue<string>("AppSettings:DtmGrpcUrl");
32+
dtm.HttpCallback="";
33+
dtm.GrpcCallback="";
3034
});
3135

3236
services.Configure<AppSettings>(Configuration.GetSection("AppSettings"));

‎samples/DtmSample/appsettings.docker.json‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"AllowedHosts":"*",
1212
"AppSettings": {
1313
"DtmUrl":"http://dtm:36789",
14+
"DtmGrpcUrl":"http://dtm:36790",
1415
"BusiUrl":"http://sample:9090/api",
1516
"BarrierConn":"Server=db;port=3306;User ID=root;Password=123456;Database=dtm_barrier",
1617
"MongoBarrierConn":"mongodb://mgdb:27017"

‎samples/DtmSample/appsettings.json‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"AllowedHosts":"*",
1212
"AppSettings": {
1313
"DtmUrl":"http://localhost:36789",
14+
"DtmGrpcUrl":"http://localhost:36790",
1415
"BusiUrl":"http://localhost:9090/api",
1516
// if you want to run it with dapr, uncomment below two lines
1617
//"DtmUrl": "http://localhost:3602/v1.0/invoke/dtm/method",

‎src/DtmCommon/Constant.cs‎

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,24 @@ public class Constant
1212
publicstaticreadonlystringResultSuccess="SUCCESS";
1313
publicstaticreadonlystringResultOngoing="ONGOING";
1414

15+
publicstaticreadonlystringStatusPrepared="prepared";
16+
publicstaticreadonlystringStatusSubmitted="submitted";
17+
publicstaticreadonlystringStatusSucceed="succeed";
18+
publicstaticreadonlystringStatusFailed="failed";
19+
publicstaticreadonlystringStatusAborting="aborting";
20+
21+
publicstaticreadonlystringOpTry="try";
22+
publicstaticreadonlystringOpConfirm="confirm";
23+
publicstaticreadonlystringOpCancel="cancel";
24+
publicstaticreadonlystringOpAction="action";
25+
publicstaticreadonlystringOpCompensate="compensate";
26+
publicstaticreadonlystringOpCommit="commit";
27+
publicstaticreadonlystringOpRollback="rollback";
28+
29+
publicstaticreadonlystringProtocolGRPC="grpc";
30+
publicstaticreadonlystringProtocolHTTP="http";
31+
32+
1533
/// <summary>
1634
/// error of DUPLICATED for only msg
1735
/// if QueryPrepared executed before call. then DoAndSubmit return this error

‎src/DtmCommon/DtmOptions.cs‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,9 @@ public class DtmOptions
4141
/// branch request timeout in milliseconds, default 10,000 milliseconds(10s)
4242
/// </summary>
4343
publicintBranchTimeout{get;set;}=10*1000;
44+
45+
publicstringHttpCallback{get;set;}
46+
47+
publicstringGrpcCallback{get;set;}
4448
}
4549
}

‎src/Dtmcli/DtmClient.cs‎

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,23 @@ public async Task<string> GenGid(CancellationToken cancellationToken)
4343
returndtmgid.Gid;
4444
}
4545

46+
publicHttpClientGetHttpClient(stringname)
47+
{
48+
return_httpClientFactory.CreateClient(name);
49+
}
50+
51+
publicasyncTask<HttpResponseMessage>PrepareWorkflow(TransBasetb,CancellationTokencancellationToken)
52+
{
53+
varurl=string.Concat(_dtmOptions.DtmUrl.TrimEnd(Slash),Constant.Request.URLBASE_PREFIX,"prepareWorkflow");
54+
55+
varcontent=newStringContent(JsonSerializer.Serialize(tb,_jsonOptions));
56+
content.Headers.ContentType=newMediaTypeHeaderValue(Constant.Request.CONTENT_TYPE);
57+
58+
varclient=_httpClientFactory.CreateClient(Constant.DtmClientHttpName);
59+
varresponse=awaitclient.PostAsync(url,content,cancellationToken).ConfigureAwait(false);
60+
returnresponse;
61+
}
62+
4663
publicasyncTaskTransCallDtm(TransBasetb,objectbody,stringoperation,CancellationTokencancellationToken)
4764
{
4865
varurl=string.Concat(_dtmOptions.DtmUrl.TrimEnd(Slash),Constant.Request.URLBASE_PREFIX,operation);

‎src/Dtmcli/DtmImp/Utils.cs‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ public static async Task<Exception> RespAsErrorCompatible(HttpResponseMessage re
1818
// System.Net.HttpStatusCode do not contain StatusTooEarly
1919
if((int)resp.StatusCode==StatusTooEarly||str.Contains(DtmCommon.Constant.ResultOngoing))
2020
{
21-
returnnewDtmException(DtmCommon.Constant.ResultOngoing);
21+
returnnewDtmOngingException();
2222
}
2323
elseif(resp.StatusCode==HttpStatusCode.Conflict||str.Contains(DtmCommon.Constant.ResultFailure))
2424
{
25-
returnnewDtmException(DtmCommon.Constant.ResultFailure);
25+
returnnewDtmFailureException();
2626
}
2727
elseif(resp.StatusCode!=HttpStatusCode.OK)
2828
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp