Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit477b12b

Browse files
authored
bugfix: fix workflow return null handling (#94)
- Fix null value handling for execute again- Add unit test and sample for different workflow return
1 parent7242fd7 commit477b12b

File tree

3 files changed

+114
-83
lines changed

3 files changed

+114
-83
lines changed

‎samples/DtmSample/Controllers/WfTestController.cs‎

Lines changed: 2 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
usingMicrosoft.Extensions.Options;
88
usingSystem;
99
usingSystem.IO;
10+
usingSystem.Diagnostics;
1011
usingSystem.Net.Http;
1112
usingSystem.Net.Http.Headers;
1213
usingSystem.Text;
1314
usingSystem.Text.Json;
1415
usingSystem.Text.Unicode;
1516
usingSystem.Threading;
1617
usingSystem.Threading.Tasks;
18+
usingException=System.Exception;
1719

1820
namespaceDtmSample.Controllers
1921
{
@@ -255,84 +257,5 @@ public async Task<IActionResult> TccRollBack(CancellationToken cancellationToken
255257
returnOk(TransResponse.BuildFailureResponse());
256258
}
257259
}
258-
259-
260-
privatestaticreadonlystringwfNameForResume="wfNameForResume";
261-
262-
/// <summary>
263-
///
264-
/// </summary>
265-
/// <param name="cancellationToken"></param>
266-
/// <returns></returns>
267-
[HttpPost("wf-crash")]
268-
publicasyncTask<IActionResult>Crash(CancellationTokencancellationToken)
269-
{
270-
if(!_globalTransaction.Exists(wfNameForResume))
271-
{
272-
_globalTransaction.Register(wfNameForResume,async(wf,data)=>
273-
{
274-
varcontent=newByteArrayContent(data);
275-
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
276-
277-
varoutClient=wf.NewBranch().NewRequest();
278-
awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOut",content);
279-
280-
// the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back
281-
// manual stop application
282-
Environment.Exit(0);
283-
284-
varinClient=wf.NewBranch().NewRequest();
285-
awaitinClient.PostAsync(_settings.BusiUrl+"/TransIn",content);
286-
287-
returnnull;
288-
});
289-
}
290-
291-
varreq=JsonSerializer.Serialize(newTransRequest("1",-30));
292-
await_globalTransaction.Execute(wfNameForResume,Guid.NewGuid().ToString("N"),Encoding.UTF8.GetBytes(req),true);
293-
294-
returnOk(TransResponse.BuildSucceedResponse());
295-
}
296-
297-
[HttpPost("wf-resume")]
298-
publicasyncTask<IActionResult>WfResume(CancellationTokencancellationToken)
299-
{
300-
try
301-
{
302-
if(!_globalTransaction.Exists(wfNameForResume))
303-
{
304-
// register again after manual crash by Environment.Exit(0);
305-
_globalTransaction.Register(wfNameForResume,async(wf,data)=>
306-
{
307-
varcontent=newByteArrayContent(data);
308-
content.Headers.ContentType=newMediaTypeHeaderValue("application/json");
309-
310-
varoutClient=wf.NewBranch().NewRequest();
311-
awaitoutClient.PostAsync(_settings.BusiUrl+"/TransOut",content);
312-
313-
varinClient=wf.NewBranch().NewRequest();
314-
awaitinClient.PostAsync(_settings.BusiUrl+"/TransIn",content);
315-
316-
returnnull;
317-
});
318-
}
319-
320-
// prepared call ExecuteByQS
321-
usingvarbodyMemoryStream=newMemoryStream();
322-
awaitRequest.Body.CopyToAsync(bodyMemoryStream,cancellationToken);
323-
byte[]bytes=bodyMemoryStream.ToArray();
324-
stringbody=Encoding.UTF8.GetString(bytes);
325-
_logger.LogDebug($"body:{body}");
326-
327-
await_globalTransaction.ExecuteByQS(Request.Query,bodyMemoryStream.ToArray());
328-
329-
returnOk(TransResponse.BuildSucceedResponse());
330-
}
331-
catch(Exceptionex)
332-
{
333-
_logger.LogError(ex,"Workflow Error");
334-
returnOk(TransResponse.BuildFailureResponse());
335-
}
336-
}
337260
}
338261
}

