Skip to content

Commit c1b79e5

Browse files
committed
BATCH-2442: fix infinite loop when item processor fails during a scan
Currently, when the processor throws an exception during a scan, the chunk is never marked as complete and the step never finishes. Moreover, items that were processed unsuccessfully are still written. This commit fixes the issue by excluding failed items from the scan. Resolves BATCH-2442
1 parent ea6c312 commit c1b79e5

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

spring-batch-core-tests/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import org.junit.Before;
44
import org.junit.Test;
55
import org.junit.runner.RunWith;
6+
import org.springframework.batch.core.BatchStatus;
7+
import org.springframework.batch.core.ExitStatus;
68
import org.springframework.batch.core.JobExecution;
79
import org.springframework.batch.core.JobParameters;
810
import org.springframework.batch.core.Step;
@@ -22,6 +24,7 @@
2224
import org.springframework.transaction.PlatformTransactionManager;
2325

2426
import java.util.ArrayList;
27+
import java.util.Arrays;
2528
import java.util.List;
2629

2730
import static org.junit.Assert.assertEquals;
@@ -139,7 +142,55 @@ public void testFilterCountOnRetryWithNonTransactionalProcessorWhenSkipInWrite()
139142
assertEquals(19, stepExecution.getWriteCount());
140143
assertEquals(1, stepExecution.getWriteSkipCount());
141144
}
142-
145+
146+
@Test(timeout = 3000)
147+
public void testExceptionInProcessDuringChunkScan() throws Exception {
148+
// Given
149+
ListItemReader<Integer> itemReader = new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
150+
ItemProcessor<Integer, Integer> itemProcessor = new ItemProcessor<Integer, Integer>() {
151+
int cpt;
152+
153+
@Override
154+
public Integer process(Integer item) throws Exception {
155+
cpt++;
156+
if (cpt == 7) { // item 2 succeeds the first time but fails during the scan
157+
throw new Exception("Error during process");
158+
}
159+
return item;
160+
}
161+
};
162+
ItemWriter<Integer> itemWriter = new ItemWriter<Integer>() {
163+
int cpt;
164+
165+
@Override
166+
public void write(List<? extends Integer> items) throws Exception {
167+
cpt++;
168+
if (cpt == 1) {
169+
throw new Exception("Error during write");
170+
}
171+
}
172+
};
173+
Step step = new StepBuilderFactory(jobRepository, transactionManager).get("step")
174+
.<Integer, Integer>chunk(5)
175+
.reader(itemReader)
176+
.processor(itemProcessor)
177+
.writer(itemWriter)
178+
.faultTolerant()
179+
.skip(Exception.class)
180+
.skipLimit(3)
181+
.build();
182+
183+
// When
184+
StepExecution stepExecution = execute(step);
185+
186+
// Then
187+
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
188+
assertEquals(ExitStatus.COMPLETED, stepExecution.getExitStatus());
189+
assertEquals(7, stepExecution.getReadCount());
190+
assertEquals(6, stepExecution.getWriteCount());
191+
assertEquals(1, stepExecution.getProcessSkipCount());
192+
}
193+
143194
private List<Integer> createItems() {
144195
List<Integer> items = new ArrayList<>(TOTAL_ITEMS);
145196
for (int i = 1; i <= TOTAL_ITEMS; i++) {

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,14 @@ private void scan(final StepContribution contribution, final Chunk<I> inputs, fi
580580
Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
581581
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
582582

583+
//BATCH-2442 : do not scan skipped items
584+
if (!inputs.getSkips().isEmpty()) {
585+
if (outputIterator.hasNext()) {
586+
outputIterator.remove();
587+
return;
588+
}
589+
}
590+
583591
List<O> items = Collections.singletonList(outputIterator.next());
584592
inputIterator.next();
585593
try {

0 commit comments

Comments
 (0)