Skip to content

Commit 346dcce

Browse files
fmbenhassinemminella
authored andcommitted
BATCH-2442: fix infinite loop when item processor fails during a scan
Currently, when the processor throws an exception during a scan, the chunk is never marked as complete and the step never finishes. Moreover, items that were processed unsuccessfully are still written. This commit fixes the issue by excluding failed items from the scan. Resolves BATCH-2442
1 parent fa54236 commit 346dcce

File tree

2 files changed

+247
-7
lines changed

2 files changed

+247
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package org.springframework.batch.core.test.step;
2+
3+
import org.junit.Before;
4+
import org.junit.Test;
5+
import org.junit.runner.RunWith;
6+
import org.springframework.batch.core.BatchStatus;
7+
import org.springframework.batch.core.ExitStatus;
8+
import org.springframework.batch.core.JobExecution;
9+
import org.springframework.batch.core.JobParameters;
10+
import org.springframework.batch.core.Step;
11+
import org.springframework.batch.core.StepExecution;
12+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
13+
import org.springframework.batch.core.repository.JobRepository;
14+
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
15+
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
16+
import org.springframework.batch.core.step.skip.SkipPolicy;
17+
import org.springframework.batch.item.ItemProcessor;
18+
import org.springframework.batch.item.ItemReader;
19+
import org.springframework.batch.item.ItemWriter;
20+
import org.springframework.batch.item.support.ListItemReader;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.test.context.ContextConfiguration;
23+
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
24+
import org.springframework.transaction.PlatformTransactionManager;
25+
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
30+
import static org.junit.Assert.assertEquals;
31+
32+
/**
33+
* Tests for fault tolerant {@link org.springframework.batch.core.step.item.ChunkOrientedTasklet}.
34+
*/
35+
@ContextConfiguration(locations = "/simple-job-launcher-context.xml")
36+
@RunWith(SpringJUnit4ClassRunner.class)
37+
public class FaultTolerantStepIntegrationTests {
38+
39+
private static final int TOTAL_ITEMS = 30;
40+
private static final int CHUNK_SIZE = TOTAL_ITEMS;
41+
42+
@Autowired
43+
private JobRepository jobRepository;
44+
45+
@Autowired
46+
private PlatformTransactionManager transactionManager;
47+
48+
private SkipPolicy skipPolicy;
49+
50+
private FaultTolerantStepBuilder<Integer, Integer> stepBuilder;
51+
52+
@Before
53+
public void setUp() {
54+
ItemReader<Integer> itemReader = new ListItemReader<>(createItems());
55+
56+
ItemProcessor<Integer, Integer> itemProcessor = new ItemProcessor<Integer, Integer>() {
57+
@Override
58+
public Integer process(Integer item) throws Exception {
59+
return item > 20 ? null : item;
60+
}
61+
};
62+
63+
ItemWriter<Integer> itemWriter = new ItemWriter<Integer>() {
64+
@Override
65+
public void write(List<? extends Integer> items) throws Exception {
66+
if (items.contains(1)) {
67+
throw new IllegalArgumentException();
68+
}
69+
}
70+
};
71+
72+
skipPolicy = new SkipIllegalArgumentExceptionSkipPolicy();
73+
stepBuilder = new StepBuilderFactory(jobRepository, transactionManager).get("step")
74+
.<Integer, Integer>chunk(CHUNK_SIZE)
75+
.reader(itemReader)
76+
.processor(itemProcessor)
77+
.writer(itemWriter)
78+
.faultTolerant();
79+
}
80+
81+
@Test
82+
public void testFilterCountWithTransactionalProcessorWhenSkipInWrite() throws Exception {
83+
// Given
84+
Step step = stepBuilder
85+
.skipPolicy(skipPolicy)
86+
.build();
87+
88+
// When
89+
StepExecution stepExecution = execute(step);
90+
91+
// Then
92+
assertEquals(TOTAL_ITEMS, stepExecution.getReadCount());
93+
assertEquals(10, stepExecution.getFilterCount());
94+
assertEquals(19, stepExecution.getWriteCount());
95+
assertEquals(1, stepExecution.getWriteSkipCount());
96+
}
97+
98+
@Test
99+
public void testFilterCountWithNonTransactionalProcessorWhenSkipInWrite() throws Exception {
100+
// Given
101+
Step step = stepBuilder
102+
.skipPolicy(skipPolicy)
103+
.processorNonTransactional()
104+
.build();
105+
106+
// When
107+
StepExecution stepExecution = execute(step);
108+
109+
// Then
110+
assertEquals(TOTAL_ITEMS, stepExecution.getReadCount());
111+
assertEquals(10, stepExecution.getFilterCount());
112+
assertEquals(19, stepExecution.getWriteCount());
113+
assertEquals(1, stepExecution.getWriteSkipCount());
114+
}
115+
116+
@Test
117+
public void testFilterCountOnRetryWithTransactionalProcessorWhenSkipInWrite() throws Exception {
118+
// Given
119+
Step step = stepBuilder
120+
.retry(IllegalArgumentException.class)
121+
.retryLimit(2)
122+
.skipPolicy(skipPolicy)
123+
.build();
124+
125+
// When
126+
StepExecution stepExecution = execute(step);
127+
128+
// Then
129+
assertEquals(TOTAL_ITEMS, stepExecution.getReadCount());
130+
// filter count is expected to be counted on each retry attempt
131+
assertEquals(20, stepExecution.getFilterCount());
132+
assertEquals(19, stepExecution.getWriteCount());
133+
assertEquals(1, stepExecution.getWriteSkipCount());
134+
}
135+
136+
@Test
137+
public void testFilterCountOnRetryWithNonTransactionalProcessorWhenSkipInWrite() throws Exception {
138+
// Given
139+
Step step = stepBuilder
140+
.retry(IllegalArgumentException.class)
141+
.retryLimit(2)
142+
.skipPolicy(skipPolicy)
143+
.processorNonTransactional()
144+
.build();
145+
146+
// When
147+
StepExecution stepExecution = execute(step);
148+
149+
// Then
150+
assertEquals(TOTAL_ITEMS, stepExecution.getReadCount());
151+
// filter count is expected to be counted on each retry attempt
152+
assertEquals(20, stepExecution.getFilterCount());
153+
assertEquals(19, stepExecution.getWriteCount());
154+
assertEquals(1, stepExecution.getWriteSkipCount());
155+
}
156+
157+
@Test(timeout = 3000)
158+
public void testExceptionInProcessDuringChunkScan() throws Exception {
159+
// Given
160+
ListItemReader<Integer> itemReader = new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
161+
ItemProcessor<Integer, Integer> itemProcessor = new ItemProcessor<Integer, Integer>() {
162+
int cpt;
163+
164+
@Override
165+
public Integer process(Integer item) throws Exception {
166+
cpt++;
167+
if (cpt == 7) { // item 2 succeeds the first time but fails during the scan
168+
throw new Exception("Error during process");
169+
}
170+
return item;
171+
}
172+
};
173+
ItemWriter<Integer> itemWriter = new ItemWriter<Integer>() {
174+
int cpt;
175+
176+
@Override
177+
public void write(List<? extends Integer> items) throws Exception {
178+
cpt++;
179+
if (cpt == 1) {
180+
throw new Exception("Error during write");
181+
}
182+
}
183+
};
184+
Step step = new StepBuilderFactory(jobRepository, transactionManager).get("step")
185+
.<Integer, Integer>chunk(5)
186+
.reader(itemReader)
187+
.processor(itemProcessor)
188+
.writer(itemWriter)
189+
.faultTolerant()
190+
.skip(Exception.class)
191+
.skipLimit(3)
192+
.build();
193+
194+
// When
195+
StepExecution stepExecution = execute(step);
196+
197+
// Then
198+
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
199+
assertEquals(ExitStatus.COMPLETED, stepExecution.getExitStatus());
200+
assertEquals(7, stepExecution.getReadCount());
201+
assertEquals(6, stepExecution.getWriteCount());
202+
assertEquals(1, stepExecution.getProcessSkipCount());
203+
}
204+
205+
private List<Integer> createItems() {
206+
List<Integer> items = new ArrayList<>(TOTAL_ITEMS);
207+
for (int i = 1; i <= TOTAL_ITEMS; i++) {
208+
items.add(i);
209+
}
210+
return items;
211+
}
212+
213+
private StepExecution execute(Step step) throws Exception {
214+
JobExecution jobExecution = jobRepository.createJobExecution(
215+
"job" + Math.random(), new JobParameters());
216+
StepExecution stepExecution = jobExecution.createStepExecution("step");
217+
jobRepository.add(stepExecution);
218+
step.execute(stepExecution);
219+
return stepExecution;
220+
}
221+
222+
private class SkipIllegalArgumentExceptionSkipPolicy implements SkipPolicy {
223+
224+
@Override
225+
public boolean shouldSkip(Throwable throwable, int skipCount)
226+
throws SkipLimitExceededException {
227+
return throwable instanceof IllegalArgumentException;
228+
}
229+
230+
}
231+
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,16 @@
1616

1717
package org.springframework.batch.core.step.item;
1818

19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
1926
import org.apache.commons.logging.Log;
2027
import org.apache.commons.logging.LogFactory;
28+
2129
import org.springframework.batch.core.StepContribution;
2230
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
2331
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
@@ -35,13 +43,6 @@
3543
import org.springframework.retry.RetryException;
3644
import org.springframework.retry.support.DefaultRetryState;
3745

38-
import java.util.ArrayList;
39-
import java.util.Collections;
40-
import java.util.Iterator;
41-
import java.util.List;
42-
import java.util.concurrent.atomic.AtomicInteger;
43-
import java.util.concurrent.atomic.AtomicReference;
44-
4546
/**
4647
* FaultTolerant implementation of the {@link ChunkProcessor} interface, that
4748
* allows for skipping or retry of items that cause exceptions during writing.
@@ -572,6 +573,14 @@ private void scan(final StepContribution contribution, final Chunk<I> inputs, fi
572573
Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
573574
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
574575

576+
//BATCH-2442 : do not scan skipped items
577+
if (!inputs.getSkips().isEmpty()) {
578+
if (outputIterator.hasNext()) {
579+
outputIterator.remove();
580+
return;
581+
}
582+
}
583+
575584
List<O> items = Collections.singletonList(outputIterator.next());
576585
inputIterator.next();
577586
try {

0 commit comments

Comments
 (0)