Skip to content

Commit 0e423db

Browse files
committed
Update the pool size for asyncFutureCompletionPool and validate peak thread count in stability tests
1 parent df2f9e3 commit 0e423db

File tree

8 files changed

+92
-7
lines changed

8 files changed

+92
-7
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "bugfix",
4+
"description": "Update the pool size for default async future completion executor service. See [#1251](https://github.com/aws/aws-sdk-java-v2/issues/1251), [#994](https://github.com/aws/aws-sdk-java-v2/issues/994)"
5+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,16 +264,20 @@ protected AttributeMap childHttpConfig() {
264264
}
265265

266266
/**
267-
* Finalize which async executor service will be used for the created client.
267+
* Finalize which async executor service will be used for the created client. The default async executor
268+
* service has at least 8 core threads and can scale up to at least 64 threads when needed depending
269+
* on the number of processors available.
268270
*/
269271
private Executor resolveAsyncFutureCompletionExecutor(SdkClientConfiguration config) {
270272
Supplier<Executor> defaultExecutor = () -> {
271-
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50,
273+
int processors = Runtime.getRuntime().availableProcessors();
274+
int corePoolSize = Math.max(8, processors);
275+
int maxPoolSize = Math.max(64, processors * 2);
276+
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
272277
10, TimeUnit.SECONDS,
273-
new LinkedBlockingQueue<>(10_000),
278+
new LinkedBlockingQueue<>(1_000),
274279
new ThreadFactoryBuilder()
275280
.threadNamePrefix("sdk-async-response").build());
276-
277281
// Allow idle core threads to time out
278282
executor.allowCoreThreadTimeOut(true);
279283
return executor;
@@ -421,4 +425,5 @@ public void close() {
421425
}
422426
}
423427

