Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6ef9e79

Browse files
committed
[Fix-17732] Change workflow instance status to failure when command handle failed
1 parentf5535dc commit6ef9e79

File tree

8 files changed

+69
-12
lines changed

8 files changed

+69
-12
lines changed

‎dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.java‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ int updateWorkflowInstanceState(
138138
@Param("originState")WorkflowExecutionStatusoriginState,
139139
@Param("targetState")WorkflowExecutionStatustargetState);
140140

141+
intforceUpdateWorkflowInstanceState(@Param("id")Integerid,@Param("status")WorkflowExecutionStatusstatus);
142+
141143
/**
142144
* update workflow instance by tenantCode
143145
*

‎dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkflowInstanceDao.java‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ void updateWorkflowInstanceState(Integer workflowInstanceId,
3939
WorkflowExecutionStatusoriginState,
4040
WorkflowExecutionStatustargetState);
4141

42+
voidforceUpdateWorkflowInstanceState(Integerid,WorkflowExecutionStatusstatus);
43+
4244
/**
4345
* performs an "upsert" operation (update or insert) on a WorkflowInstance object within a new transaction
4446
*

‎dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImpl.java‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ public void updateWorkflowInstanceState(Integer workflowInstanceId, WorkflowExec
7575
}
7676
}
7777

78+
@Override
79+
publicvoidforceUpdateWorkflowInstanceState(Integerid,WorkflowExecutionStatusstatus) {
80+
mybatisMapper.forceUpdateWorkflowInstanceState(id,status);
81+
}
82+
7883
@Override
7984
@Transactional(propagation =Propagation.REQUIRES_NEW,isolation =Isolation.READ_COMMITTED,rollbackFor =Exception.class)
8085
publicvoidperformTransactionalUpsert(WorkflowInstanceworkflowInstance) {

‎dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkflowInstanceMapper.xml‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@
158158
where id = #{workflowInstanceId} and state = #{originState}
159159
</update>
160160

161+
<updateid="forceUpdateWorkflowInstanceState">
162+
update t_ds_workflow_instance set state = #{status} where id = #{id}
163+
</update>
164+
161165
<updateid="updateWorkflowInstanceByTenantCode">
162166
update t_ds_workflow_instance
163167
set tenant_code = #{destTenantCode}

‎dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/WorkflowInstanceDaoImplTest.java‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ void updateWorkflowInstanceState_failed() {
9696
unsupportedOperationException.getMessage());
9797
}
9898

99+
@Test
100+
voidforceUpdateWorkflowInstanceState() {
101+
WorkflowInstanceworkflowInstance =createWorkflowInstance(1L,1,WorkflowExecutionStatus.RUNNING_EXECUTION);
102+
workflowInstanceDao.insert(workflowInstance);
103+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstance.getId(),WorkflowExecutionStatus.FAILURE);
104+
assertEquals(WorkflowExecutionStatus.FAILURE,
105+
workflowInstanceDao.queryById(workflowInstance.getId()).getState());
106+
}
107+
99108
@Test
100109
voidqueryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
101110
longworkflowDefinitionCode =1L;

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java‎

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
importorg.apache.dolphinscheduler.common.utils.JSONUtils;
2727
importorg.apache.dolphinscheduler.dao.entity.Command;
2828
importorg.apache.dolphinscheduler.dao.entity.WorkflowInstance;
29+
importorg.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2930
importorg.apache.dolphinscheduler.meter.metrics.MetricsProvider;
3031
importorg.apache.dolphinscheduler.meter.metrics.SystemMetrics;
3132
importorg.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@@ -52,6 +53,7 @@
5253

5354
importorg.springframework.beans.factory.annotation.Autowired;
5455
importorg.springframework.stereotype.Service;
56+
importorg.springframework.transaction.support.TransactionTemplate;
5557

5658
/**
5759
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
@@ -75,6 +77,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
7577
@Autowired
7678
privateIWorkflowRepositoryworkflowRepository;
7779

80+
@Autowired
81+
privateWorkflowInstanceDaoworkflowInstanceDao;
82+
7883
@Autowired
7984
privateWorkflowExecutionRunnableFactoryworkflowExecutionRunnableFactory;
8085

@@ -84,6 +89,9 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {
8489
@Autowired
8590
privateWorkflowEventBusCoordinatorworkflowEventBusCoordinator;
8691

92+
@Autowired
93+
privateTransactionTemplatetransactionTemplate;
94+
8795
privateExecutorServicecommandHandleThreadPool;
8896

8997
privatebooleanflag =false;
@@ -189,8 +197,18 @@ private Void bootstrapError(Command command, Throwable throwable) {
189197
throwable);
190198
returnnull;
191199
}
192-
log.error("Failed bootstrap command {} ",JSONUtils.toPrettyJsonString(command),throwable);
193-
commandService.moveToErrorCommand(command,ExceptionUtils.getStackTrace(throwable));
200+
201+
transactionTemplate.execute(status -> {
202+
log.warn("Failed bootstrap command {} ",JSONUtils.toPrettyJsonString(command),throwable);
203+
finalintworkflowInstanceId =command.getWorkflowInstanceId();
204+
205+
workflowInstanceDao.forceUpdateWorkflowInstanceState(workflowInstanceId,WorkflowExecutionStatus.FAILURE);
206+
log.info("Set workflow instance {} state to FAILURE",workflowInstanceId);
207+
208+
commandService.moveToErrorCommand(command,ExceptionUtils.getStackTrace(throwable));
209+
log.info("Move command {} to error command table",command.getId());
210+
returnnull;
211+
});
194212
returnnull;
195213
}
196214

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowGraph.java‎

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
importjava.util.List;
2929
importjava.util.Map;
3030
importjava.util.Set;
31-
importjava.util.function.Function;
3231
importjava.util.stream.Collectors;
3332

3433
publicclassWorkflowGraphimplementsIWorkflowGraph {
@@ -46,12 +45,20 @@ public WorkflowGraph(List<WorkflowTaskRelation> workflowTaskRelations, List<Task
4645
this.predecessors =newHashMap<>();
4746
this.successors =newHashMap<>();
4847

49-
this.taskDefinitionMap =taskDefinitions
50-
.stream()
51-
.collect(Collectors.toMap(TaskDefinition::getName,Function.identity()));
52-
this.taskDefinitionCodeMap =taskDefinitions
53-
.stream()
54-
.collect(Collectors.toMap(TaskDefinition::getCode,Function.identity()));
48+
this.taskDefinitionMap =newHashMap<>(taskDefinitions.size());
49+
this.taskDefinitionCodeMap =newHashMap<>(taskDefinitions.size());
50+
for (TaskDefinitiontaskDefinition :taskDefinitions) {
51+
if (taskDefinitionMap.containsKey(taskDefinition.getName())) {
52+
thrownewIllegalArgumentException(
53+
"Duplicate task name: " +taskDefinition.getName() +" in the workflow");
54+
}
55+
taskDefinitionMap.put(taskDefinition.getName(),taskDefinition);
56+
if (taskDefinitionCodeMap.containsKey(taskDefinition.getCode())) {
57+
thrownewIllegalArgumentException(
58+
"Duplicate task code: " +taskDefinition.getCode() +" in the workflow");
59+
}
60+
taskDefinitionCodeMap.put(taskDefinition.getCode(),taskDefinition);
61+
}
5562

5663
addTaskNodes(taskDefinitions);
5764
addTaskEdge(workflowTaskRelations);

‎dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java‎

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
importorg.apache.dolphinscheduler.common.constants.Constants;
2323
importorg.apache.dolphinscheduler.common.enums.CommandType;
24+
importorg.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2425
importorg.apache.dolphinscheduler.common.utils.JSONUtils;
2526
importorg.apache.dolphinscheduler.dao.entity.Command;
2627
importorg.apache.dolphinscheduler.dao.entity.ErrorCommand;
@@ -29,6 +30,7 @@
2930
importorg.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
3031
importorg.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
3132
importorg.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
33+
importorg.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
3234

3335
importorg.apache.commons.lang3.StringUtils;
3436

@@ -41,6 +43,7 @@
4143

4244
importorg.springframework.beans.factory.annotation.Autowired;
4345
importorg.springframework.stereotype.Component;
46+
importorg.springframework.transaction.annotation.Transactional;
4447

4548
importcom.fasterxml.jackson.databind.node.ObjectNode;
4649
importio.micrometer.core.annotation.Counted;
@@ -64,11 +67,18 @@ public class CommandServiceImpl implements CommandService {
6467
@Autowired
6568
privateWorkflowDefinitionMapperprocessDefineMapper;
6669

70+
@Autowired
71+
privateWorkflowInstanceDaoworkflowInstanceDao;
72+
6773
@Override
74+
@Transactional
6875
publicvoidmoveToErrorCommand(Commandcommand,Stringmessage) {
69-
ErrorCommanderrorCommand =newErrorCommand(command,message);
70-
this.errorCommandMapper.insert(errorCommand);
71-
this.commandMapper.deleteById(command.getId());
76+
finalErrorCommanderrorCommand =newErrorCommand(command,message);
77+
errorCommandMapper.insert(errorCommand);
78+
commandMapper.deleteById(command.getId());
79+
workflowInstanceDao.forceUpdateWorkflowInstanceState(command.getWorkflowInstanceId(),
80+
WorkflowExecutionStatus.FAILURE);
81+
log.info("Set workflow instance {} state to FAILURE",command.getWorkflowInstanceId());
7282
}
7383

7484
@Override

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp