1+ /*
2+ * Copyright 2025-present the original author or authors.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * https://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+ package org .springframework .batch .core .step .item ;
17+
18+ import java .util .Set ;
19+
20+ import org .junit .jupiter .api .Assertions ;
21+ import org .junit .jupiter .api .Test ;
22+
23+ import org .springframework .batch .core .ExitStatus ;
24+ import org .springframework .batch .core .job .Job ;
25+ import org .springframework .batch .core .job .JobExecution ;
26+ import org .springframework .batch .core .job .parameters .JobParameters ;
27+ import org .springframework .batch .core .job .parameters .JobParametersBuilder ;
28+ import org .springframework .batch .core .launch .JobOperator ;
29+ import org .springframework .batch .core .repository .JobRepository ;
30+ import org .springframework .batch .core .step .FatalStepExecutionException ;
31+ import org .springframework .batch .core .step .Step ;
32+ import org .springframework .batch .core .step .StepExecution ;
33+ import org .springframework .batch .core .step .builder .ChunkOrientedStepBuilder ;
34+ import org .springframework .batch .core .step .skip .LimitCheckingExceptionHierarchySkipPolicy ;
35+ import org .springframework .batch .core .step .skip .SkipLimitExceededException ;
36+ import org .springframework .batch .infrastructure .item .ItemProcessor ;
37+ import org .springframework .batch .infrastructure .item .ItemReader ;
38+ import org .springframework .batch .infrastructure .item .ItemWriter ;
39+ import org .springframework .batch .infrastructure .item .file .FlatFileParseException ;
40+ import org .springframework .context .ApplicationContext ;
41+ import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
42+ import org .springframework .context .annotation .Bean ;
43+ import org .springframework .context .annotation .Configuration ;
44+ import org .springframework .core .retry .RetryPolicy ;
45+ import org .springframework .core .task .SimpleAsyncTaskExecutor ;
46+ import org .springframework .dao .DataIntegrityViolationException ;
47+ import org .springframework .jdbc .core .JdbcTemplate ;
48+ import org .springframework .jdbc .support .JdbcTransactionManager ;
49+ import org .springframework .test .jdbc .JdbcTestUtils ;
50+
51+ /**
52+ * Integration tests for the fault-tolerance features of {@link ChunkOrientedStep}.
53+ *
54+ * @author Mahmoud Ben Hassine
55+ */
56+ public class ChunkOrientedStepFaultToleranceIntegrationTests {
57+
58+ // TODO use parameterized tests for serial and concurrent steps
59+ // The outcome should be the same for both
60+
61+ @ Test
62+ void testFaultTolerantChunkOrientedStepSuccess () throws Exception {
63+ // given
64+ System .setProperty ("skipLimit" , "3" );
65+ ApplicationContext context = new AnnotationConfigApplicationContext (TestConfiguration .class ,
66+ FaultTolerantChunkOrientedStepConfiguration .class );
67+ JobOperator jobOperator = context .getBean (JobOperator .class );
68+ Job job = context .getBean (Job .class );
69+ JdbcTemplate jdbcTemplate = context .getBean (JdbcTemplate .class );
70+
71+ // when
72+ JobParameters jobParameters = new JobParametersBuilder ().addString ("file" , "data/persons-bad-data.csv" )
73+ .toJobParameters ();
74+ JobExecution jobExecution = jobOperator .start (job , jobParameters );
75+
76+ // then
77+ Assertions .assertEquals (ExitStatus .COMPLETED .getExitCode (), jobExecution .getExitStatus ().getExitCode ());
78+ StepExecution stepExecution = jobExecution .getStepExecutions ().iterator ().next ();
79+ ExitStatus stepExecutionExitStatus = stepExecution .getExitStatus ();
80+ Assertions .assertEquals (ExitStatus .COMPLETED .getExitCode (), stepExecutionExitStatus .getExitCode ());
81+ Assertions .assertEquals (4 , stepExecution .getReadCount ());
82+ Assertions .assertEquals (3 , stepExecution .getWriteCount ());
83+ Assertions .assertEquals (3 , stepExecution .getCommitCount ());
84+ Assertions .assertEquals (0 , stepExecution .getRollbackCount ());
85+ Assertions .assertEquals (2 , stepExecution .getReadSkipCount ());
86+ Assertions .assertEquals (1 , stepExecution .getWriteSkipCount ());
87+ Assertions .assertEquals (3 , stepExecution .getSkipCount ());
88+ Assertions .assertEquals (3 , JdbcTestUtils .countRowsInTable (jdbcTemplate , "person_target" ));
89+ System .clearProperty ("skipLimit" );
90+ }
91+
92+ @ Test
93+ void testConcurrentFaultTolerantChunkOrientedStepSuccess () throws Exception {
94+ // given
95+ System .setProperty ("skipLimit" , "3" );
96+ ApplicationContext context = new AnnotationConfigApplicationContext (TestConfiguration .class ,
97+ ConcurrentFaultTolerantChunkOrientedStepConfiguration .class );
98+ JobOperator jobOperator = context .getBean (JobOperator .class );
99+ Job job = context .getBean (Job .class );
100+ JdbcTemplate jdbcTemplate = context .getBean (JdbcTemplate .class );
101+
102+ // when
103+ JobParameters jobParameters = new JobParametersBuilder ().addString ("file" , "data/persons-bad-data.csv" )
104+ .toJobParameters ();
105+ JobExecution jobExecution = jobOperator .start (job , jobParameters );
106+
107+ // then
108+ Assertions .assertEquals (ExitStatus .COMPLETED .getExitCode (), jobExecution .getExitStatus ().getExitCode ());
109+ StepExecution stepExecution = jobExecution .getStepExecutions ().iterator ().next ();
110+ ExitStatus stepExecutionExitStatus = stepExecution .getExitStatus ();
111+ Assertions .assertEquals (ExitStatus .COMPLETED .getExitCode (), stepExecutionExitStatus .getExitCode ());
112+ Assertions .assertEquals (4 , stepExecution .getReadCount ());
113+ Assertions .assertEquals (3 , stepExecution .getWriteCount ());
114+ Assertions .assertEquals (3 , stepExecution .getCommitCount ());
115+ Assertions .assertEquals (0 , stepExecution .getRollbackCount ());
116+ Assertions .assertEquals (2 , stepExecution .getReadSkipCount ());
117+ Assertions .assertEquals (1 , stepExecution .getWriteSkipCount ());
118+ Assertions .assertEquals (3 , stepExecution .getSkipCount ());
119+ Assertions .assertEquals (3 , JdbcTestUtils .countRowsInTable (jdbcTemplate , "person_target" ));
120+ System .clearProperty ("skipLimit" );
121+ }
122+
123+ @ Test
124+ void testFaultTolerantChunkOrientedStepFailure () throws Exception {
125+ // given
126+ System .setProperty ("skipLimit" , "1" );
127+ ApplicationContext context = new AnnotationConfigApplicationContext (TestConfiguration .class ,
128+ FaultTolerantChunkOrientedStepConfiguration .class );
129+ JobOperator jobOperator = context .getBean (JobOperator .class );
130+ Job job = context .getBean (Job .class );
131+ JdbcTemplate jdbcTemplate = context .getBean (JdbcTemplate .class );
132+
133+ // when
134+ JobParameters jobParameters = new JobParametersBuilder ().addString ("file" , "data/persons-bad-data.csv" )
135+ .toJobParameters ();
136+ JobExecution jobExecution = jobOperator .start (job , jobParameters );
137+
138+ // then
139+ Assertions .assertEquals (ExitStatus .FAILED .getExitCode (), jobExecution .getExitStatus ().getExitCode ());
140+ StepExecution stepExecution = jobExecution .getStepExecutions ().iterator ().next ();
141+ ExitStatus stepExecutionExitStatus = stepExecution .getExitStatus ();
142+ Assertions .assertEquals (ExitStatus .FAILED .getExitCode (), stepExecutionExitStatus .getExitCode ());
143+ Throwable failureException = stepExecution .getFailureExceptions ().iterator ().next ();
144+ Assertions .assertInstanceOf (FatalStepExecutionException .class , failureException );
145+ Assertions .assertInstanceOf (SkipLimitExceededException .class , failureException .getCause ());
146+ Assertions .assertEquals (3 , stepExecution .getReadCount ());
147+ Assertions .assertEquals (2 , stepExecution .getWriteCount ());
148+ Assertions .assertEquals (1 , stepExecution .getCommitCount ());
149+ Assertions .assertEquals (1 , stepExecution .getRollbackCount ());
150+ Assertions .assertEquals (1 , stepExecution .getReadSkipCount ());
151+ Assertions .assertEquals (0 , stepExecution .getWriteSkipCount ());
152+ Assertions .assertEquals (1 , stepExecution .getSkipCount ());
153+ Assertions .assertEquals (2 , JdbcTestUtils .countRowsInTable (jdbcTemplate , "person_target" ));
154+ System .clearProperty ("skipLimit" );
155+ }
156+
157+ @ Test
158+ void testConcurrentFaultTolerantChunkOrientedStepFailure () throws Exception {
159+ // given
160+ System .setProperty ("skipLimit" , "1" );
161+ ApplicationContext context = new AnnotationConfigApplicationContext (TestConfiguration .class ,
162+ ConcurrentFaultTolerantChunkOrientedStepConfiguration .class );
163+ JobOperator jobOperator = context .getBean (JobOperator .class );
164+ Job job = context .getBean (Job .class );
165+ JdbcTemplate jdbcTemplate = context .getBean (JdbcTemplate .class );
166+
167+ // when
168+ JobParameters jobParameters = new JobParametersBuilder ().addString ("file" , "data/persons-bad-data.csv" )
169+ .toJobParameters ();
170+ JobExecution jobExecution = jobOperator .start (job , jobParameters );
171+
172+ // then
173+ Assertions .assertEquals (ExitStatus .FAILED .getExitCode (), jobExecution .getExitStatus ().getExitCode ());
174+ StepExecution stepExecution = jobExecution .getStepExecutions ().iterator ().next ();
175+ ExitStatus stepExecutionExitStatus = stepExecution .getExitStatus ();
176+ Assertions .assertEquals (ExitStatus .FAILED .getExitCode (), stepExecutionExitStatus .getExitCode ());
177+ Throwable failureException = stepExecution .getFailureExceptions ().iterator ().next ();
178+ Assertions .assertInstanceOf (FatalStepExecutionException .class , failureException );
179+ Assertions .assertInstanceOf (SkipLimitExceededException .class , failureException .getCause ());
180+ Assertions .assertEquals (3 , stepExecution .getReadCount ());
181+ Assertions .assertEquals (2 , stepExecution .getWriteCount ());
182+ Assertions .assertEquals (1 , stepExecution .getCommitCount ());
183+ Assertions .assertEquals (1 , stepExecution .getRollbackCount ());
184+ Assertions .assertEquals (1 , stepExecution .getReadSkipCount ());
185+ Assertions .assertEquals (0 , stepExecution .getWriteSkipCount ());
186+ Assertions .assertEquals (1 , stepExecution .getSkipCount ());
187+ Assertions .assertEquals (2 , JdbcTestUtils .countRowsInTable (jdbcTemplate , "person_target" ));
188+ System .clearProperty ("skipLimit" );
189+ }
190+
191+ @ Configuration
192+ static class FaultTolerantChunkOrientedStepConfiguration {
193+
194+ @ Bean
195+ public Step faulTolerantChunkOrientedStep (JobRepository jobRepository ,
196+ JdbcTransactionManager transactionManager , ItemReader <Person > itemReader ,
197+ ItemProcessor <Person , Person > itemProcessor , ItemWriter <Person > itemWriter ) {
198+ // retry policy configuration
199+ int retryLimit = 3 ;
200+ Set <Class <? extends Throwable >> nonRetrybaleExceptions = Set .of (FlatFileParseException .class ,
201+ DataIntegrityViolationException .class );
202+ RetryPolicy retryPolicy = RetryPolicy .builder ()
203+ .maxRetries (retryLimit )
204+ .excludes (nonRetrybaleExceptions )
205+ .build ();
206+
207+ // skip policy configuration
208+ int skipLimit = Integer .parseInt (System .getProperty ("skipLimit" ));
209+ Set <Class <? extends Throwable >> skippableExceptions = Set .of (FlatFileParseException .class ,
210+ DataIntegrityViolationException .class );
211+ LimitCheckingExceptionHierarchySkipPolicy skipPolicy = new LimitCheckingExceptionHierarchySkipPolicy (
212+ skippableExceptions , skipLimit );
213+
214+ return new ChunkOrientedStepBuilder <Person , Person >(jobRepository , 2 ).reader (itemReader )
215+ .processor (itemProcessor )
216+ .writer (itemWriter )
217+ .transactionManager (transactionManager )
218+ .faultTolerant ()
219+ .retryPolicy (retryPolicy )
220+ .skipPolicy (skipPolicy )
221+ .build ();
222+ }
223+
224+ }
225+
226+ @ Configuration
227+ static class ConcurrentFaultTolerantChunkOrientedStepConfiguration {
228+
229+ @ Bean
230+ public Step concurrentFaulTolerantChunkOrientedStep (JobRepository jobRepository ,
231+ JdbcTransactionManager transactionManager , ItemReader <Person > itemReader ,
232+ ItemProcessor <Person , Person > itemProcessor , ItemWriter <Person > itemWriter ) {
233+ // retry policy configuration
234+ int retryLimit = 3 ;
235+ Set <Class <? extends Throwable >> nonRetrybaleExceptions = Set .of (FlatFileParseException .class ,
236+ DataIntegrityViolationException .class );
237+ RetryPolicy retryPolicy = RetryPolicy .builder ()
238+ .maxRetries (retryLimit )
239+ .excludes (nonRetrybaleExceptions )
240+ .build ();
241+
242+ // skip policy configuration
243+ int skipLimit = Integer .parseInt (System .getProperty ("skipLimit" ));
244+ Set <Class <? extends Throwable >> skippableExceptions = Set .of (FlatFileParseException .class ,
245+ DataIntegrityViolationException .class );
246+ LimitCheckingExceptionHierarchySkipPolicy skipPolicy = new LimitCheckingExceptionHierarchySkipPolicy (
247+ skippableExceptions , skipLimit );
248+
249+ return new ChunkOrientedStepBuilder <Person , Person >(jobRepository , 2 ).reader (itemReader )
250+ .processor (itemProcessor )
251+ .writer (itemWriter )
252+ .transactionManager (transactionManager )
253+ .taskExecutor (new SimpleAsyncTaskExecutor ())
254+ .faultTolerant ()
255+ .retryPolicy (retryPolicy )
256+ .skipPolicy (skipPolicy )
257+ .build ();
258+ }
259+
260+ }
261+
262+ }
0 commit comments