diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkMessageChannelItemWriter.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkMessageChannelItemWriter.java index 25a902b672..2f9a15d8df 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkMessageChannelItemWriter.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkMessageChannelItemWriter.java @@ -185,16 +185,23 @@ public Collection getStepContributions() { /** * Wait until all the results that are in the pipeline come back to the reply channel. * - * @return true if successfully received a result, false if timed out + * @return true if successfully received all pending results, false if timed out */ private boolean waitForResults() throws AsynchronousFailureException { int count = 0; int maxCount = maxWaitTimeouts; Throwable failure = null; logger.info("Waiting for " + localState.getExpecting() + " results"); + + // Read replies until we had maxCount successive timeouts. while (localState.getExpecting() > 0 && count++ < maxCount) { try { + int expecting = localState.getExpecting(); getNextResult(); + if( localState.getExpecting() < expecting ) { + // We read a reply, so reset the timeout counter + count = 0; + } } catch (Throwable t) { logger.error("Detected error in remote result. Trying to recover " + localState.getExpecting() diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkMessageItemWriterIntegrationTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkMessageItemWriterIntegrationTests.java index 009bb7fb82..ad49e52c60 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkMessageItemWriterIntegrationTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkMessageItemWriterIntegrationTests.java @@ -130,6 +130,30 @@ public void testVanillaIteration() throws Exception { } + @Test + public void testMaxtimeoutsLessThanThrottleLimit() throws Exception { + + // Make certain throttle limit is larger than max wait + // timeouts, this is not the default + writer.setMaxWaitTimeouts(2); + writer.setThrottleLimit(10); + + factory.setItemReader(new ListItemReader(Arrays.asList(StringUtils + .commaDelimitedListToStringArray("1,2,3,4,5,6,7,8,9,10")))); + + + Step step = factory.getObject(); + + StepExecution stepExecution = getStepExecution(step); + step.execute(stepExecution); + + waitForResults(10, 10); + + assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus()); + assertEquals(10, TestItemWriter.count); + assertEquals(10, stepExecution.getReadCount()); + } + @Test public void testSimulatedRestart() throws Exception {