Task submitted for execution more than once when parent node is skipped #37

@josefinanoya

Hello. I am using your library to schedule tasks in directed acyclic graphs (DAGs). I implemented the "shouldExecute" method so that a node is executed either when at least one parent node has finished successfully (ANY) or only when all of them have (ALL).

    @Override
    public boolean shouldExecute(ExecutionResults<String, Boolean> parentResults) {
        boolean shouldExecute = true;

        if (parentResults.hasAnyResult()) {
            List<Boolean> results = parentResults.getAll().stream()
                    .map(ExecutionResult::getResult)
                    .collect(Collectors.toList());

            if (shouldExecuteConfig.equals(SHOULD_EXECUTE_ALL)) {
                // All parents must end with success
                shouldExecute = results.stream().allMatch(Boolean.TRUE::equals);
            } else if (shouldExecuteConfig.equals(SHOULD_EXECUTE_ANY)) {
                // At least one parent must end with success
                shouldExecute = results.stream().anyMatch(Boolean.TRUE::equals);
            }
        }
        return shouldExecute;
    }

With this in place, I encountered an error when executing the following DAG:
image
where stages 1, 2 and 3 are of type ALL and stage final is of type ANY.
In this scenario, stage final should be executed once stages 1, 2 and 3 have all finished, provided that at least one of them finished successfully.
When stage 1 finishes successfully and stage 2 finishes with an error, stage 3 is skipped and stage final is executed twice, which is wrong.

2024-07-12 11:24:45 INFO  Paths: 
Path #0
Stage 1[] 
Stage 2[Stage 1] 
Stage 3[Stage 2] 
Stage final[Stage 1, Stage 2, Stage 3] 

2024-07-12 11:25:26 INFO Finished with success Stage 1
2024-07-12 11:26:02 ERROR Finished with error Stage 2
2024-07-12 11:26:02 INFO  Started Stage final
2024-07-12 11:26:02 INFO  Started Stage final
....
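
One plausible reading of the log above is that stage final is reached twice during the same traversal: once through the recursion that follows the skipped stage 3, and once more as a direct child of stage 2, while it is still waiting to run. The sketch below is a toy model of that reading, not the library's code. The `Node` class, its fields, the child ordering and the simplified `doExecute` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the traversal; these are NOT the library's classes. */
final class DoubleSubmitDemo {

    /** Hypothetical stand-in for a graph node. */
    static final class Node {
        final String name;
        final boolean requiresAll;                 // true = ALL, false = ANY
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();
        boolean processed;                         // finished or skipped
        boolean succeeded;                         // meaningful only once processed

        Node(String name, boolean requiresAll) {
            this.name = name;
            this.requiresAll = requiresAll;
        }

        void dependsOn(Node... ps) {
            for (Node p : ps) {
                parents.add(p);
                p.children.add(this);
            }
        }

        boolean parentsProcessed() {
            return parents.stream().allMatch(p -> p.processed);
        }

        boolean shouldExecute() {
            if (parents.isEmpty()) {
                return true;
            }
            return requiresAll
                    ? parents.stream().allMatch(p -> p.succeeded)
                    : parents.stream().anyMatch(p -> p.succeeded);
        }
    }

    /** Simplified traversal without any "already submitted" check. */
    static void doExecute(List<Node> nodes, List<String> submissions) {
        for (Node node : nodes) {
            if (!node.parentsProcessed() || node.processed) {
                continue;
            }
            if (node.shouldExecute()) {
                submissions.add(node.name);        // node stays unprocessed until it finishes
            } else {
                node.processed = true;             // skipped
                doExecute(node.children, submissions);
            }
        }
    }

    /** Replays the reported case: Stage 1 succeeds, Stage 2 fails. */
    static List<String> runScenario() {
        Node s1 = new Node("Stage 1", true);
        Node s2 = new Node("Stage 2", true);
        Node s3 = new Node("Stage 3", true);
        Node fin = new Node("Stage final", false);
        s2.dependsOn(s1);
        s3.dependsOn(s2);
        fin.dependsOn(s1, s2, s3);

        s1.processed = true;
        s1.succeeded = true;
        s2.processed = true;                       // finished with error
        List<String> submissions = new ArrayList<>();
        doExecute(s2.children, submissions);       // children of Stage 2: Stage 3, Stage final
        return submissions;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

In this toy model the returned list contains "Stage final" twice, which matches the two "Started Stage final" lines. Whether the real traversal visits the nodes in this order is an assumption.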

A workaround is to add another status to the node entity: SUBMITTED. This way a task is submitted to the execution engine only once.

enum NodeStatus {
    SUBMITTED, ERRORED, SKIPPED, SUCCESS, CANCELLED;
}

private void doExecute(final Collection<Node<T, R>> nodes, final ExecutionConfig config) {
		for (Node<T, R> node : nodes) {
			forceStopIfRequired();
			if (this.state.shouldProcess(node) && !node.isSubmitted()) { // check if node is already submitted
				Task<T, R> task = newTask(config, node);
				ExecutionResults<T, R> parentResults = parentResults(task, node);
				task.setParentResults(parentResults);
				task.setNodeProvider(new DefaultNodeProvider<T, R>(state));
				if (node.isNotProcessed() && task.shouldExecute(parentResults)) {					
					this.state.incrementUnProcessedNodesCount();
					logger.debug("Submitting {} node for execution", node.getValue());
					this.executionEngine.submit(task);
					node.setSubmitted(); // set node status to submitted
				} else if (node.isNotProcessed()){
					node.setSkipped();
					logger.debug("Execution Skipped for node # {} ", node.getValue());
					this.state.markProcessingDone(node);
					doExecute(node.getOutGoingNodes(), config);
				}
			} else {
				logger.debug("node {} depends on {}", node.getValue(), node.getInComingNodes());
			}
		}
	}
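
In the snippet above, the `isSubmitted()` check and the call that marks the node as submitted are two separate steps. If `doExecute` can be entered from more than one thread (an assumption, not something stated here), two callers could both pass the check before either sets the flag. A minimal sketch of an atomic variant follows; `GuardedNode` and `trySubmit` are hypothetical names, not part of the library.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a "submit at most once" guard; names are hypothetical. */
final class SubmitOnceDemo {

    /** Hypothetical node wrapper holding only the submitted flag. */
    static final class GuardedNode {
        private final AtomicBoolean submitted = new AtomicBoolean(false);

        /** Returns true for exactly one caller, false for every later one. */
        boolean trySubmit() {
            return submitted.compareAndSet(false, true);
        }

        boolean isSubmitted() {
            return submitted.get();
        }
    }

    /** Lets several threads race to submit the same node and counts the winners. */
    static int submitFromManyThreads(int threads) {
        GuardedNode node = new GuardedNode();
        AtomicInteger submissions = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (node.trySubmit()) {
                    submissions.incrementAndGet(); // stands in for executionEngine.submit(task)
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return submissions.get();
    }

    public static void main(String[] args) {
        System.out.println("submissions: " + submitFromManyThreads(8));
    }
}
```

With this shape, the check and the state change happen in a single `compareAndSet`, so only the first caller proceeds to submit the task.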

I look forward to your comments on this solution, or on any other approach you may think of. Feel free to add these changes to your repo if you feel they are adequate. Thank you.