‎src/Dtmworkflow/Workflow.Imp.cs‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ internal async Task<byte[]> Process(WfFunc2 handler, byte[] data)
3939
varstatus=reply.Transaction.Status;
4040
if(status==DtmCommon.Constant.StatusSucceed)
4141
{
42-
varsRes=Convert.FromBase64String(reply.Transaction.Result);
42+
varsRes=reply.Transaction.Result!=null
43+
?Convert.FromBase64String(reply.Transaction.Result)
44+
:null;
4345
returnsRes;
4446
}
4547
elseif(status==DtmCommon.Constant.StatusFailed)

‎tests/Dtmworkflow.Tests/WorkflowHttpTests.cs‎

Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,114 @@ public async void Commit_Should_Be_Executed()
255255
rollBackFunc.Verify(x=>x.Invoke(It.IsAny<BranchBarrier>()),Times.Never);
256256
commitFunc.Verify(x=>x.Invoke(It.IsAny<BranchBarrier>()),Times.Once);
257257
}
258+
259+
[Fact]
260+
publicasyncTaskExecute_Result_Should_Be_WfFunc2()
261+
{
262+
varfactory=newMock<IWorkflowFactory>();
263+
varhttpClient=newMock<IDtmClient>();
264+
vargrpcClient=newMock<IDtmgRPCClient>();
265+
varhttpBb=newMock<Dtmcli.IBranchBarrierFactory>();
266+
267+
SetupPrepareWorkflow(httpClient,DtmCommon.Constant.StatusPrepared,null);
268+
varwf=SetupWorkFlow(httpClient,grpcClient,httpBb);
269+
270+
factory.Setup(x=>x.NewWorkflow(It.IsAny<string>(),It.IsAny<string>(),It.IsAny<byte[]>(),It.IsAny<bool>())).Returns(wf.Object);
271+
272+
varwfgt=newWorkflowGlobalTransaction(factory.Object,NullLoggerFactory.Instance);
273+
274+
varwfName=nameof(Execute_Result_Should_Be_WfFunc2);
275+
vargid=Guid.NewGuid().ToString("N");
276+
277+
wfgt.Register(wfName,(workflow,data)=>Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
278+
279+
varreq=JsonSerializer.Serialize(new{userId="1",amount=30});
280+
varres=awaitwfgt.Execute(wfName,gid,Encoding.UTF8.GetBytes(req),true);
281+
282+
Assert.Equal("return value from WfFunc2",Encoding.UTF8.GetString(res));
283+
}
284+
285+
[Fact]
286+
publicasyncTaskExecute_Result_Should_Be_Previous()
287+
{
288+
varfactory=newMock<IWorkflowFactory>();
289+
varhttpClient=newMock<IDtmClient>();
290+
vargrpcClient=newMock<IDtmgRPCClient>();
291+
varhttpBb=newMock<Dtmcli.IBranchBarrierFactory>();
292+
293+
SetupPrepareWorkflow(httpClient,DtmCommon.Constant.StatusSucceed,"return value from previous");
294+
varwf=SetupWorkFlow(httpClient,grpcClient,httpBb);
295+
296+
factory.Setup(x=>x.NewWorkflow(It.IsAny<string>(),It.IsAny<string>(),It.IsAny<byte[]>(),It.IsAny<bool>())).Returns(wf.Object);
297+
298+
varwfgt=newWorkflowGlobalTransaction(factory.Object,NullLoggerFactory.Instance);
299+
300+
varwfName=nameof(Execute_Result_Should_Be_Previous);
301+
vargid=Guid.NewGuid().ToString("N");
302+
303+
wfgt.Register(wfName,(workflow,data)=>Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
304+
305+
varreq=JsonSerializer.Serialize(new{userId="1",amount=30});
306+
varres=awaitwfgt.Execute(wfName,gid,Encoding.UTF8.GetBytes(req),true);
307+
308+
Assert.Equal("return value from previous",Encoding.UTF8.GetString(res));
309+
}
310+
311+
[Fact]
312+
publicasyncTaskExecute_Again_Result_Should_Be_Previous()
313+
{
314+
varfactory=newMock<IWorkflowFactory>();
315+
varhttpClient1=newMock<IDtmClient>();
316+
varhttpClient2=newMock<IDtmClient>();
317+
vargrpcClient=newMock<IDtmgRPCClient>();
318+
varhttpBb=newMock<Dtmcli.IBranchBarrierFactory>();
319+
320+
// first
321+
SetupPrepareWorkflow(httpClient1,DtmCommon.Constant.StatusPrepared,null);
322+
varwf=SetupWorkFlow(httpClient1,grpcClient,httpBb);
323+
factory.Setup(x=>x.NewWorkflow(It.IsAny<string>(),It.IsAny<string>(),It.IsAny<byte[]>(),It.IsAny<bool>())).Returns(wf.Object);
324+
varwfgt=newWorkflowGlobalTransaction(factory.Object,NullLoggerFactory.Instance);
325+
varwfName=nameof(Execute_Again_Result_Should_Be_Previous);
326+
vargid=Guid.NewGuid().ToString("N");
327+
wfgt.Register(wfName,(workflow,data)=>Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
328+
varreq=JsonSerializer.Serialize(new{userId="1",amount=30});
329+
varres=awaitwfgt.Execute(wfName,gid,Encoding.UTF8.GetBytes(req),true);
330+
Assert.Equal("return value from WfFunc2",Encoding.UTF8.GetString(res));
331+
332+
// again
333+
SetupPrepareWorkflow(httpClient2,DtmCommon.Constant.StatusSucceed,"return value from previous");
334+
wf=SetupWorkFlow(httpClient2,grpcClient,httpBb);
335+
factory.Setup(x=>x.NewWorkflow(It.IsAny<string>(),It.IsAny<string>(),It.IsAny<byte[]>(),It.IsAny<bool>())).Returns(wf.Object);
336+
wfgt=newWorkflowGlobalTransaction(factory.Object,NullLoggerFactory.Instance);
337+
gid=Guid.NewGuid().ToString("N");
338+
wfgt.Register(wfName,(workflow,data)=>Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
339+
req=JsonSerializer.Serialize(new{userId="1",amount=30});
340+
res=awaitwfgt.Execute(wfName,gid,Encoding.UTF8.GetBytes(req),true);
341+
Assert.Equal("return value from previous",Encoding.UTF8.GetString(res));
342+
}
343+
344+
[Fact]
345+
publicasyncTaskExecute_Again_Result_StringEmpty()
346+
{
347+
varfactory=newMock<IWorkflowFactory>();
348+
varhttpClient=newMock<IDtmClient>();
349+
vargrpcClient=newMock<IDtmgRPCClient>();
350+
varhttpBb=newMock<Dtmcli.IBranchBarrierFactory>();
351+
352+
// again
353+
SetupPrepareWorkflow(httpClient,DtmCommon.Constant.StatusSucceed,null);
354+
varwf=SetupWorkFlow(httpClient,grpcClient,httpBb);
355+
factory.Setup(x=>x.NewWorkflow(It.IsAny<string>(),It.IsAny<string>(),It.IsAny<byte[]>(),It.IsAny<bool>())).Returns(wf.Object);
356+
varwfgt=newWorkflowGlobalTransaction(factory.Object,NullLoggerFactory.Instance);
357+
varwfName=nameof(Execute_Again_Result_StringEmpty);
358+
vargid=Guid.NewGuid().ToString("N");
359+
wfgt.Register(wfName,(workflow,data)=>Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
360+
varreq=JsonSerializer.Serialize(new{userId="1",amount=30});
361+
varres=awaitwfgt.Execute(wfName,gid,Encoding.UTF8.GetBytes(req),true);
362+
Assert.Null(res);
363+
}
258364

259-
privatevoidSetupPrepareWorkflow(Mock<IDtmClient>httpClient,stringstatus,stringresult,List<DtmProgressDto>progressDtos=null)
365+
privatevoidSetupPrepareWorkflow(Mock<IDtmClient>httpClient,stringstatus,string?result,List<DtmProgressDto>progressDtos=null)
260366
{
261367
varhttpResp=newHttpResponseMessage(HttpStatusCode.OK);
262368
httpResp.Content=newStringContent(JsonSerializer.Serialize(
@@ -265,9 +371,9 @@ private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, st
265371
Transaction=newDtmTransactionDto
266372
{
267373
Status=status,
268-
Result=Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
374+
Result=result==null?null:Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
269375
},
270-
Progresses=progressDtos
376+
Progresses=progressDtos??[]
271377
}));
272378
httpClient.Setup(x=>x.PrepareWorkflow(It.IsAny<DtmCommon.TransBase>(),It.IsAny<CancellationToken>())).Returns(Task.FromResult(httpResp));
273379
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp