Skip to content

Commit 4280728

Browse files
tishunjinkshower
andauthored
Add defensive copy for Futures allOf() method (#2943) (#3040)
* Add defensive copy Closes #2935 * Polishing * Forgot to call formatter, shame on me --------- Co-authored-by: jinkshower <[email protected]>
1 parent 8e3230e commit 4280728

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

src/main/java/io/lettuce/core/internal/Futures.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* without further notice.
1313
*
1414
* @author Mark Paluch
15+
* @author jinkshower
1516
* @since 5.1
1617
*/
1718
public abstract class Futures {
@@ -21,7 +22,7 @@ private Futures() {
2122
}
2223

2324
/**
24-
* Create a composite {@link CompletableFuture} is composed from the given {@code stages}.
25+
* Create a composite {@link CompletableFuture} that is composed of the given {@code stages}.
2526
*
2627
* @param stages must not be {@code null}.
2728
* @return the composed {@link CompletableFuture}.
@@ -32,10 +33,11 @@ public static CompletableFuture<Void> allOf(Collection<? extends CompletionStage
3233

3334
LettuceAssert.notNull(stages, "Futures must not be null");
3435

35-
CompletableFuture[] futures = new CompletableFuture[stages.size()];
36+
CompletionStage[] copies = stages.toArray(new CompletionStage[0]);
37+
CompletableFuture[] futures = new CompletableFuture[copies.length];
3638

3739
int index = 0;
38-
for (CompletionStage<?> stage : stages) {
40+
for (CompletionStage<?> stage : copies) {
3941
futures[index++] = stage.toCompletableFuture();
4042
}
4143

src/test/java/io/lettuce/core/internal/FuturesUnitTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
import static org.assertj.core.api.Assertions.assertThat;
55
import static org.assertj.core.api.Assertions.assertThatThrownBy;
66

7+
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.List;
710
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
814

915
import org.junit.jupiter.api.BeforeEach;
1016
import org.junit.jupiter.api.Test;
@@ -16,6 +22,7 @@
1622
* Unit tests for {@link Futures}.
1723
*
1824
* @author Mark Paluch
25+
* @author Tihomir Mateev
1926
*/
2027
class FuturesUnitTests {
2128

@@ -56,4 +63,33 @@ void awaitAllShouldSetInterruptedBit() {
5663
assertThat(Thread.currentThread().isInterrupted()).isTrue();
5764
}
5865

66+
@Test
67+
void allOfShouldNotThrow() throws InterruptedException {
68+
int threadCount = 100;
69+
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
70+
List<Throwable> issues = new ArrayList<>();
71+
List<CompletableFuture<Void>> futures = Collections.synchronizedList(new ArrayList<>());
72+
// Submit multiple threads to perform concurrent operations
73+
CountDownLatch latch = new CountDownLatch(threadCount);
74+
for (int i = 0; i < threadCount; i++) {
75+
executorService.submit(() -> {
76+
try {
77+
for (int y = 0; y < 1000; y++) {
78+
futures.add(new CompletableFuture<>());
79+
}
80+
81+
Futures.allOf(futures);
82+
} catch (Exception e) {
83+
issues.add(e);
84+
} finally {
85+
latch.countDown();
86+
}
87+
});
88+
}
89+
90+
// wait for all threads to complete
91+
latch.await();
92+
assertThat(issues).doesNotHaveAnyElementsOfTypes(ArrayIndexOutOfBoundsException.class);
93+
}
94+
5995
}

0 commit comments

Comments
 (0)