Skip to content

Commit aa7543b

Browse files
committed
Remove job execution id from StepExecutionRequest
Step execution IDs are globally unique, and a step execution always belongs to a job execution. This commit removes the need to provide a job execution id when sending step execution requests as this is not used by clients (StepExecutionRequestHandler). The job execution ID is still used in the correlation ID of the message but is not part of the step execution request anymore.
1 parent a371d24 commit aa7543b

File tree

5 files changed

+26
-28
lines changed

5 files changed

+26
-28
lines changed

spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,10 @@ protected Set<StepExecution> doHandle(StepExecution managerStepExecution,
210210

211211
int count = 0;
212212

213+
long jobExecutionId = managerStepExecution.getJobExecution().getId();
213214
for (StepExecution stepExecution : partitionStepExecutions) {
214215
Message<StepExecutionRequest> request = createMessage(count++, partitionStepExecutions.size(),
215-
new StepExecutionRequest(stepName, stepExecution.getJobExecutionId(), stepExecution.getId()),
216-
replyChannel);
216+
new StepExecutionRequest(stepName, stepExecution.getId()), jobExecutionId, replyChannel);
217217
if (logger.isDebugEnabled()) {
218218
logger.debug("Sending request: " + request);
219219
}
@@ -279,11 +279,11 @@ else if (logger.isDebugEnabled()) {
279279
}
280280

281281
private Message<StepExecutionRequest> createMessage(int sequenceNumber, int sequenceSize,
282-
StepExecutionRequest stepExecutionRequest, PollableChannel replyChannel) {
282+
StepExecutionRequest stepExecutionRequest, long jobExecutionId, PollableChannel replyChannel) {
283283
return MessageBuilder.withPayload(stepExecutionRequest)
284284
.setSequenceNumber(sequenceNumber)
285285
.setSequenceSize(sequenceSize)
286-
.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
286+
.setCorrelationId(jobExecutionId + ":" + stepExecutionRequest.getStepName())
287287
.setReplyChannel(replyChannel)
288288
.build();
289289
}

spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/StepExecutionRequest.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2009-2018 the original author or authors.
2+
* Copyright 2009-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,33 +28,24 @@ public class StepExecutionRequest implements Serializable {
2828

2929
private static final long serialVersionUID = 1L;
3030

31-
private Long stepExecutionId;
31+
private long stepExecutionId;
3232

3333
private String stepName;
3434

35-
// FIXME not used on the remote side, can we remove it?
36-
private Long jobExecutionId;
37-
3835
private StepExecutionRequest() {
3936
// For Jackson deserialization
4037
}
4138

4239
/**
4340
* Create a new {@link StepExecutionRequest} instance.
4441
* @param stepName the name of the step to execute
45-
* @param jobExecutionId the id of the job execution
4642
* @param stepExecutionId the id of the step execution
4743
*/
48-
public StepExecutionRequest(String stepName, Long jobExecutionId, Long stepExecutionId) {
44+
public StepExecutionRequest(String stepName, long stepExecutionId) {
4945
this.stepName = stepName;
50-
this.jobExecutionId = jobExecutionId;
5146
this.stepExecutionId = stepExecutionId;
5247
}
5348

54-
public Long getJobExecutionId() {
55-
return jobExecutionId;
56-
}
57-
5849
public Long getStepExecutionId() {
5950
return stepExecutionId;
6051
}
@@ -65,8 +56,7 @@ public String getStepName() {
6556

6657
@Override
6758
public String toString() {
68-
return String.format("StepExecutionRequest: [jobExecutionId=%d, stepExecutionId=%d, stepName=%s]",
69-
jobExecutionId, stepExecutionId, stepName);
59+
return String.format("StepExecutionRequest: [stepExecutionId=%d, stepName=%s]", stepExecutionId, stepName);
7060
}
7161

7262
}

spring-batch-integration/src/main/java/org/springframework/batch/integration/remote/RemoteStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
101101
getJobRepository().updateExecutionContext(workerStepExecution);
102102

103103
// send step execution request and wait for the remote step to finish
104-
StepExecutionRequest stepExecutionRequest = new StepExecutionRequest(this.remoteStepName, jobExecution.getId(),
104+
StepExecutionRequest stepExecutionRequest = new StepExecutionRequest(this.remoteStepName,
105105
workerStepExecution.getId());
106106
this.messagingTemplate.convertAndSend(this.messageChannel, stepExecutionRequest);
107107
StepExecution updatedWorkerExecution = pollRemoteStep(workerStepExecution);

spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ void testHandleNoReply() throws Exception {
8484
Message message = mock();
8585
// when
8686
HashSet<StepExecution> stepExecutions = new HashSet<>();
87-
stepExecutions
88-
.add(new StepExecution(1L, "step1", new JobExecution(5L, new JobInstance(1L, "job"), new JobParameters())));
87+
JobInstance jobInstance = new JobInstance(1L, "job");
88+
JobExecution jobExecution = new JobExecution(5L, jobInstance, new JobParameters());
89+
StepExecution stepExecution = new StepExecution(1L, "step1", jobExecution);
90+
stepExecutions.add(stepExecution);
8991
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
9092
when(message.getPayload()).thenReturn(Collections.emptySet());
9193
when(operations.receive((PollableChannel) any())).thenReturn(message);
94+
when(managerStepExecution.getJobExecution()).thenReturn(jobExecution);
9295
// set
9396
messageChannelPartitionHandler.setMessagingOperations(operations);
9497

@@ -113,11 +116,14 @@ void testHandleWithReplyChannel() throws Exception {
113116
PollableChannel replyChannel = mock();
114117
// when
115118
HashSet<StepExecution> stepExecutions = new HashSet<>();
116-
stepExecutions
117-
.add(new StepExecution(1L, "step1", new JobExecution(5L, new JobInstance(1L, "job"), new JobParameters())));
119+
JobInstance jobInstance = new JobInstance(1L, "job");
120+
JobExecution jobExecution = new JobExecution(5L, jobInstance, new JobParameters());
121+
StepExecution stepExecution = new StepExecution(1L, "step1", jobExecution);
122+
stepExecutions.add(stepExecution);
118123
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
119124
when(message.getPayload()).thenReturn(Collections.emptySet());
120125
when(operations.receive(replyChannel)).thenReturn(message);
126+
when(managerStepExecution.getJobExecution()).thenReturn(jobExecution);
121127
// set
122128
messageChannelPartitionHandler.setMessagingOperations(operations);
123129
messageChannelPartitionHandler.setReplyChannel(replyChannel);
@@ -141,8 +147,11 @@ void messageReceiveTimeout() throws Exception {
141147
MessagingTemplate operations = mock();
142148
// when
143149
HashSet<StepExecution> stepExecutions = new HashSet<>();
144-
stepExecutions
145-
.add(new StepExecution(1L, "step1", new JobExecution(5L, new JobInstance(1L, "job"), new JobParameters())));
150+
JobInstance jobInstance = new JobInstance(1L, "job");
151+
JobExecution jobExecution = new JobExecution(5L, jobInstance, new JobParameters());
152+
StepExecution stepExecution = new StepExecution(1L, "step1", jobExecution);
153+
stepExecutions.add(stepExecution);
154+
when(managerStepExecution.getJobExecution()).thenReturn(jobExecution);
146155
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
147156
// set
148157
messageChannelPartitionHandler.setMessagingOperations(operations);

spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/StepExecutionRequestTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@
3030
class StepExecutionRequestTests {
3131

3232
private static final String SERIALIZED_REQUEST = """
33-
{"stepName":"step","jobExecutionId":1,"stepExecutionId":1}""";
33+
{"stepName":"step","stepExecutionId":1}""";
3434

3535
private final JsonMapper jsonMapper = new JsonMapper();
3636

3737
@Test
3838
void stepExecutionRequestShouldBeSerializableWithJackson() throws IOException {
3939
// given
40-
StepExecutionRequest request = new StepExecutionRequest("step", 1L, 1L);
40+
StepExecutionRequest request = new StepExecutionRequest("step", 1L);
4141

4242
// when
4343
String serializedRequest = this.jsonMapper.writeValueAsString(request);
@@ -55,7 +55,6 @@ void stepExecutionRequestShouldBeDeserializableWithJackson() throws IOException
5555
// then
5656
assertNotNull(deserializedRequest);
5757
assertEquals("step", deserializedRequest.getStepName());
58-
assertEquals(1L, deserializedRequest.getJobExecutionId().longValue());
5958
assertEquals(1L, deserializedRequest.getStepExecutionId().longValue());
6059
}
6160

0 commit comments

Comments
 (0)