Skip to content

Use more SynchronizedItemReader in tests #4452

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -43,6 +41,7 @@
import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.support.SynchronizedItemReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.batch.support.transaction.TransactionAwareProxyFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
Expand Down Expand Up @@ -79,7 +78,6 @@ class FaultTolerantStepFactoryBeanRollbackTests {

private JobRepository repository;

@SuppressWarnings("unchecked")
@BeforeEach
void setUp() throws Exception {
reader = new SkipReaderStub<>();
Expand All @@ -103,7 +101,7 @@ void setUp() throws Exception {

factory.setSkipLimit(2);

factory.setSkippableExceptionClasses(getExceptionMap(Exception.class));
factory.setSkippableExceptionClasses(Map.of(Exception.class, true));

EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
Expand Down Expand Up @@ -177,17 +175,16 @@ void testReaderDefaultNoRollbackOnCheckedException() throws Exception {
/**
* Scenario: Exception in reader that should not cause rollback
*/
@SuppressWarnings("unchecked")
@Test
void testReaderAttributesOverrideSkippableNoRollback() throws Exception {
reader.setFailures("2", "3");
reader.setItems("1", "2", "3", "4");
reader.setExceptionType(SkippableException.class);

// No skips by default
factory.setSkippableExceptionClasses(getExceptionMap(RuntimeException.class));
factory.setSkippableExceptionClasses(Map.of(RuntimeException.class, true));
// But this one is explicit in the tx-attrs so it should be skipped
factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class));
factory.setNoRollbackExceptionClasses(List.of(SkippableException.class));

Step step = factory.getObject();

Expand Down Expand Up @@ -249,11 +246,8 @@ void testNoRollbackInProcessorWhenSkipExceeded() throws Throwable {
processor.clear();
factory.setItemProcessor(processor);

List<Class<? extends Throwable>> exceptions = Arrays.asList(Exception.class);
factory.setNoRollbackExceptionClasses(exceptions);
@SuppressWarnings("unchecked")
Map<Class<? extends Throwable>, Boolean> skippable = getExceptionMap(Exception.class);
factory.setSkippableExceptionClasses(skippable);
factory.setNoRollbackExceptionClasses(List.of(Exception.class));
factory.setSkippableExceptionClasses(Map.of(Exception.class, true));

processor.setFailures("2");

Expand All @@ -279,7 +273,7 @@ void testProcessSkipWithNoRollbackForCheckedException() throws Exception {
processor.setFailures("4");
processor.setExceptionType(SkippableException.class);

factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class));
factory.setNoRollbackExceptionClasses(List.of(SkippableException.class));

Step step = factory.getObject();

Expand Down Expand Up @@ -359,7 +353,7 @@ void testWriterNoRollbackOnRuntimeException() throws Exception {
writer.setFailures("2", "3");
writer.setExceptionType(SkippableRuntimeException.class);

factory.setNoRollbackExceptionClasses(getExceptionList(SkippableRuntimeException.class));
factory.setNoRollbackExceptionClasses(List.of(SkippableRuntimeException.class));

Step step = factory.getObject();

Expand All @@ -380,7 +374,7 @@ void testWriterNoRollbackOnCheckedException() throws Exception {
writer.setFailures("2", "3");
writer.setExceptionType(SkippableException.class);

factory.setNoRollbackExceptionClasses(getExceptionList(SkippableException.class));
factory.setNoRollbackExceptionClasses(List.of(SkippableException.class));

Step step = factory.getObject();

Expand Down Expand Up @@ -517,12 +511,7 @@ void testSkipInWriterTransactionalReader() throws Exception {

@Test
void testMultithreadedSkipInWriter() throws Exception {
factory.setItemReader(new ItemReader<>() {
@Override
public synchronized String read() throws Exception {
return reader.read();
}
});
factory.setItemReader(new SynchronizedItemReader<>(reader));
writer.setFailures("1", "2", "3", "4", "5");
factory.setCommitInterval(3);
factory.setSkipLimit(10);
Expand Down Expand Up @@ -575,23 +564,9 @@ void testMultipleSkipsInWriterNonTransactionalProcessor() throws Exception {
assertEquals("[1, 2, 3, 4, 5]", processor.getProcessed().toString());
}

@SuppressWarnings("unchecked")
private Collection<Class<? extends Throwable>> getExceptionList(Class<? extends Throwable> arg) {
return Arrays.<Class<? extends Throwable>>asList(arg);
}

@SuppressWarnings("unchecked")
private Map<Class<? extends Throwable>, Boolean> getExceptionMap(Class<? extends Throwable>... args) {
Map<Class<? extends Throwable>, Boolean> map = new HashMap<>();
for (Class<? extends Throwable> arg : args) {
map.put(arg, true);
}
return map;
}

static class ExceptionThrowingChunkListener implements ChunkListener {

private int phase = -1;
private final int phase;

public ExceptionThrowingChunkListener(int throwPhase) {
this.phase = throwPhase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
import org.springframework.batch.core.step.JobRepositorySupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.support.PassThroughItemProcessor;
import org.springframework.batch.item.support.SynchronizedItemReader;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.batch.repeat.support.RepeatTemplate;
import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

class AsyncTaskletStepTests {
Expand Down Expand Up @@ -83,8 +83,8 @@ private void setUp() {

RepeatTemplate chunkTemplate = new RepeatTemplate();
chunkTemplate.setCompletionPolicy(new SimpleCompletionPolicy(2));
step.setTasklet(new TestingChunkOrientedTasklet<>(new ListItemReader<>(items), itemProcessor, itemWriter,
chunkTemplate));
step.setTasklet(new TestingChunkOrientedTasklet<>(new SynchronizedItemReader<>(new ListItemReader<>(items)),
itemProcessor, itemWriter, chunkTemplate));

jobRepository = new JobRepositorySupport();
step.setJobRepository(jobRepository);
Expand All @@ -96,12 +96,11 @@ private void setUp() {
template.setTaskExecutor(taskExecutor);
step.setStepOperations(template);

step.registerStream(new ItemStreamSupport() {
step.registerStream(new ItemStream() {
private int count = 0;

@Override
public void update(ExecutionContext executionContext) {
super.update(executionContext);
executionContext.putInt("counter", count++);
}
});
Expand All @@ -125,10 +124,8 @@ void testStepExecutionUpdates() throws Exception {
step.execute(stepExecution);

assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
// assertEquals(25, stepExecution.getReadCount());
// assertEquals(25, processed.size());
assertTrue(stepExecution.getReadCount() >= 25);
assertTrue(processed.size() >= 25);
assertEquals(25, stepExecution.getReadCount());
assertEquals(25, processed.size());

// Check commit count didn't spin out of control waiting for other
// threads to finish...
Expand Down Expand Up @@ -170,17 +167,13 @@ void testStepExecutionFailsWithProcessor() throws Exception {
throttleLimit = 1;
concurrencyLimit = 1;
items = Arrays.asList("one", "barf", "three", "four");
itemProcessor = new ItemProcessor<>() {
@Nullable
@Override
public String process(String item) throws Exception {
logger.info("Item: " + item);
processed.add(item);
if (item.equals("barf")) {
throw new RuntimeException("Planned processor error");
}
return item;
itemProcessor = item -> {
logger.info("Item: " + item);
processed.add(item);
if (item.equals("barf")) {
throw new RuntimeException("Planned processor error");
}
return item;
};
setUp();

Expand Down