428+
424429
}

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/TestResult.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class TestResult {
2626
private final int ioExceptionCount;
2727
private final int clientExceptionCount;
2828
private final int unknownExceptionCount;
29+
private final int peakThreadCount;
30+
private final double heapMemoryAfterGCUsage;
2931

3032
private TestResult(Builder builder) {
3133
this.testName = builder.testName;
@@ -34,6 +36,8 @@ private TestResult(Builder builder) {
3436
this.ioExceptionCount = builder.ioExceptionCount;
3537
this.clientExceptionCount = builder.clientExceptionCount;
3638
this.unknownExceptionCount = builder.unknownExceptionCount;
39+
this.heapMemoryAfterGCUsage = builder.heapMemoryAfterGCUsage;
40+
this.peakThreadCount = builder.peakThreadCount;
3741
}
3842

3943
public static Builder builder() {
@@ -64,6 +68,14 @@ public int unknownExceptionCount() {
6468
return unknownExceptionCount;
6569
}
6670

71+
public int peakThreadCount() {
72+
return peakThreadCount;
73+
}
74+
75+
public double heapMemoryAfterGCUsage() {
76+
return heapMemoryAfterGCUsage;
77+
}
78+
6779
@Override
6880
public String toString() {
6981
return "{" +
@@ -73,6 +85,8 @@ public String toString() {
7385
", ioExceptionCount: " + ioExceptionCount +
7486
", clientExceptionCount: " + clientExceptionCount +
7587
", unknownExceptionCount: " + unknownExceptionCount +
88+
", peakThreadCount: " + peakThreadCount +
89+
", heapMemoryAfterGCUsage: " + heapMemoryAfterGCUsage +
7690
'}';
7791
}
7892

@@ -83,6 +97,8 @@ public static class Builder {
8397
private int ioExceptionCount;
8498
private int clientExceptionCount;
8599
private int unknownExceptionCount;
100+
private int peakThreadCount;
101+
private double heapMemoryAfterGCUsage;
86102

87103
private Builder() {
88104
}
@@ -117,6 +133,16 @@ public Builder unknownExceptionCount(int unknownExceptionCount) {
117133
return this;
118134
}
119135

136+
public Builder peakThreadCount(int peakThreadCount) {
137+
this.peakThreadCount = peakThreadCount;
138+
return this;
139+
}
140+
141+
public Builder heapMemoryAfterGCUsage(double heapMemoryAfterGCUsage) {
142+
this.heapMemoryAfterGCUsage = heapMemoryAfterGCUsage;
143+
return this;
144+
}
145+
120146
public TestResult build() {
121147
return new TestResult(this);
122148
}

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/s3/S3AsyncStabilityTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static void setup() {
4545
@AfterAll
4646
public static void cleanup() {
4747
deleteBucketAndAllContents(bucketName);
48+
s3NettyClient.close();
4849
}
4950

5051
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/sqs/SqsAsyncStabilityTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public static void tearDown() {
4747
if (queueUrl != null) {
4848
sqsAsyncClient.deleteQueue(b -> b.queueUrl(queueUrl));
4949
}
50+
sqsAsyncClient.close();
5051
}
5152

5253
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/transcribestreaming/TranscribeStreamingStabilityTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.function.IntFunction;
22+
import org.junit.jupiter.api.AfterAll;
2223
import org.junit.jupiter.api.BeforeAll;
2324
import org.reactivestreams.Publisher;
2425
import org.reactivestreams.Subscriber;
@@ -63,6 +64,11 @@ public static void setup() {
6364
}
6465
}
6566

67+
@AfterAll
68+
public static void tearDown() {
69+
transcribeStreamingClient.close();
70+
}
71+
6672
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
6773
public void startTranscription() {
6874
IntFunction<CompletableFuture<?>> futureIntFunction = i ->

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/utils/RetryableTestExtension.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private static final class RetryableTestsTemplateInvocationContext implements Te
104104
private static final class RetryExecutor implements Iterator<RetryableTestsTemplateInvocationContext> {
105105
private final int maxRetries;
106106
private int totalAttempts;
107-
private static final List<Throwable> throwables = new ArrayList<>();
107+
private List<Throwable> throwables = new ArrayList<>();
108108

109109
RetryExecutor(int maxRetries) {
110110
this.maxRetries = maxRetries;

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/utils/StabilityTestRunner.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616
package software.amazon.awssdk.stability.tests.utils;
1717

1818

19+
import static java.lang.management.MemoryType.HEAP;
20+
1921
import java.io.IOException;
22+
import java.lang.management.ManagementFactory;
23+
import java.lang.management.MemoryPoolMXBean;
24+
import java.lang.management.MemoryUsage;
25+
import java.lang.management.ThreadMXBean;
2026
import java.time.Duration;
2127
import java.util.Arrays;
2228
import java.util.List;
@@ -65,15 +71,23 @@ public class StabilityTestRunner {
6571
private static final Logger log = Logger.loggerFor(StabilityTestRunner.class);
6672
private static final double ALLOWED_FAILURE_RATIO = 0.05;
6773
private static final int TESTS_TIMEOUT_IN_MINUTES = 60;
74+
// The peak thread count might be different depending on the machine the tests are currently running on.
75+
// because of the internal thread pool used in AsynchronousFileChannel
76+
private static final int ALLOWED_PEAK_THREAD_COUNT = 60;
6877

78+
private ThreadMXBean threadMXBean;
6979
private IntFunction<CompletableFuture<?>> futureFactory;
7080
private List<CompletableFuture<?>> futures;
7181
private String testName;
7282
private Duration delay = Duration.ZERO;
7383
private Integer requestCountPerRun;
7484
private Integer totalRuns = 1;
7585

86+
7687
private StabilityTestRunner() {
88+
threadMXBean = ManagementFactory.getThreadMXBean();
89+
// Reset peak thread count for every test
90+
threadMXBean.resetPeakThreadCount();
7791
}
7892

7993
/**
@@ -177,6 +191,26 @@ private TestResult runTestsFromFutureFunction() {
177191
return generateTestResult(totalRequestNumber, testName, exceptionCounter, completableFutures);
178192
}
179193

194+
private double calculateHeapMemoryAfterGCUsage() {
195+
List<MemoryPoolMXBean> memoryPoolMXBeans = ManagementFactory.getMemoryPoolMXBeans();
196+
197+
long used = 0, max = 0;
198+
199+
for (MemoryPoolMXBean memoryPoolMXBean : memoryPoolMXBeans) {
200+
String name = memoryPoolMXBean.getName();
201+
202+
if (!name.contains("Eden")) {
203+
if (memoryPoolMXBean.getType().equals(HEAP)) {
204+
MemoryUsage memoryUsage = memoryPoolMXBean.getCollectionUsage();
205+
used += memoryUsage.getUsed();
206+
max += memoryUsage.getMax() == -1 ? 0 : memoryUsage.getMax();
207+
}
208+
}
209+
}
210+
211+
return used / (double) max;
212+
}
213+
180214
private TestResult runTestsFromFutures() {
181215
ExceptionCounter exceptionCounter = new ExceptionCounter();
182216
CompletableFuture[] completableFutures =
@@ -229,7 +263,7 @@ private static boolean isIOException(Throwable throwable) {
229263
return throwable.getClass().isAssignableFrom(IOException.class);
230264
}
231265

232-
private static TestResult generateTestResult(int totalRequestNumber, String testName, ExceptionCounter exceptionCounter,
266+
private TestResult generateTestResult(int totalRequestNumber, String testName, ExceptionCounter exceptionCounter,
233267
CompletableFuture[] completableFutures) {
234268
try {
235269
CompletableFuture.allOf(completableFutures).get(TESTS_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
@@ -247,6 +281,8 @@ private static TestResult generateTestResult(int totalRequestNumber, String test
247281
.ioExceptionCount(exceptionCounter.ioExceptionCount())
248282
.totalRequestCount(totalRequestNumber)
249283
.unknownExceptionCount(exceptionCounter.unknownExceptionCount())
284+
.peakThreadCount(threadMXBean.getPeakThreadCount())
285+
.heapMemoryAfterGCUsage(calculateHeapMemoryAfterGCUsage())
250286
.build();
251287
}
252288

@@ -286,6 +322,11 @@ private static void processResult(TestResult testResult) {
286322
ALLOWED_FAILURE_RATIO * 100, ratio * 100);
287323
throw new StabilityTestsRetryableException(errorMessage);
288324
}
289-
}
290325

326+
if (testResult.peakThreadCount() > ALLOWED_PEAK_THREAD_COUNT) {
327+
String errorMessage = String.format("The number of peak thread exceeds the allowed peakThread threshold %s",
328+
ALLOWED_PEAK_THREAD_COUNT);
329+
throw new AssertionError(errorMessage);
330+
}
331+
}
291332
}

0 commit comments

Comments
 (0)