Skip to content

Commit b895633

Browse files
committed
spring-projectsGH-4708 : in multi-threaded step, enforce the first chunk to be written by first thread
1 parent 09df30f commit b895633

File tree

3 files changed

+131
-5
lines changed

3 files changed

+131
-5
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/repeat/support/TaskExecutorRepeatTemplate.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.batch.repeat.support;
1818

19+
import java.util.concurrent.CountDownLatch;
20+
1921
import org.springframework.batch.repeat.RepeatCallback;
2022
import org.springframework.batch.repeat.RepeatContext;
2123
import org.springframework.batch.repeat.RepeatException;
@@ -59,6 +61,12 @@ public class TaskExecutorRepeatTemplate extends RepeatTemplate {
5961

6062
private TaskExecutor taskExecutor = new SyncTaskExecutor();
6163

64+
/**
65+
* A latch to ensure to manage the first chunk by the the first thread. This is
66+
* specifically required to manage data with record separators like JSON.
67+
*/
68+
private final CountDownLatch latch = new CountDownLatch(1);
69+
6270
/**
6371
* Public setter for the throttle limit. The throttle limit is the largest number of
6472
* concurrent tasks that can be executing at one time - if a new task arrives and the
@@ -110,7 +118,7 @@ protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callb
110118
* Wrap the callback in a runnable that will add its result to the queue when
111119
* it is ready.
112120
*/
113-
runnable = new ExecutingRunnable(callback, context, queue);
121+
runnable = new ExecutingRunnable(callback, context, queue, latch);
114122

115123
/*
116124
* Tell the runnable that it can expect a result. This could have been
@@ -130,6 +138,13 @@ protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callb
130138
*/
131139
update(context);
132140

141+
/*
142+
* Wait for the first chunk to be managed before to create other threads. This
143+
* will ensure to correctly write first data chunk with record separators like
144+
* JSON.
145+
*/
146+
latch.await();
147+
133148
/*
134149
* Keep going until we get a result that is finished, or early termination...
135150
*/
@@ -216,14 +231,17 @@ private class ExecutingRunnable implements Runnable, ResultHolder {
216231

217232
private volatile Throwable error;
218233

219-
public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
234+
private CountDownLatch latch;
235+
236+
public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue,
237+
CountDownLatch latch) {
220238

221239
super();
222240

223241
this.callback = callback;
224242
this.context = context;
225243
this.queue = queue;
226-
244+
this.latch = latch;
227245
}
228246

229247
/**
@@ -272,6 +290,11 @@ public void run() {
272290

273291
queue.put(this);
274292

293+
/*
294+
* If this is the first chunk, then release the latch so that other
295+
* threads can be created.
296+
*/
297+
this.latch.countDown();
275298
}
276299
}
277300

spring-batch-infrastructure/src/test/java/org/springframework/batch/repeat/support/AbstractTradeBatchTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2023 the original author or authors.
2+
* Copyright 2006-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.batch.repeat.support;
1818

19+
import java.util.ArrayList;
20+
1921
import org.junit.jupiter.api.BeforeEach;
2022

2123
import org.springframework.batch.item.Chunk;
@@ -42,12 +44,16 @@ abstract class AbstractTradeBatchTests {
4244

4345
Resource resource = new ClassPathResource("trades.csv", getClass());
4446

45-
protected TradeWriter processor = new TradeWriter();
47+
protected TradeWriter processor;
4648

4749
protected TradeItemReader provider;
4850

51+
protected ArrayList<Trade> output;
52+
4953
@BeforeEach
5054
void setUp() throws Exception {
55+
output = new ArrayList<>();
56+
processor = new TradeWriter(output);
5157
provider = new TradeItemReader(resource);
5258
provider.open(new ExecutionContext());
5359
}
@@ -79,10 +85,17 @@ protected static class TradeWriter implements ItemWriter<Trade> {
7985

8086
int count = 0;
8187

88+
private ArrayList<Trade> out;
89+
90+
public TradeWriter(ArrayList<Trade> out) {
91+
this.out = out;
92+
}
93+
8294
// This has to be synchronized because we are going to test the state
8395
// (count) at the end of a concurrent batch run.
8496
@Override
8597
public synchronized void write(Chunk<? extends Trade> data) {
98+
out.addAll(data.getItems());
8699
count++;
87100
}
88101

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.repeat.support;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
21+
22+
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.RepeatedTest;
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
28+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
29+
30+
/**
31+
* Tests for concurrent behaviour in repeat template, dedicated to the first chunk, that
32+
* must be managed first when output format has separator between items, like JSON.
33+
*
34+
* @author Gerald Lelarge
35+
*
36+
*/
37+
class TaskExecutorRepeatTemplateFirstChunkTests extends AbstractTradeBatchTests {
38+
39+
private TaskExecutorRepeatTemplate template;
40+
41+
private int chunkSize = 5;
42+
43+
private final ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
44+
45+
@BeforeEach
46+
void setUp() throws Exception {
47+
48+
super.setUp();
49+
50+
threadPool.setMaxPoolSize(10);
51+
threadPool.setCorePoolSize(10);
52+
threadPool.setQueueCapacity(0);
53+
threadPool.afterPropertiesSet();
54+
55+
template = new TaskExecutorRepeatTemplate();
56+
template.setTaskExecutor(threadPool);
57+
// Limit the number of threads to 2
58+
template.setThrottleLimit(2);
59+
// Limit the number of items to read to be able to test the second item from the
60+
// output. If the chunkSize is greater than 2, the test could fail.
61+
template.setCompletionPolicy(new SimpleCompletionPolicy(chunkSize));
62+
}
63+
64+
@AfterEach
65+
void tearDown() {
66+
threadPool.destroy();
67+
}
68+
69+
/**
70+
* Test method for {@link TaskExecutorRepeatTemplate#iterate(RepeatCallback)}.
71+
* Repeat the tests 20 times to increase the probability of detecting a concurrency.
72+
*/
73+
@Test
74+
@RepeatedTest(value = 20)
75+
void testExecute() {
76+
77+
// given
78+
template.iterate(new ItemReaderRepeatCallback<>(provider, processor));
79+
80+
// then
81+
// The first element is the first item of the input trades.csv.
82+
assertEquals("UK21341EAH45", output.get(0).getIsin());
83+
// The others can have different orders.
84+
for (int i = 1; i < output.size(); i++) {
85+
assertNotEquals("UK21341EAH45", output.get(i).getIsin());
86+
}
87+
assertEquals(chunkSize, processor.count);
88+
}
89+
90+
}

0 commit comments

Comments
 (0)