Skip to content

Commit b0fd0e6

Browse files
authored
Fix request cancellation issue in the AWS CRT-based S3 client that co… (#4955)
* Fix request cancellation issue in the AWS CRT-based S3 client that could lead to memory leak * Fix build and address feedback * Fix test * Fix build
1 parent 2692379 commit b0fd0e6

File tree

21 files changed

+495
-162
lines changed

21 files changed

+495
-162
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT-based S3 client",
4+
"contributor": "",
5+
"description": "Fixed memory leak issue when a request was cancelled in the AWS CRT-based S3 client."
6+
}

build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,15 @@
5353
<!-- Allow private field declaration before public, to have correct initialization order -->
5454
<suppress checks="DeclarationOrder"
5555
files=".*SdkAdvancedClientOption\.java$"/>
56+
57+
<!-- Ignore usage of S3MetaRequest in S3MetaRequestWrapper. !-->
58+
<suppress checks="Regexp"
59+
files="software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper.java"/>
60+
61+
<!-- Ignore usage of S3MetaRequest in CrtS3ClientDownloadBenchmark. !-->
62+
<suppress checks="Regexp"
63+
files="software.amazon.awssdk.s3benchmarks.CrtS3ClientDownloadBenchmark.java"/>
64+
<!-- Ignore usage of S3MetaRequest in CrtS3ClientUploadBenchmark. !-->
65+
<suppress checks="Regexp"
66+
files="software.amazon.awssdk.s3benchmarks.CrtS3ClientUploadBenchmark.java"/>
5667
</suppressions>

build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,14 @@
359359
<property name="ignoreComments" value="true"/>
360360
</module>
361361

362+
<!-- Checks that we don't use S3MetaRequest -->
363+
<module name="Regexp">
364+
<property name="format" value="\bS3MetaRequest\b"/>
365+
<property name="illegalPattern" value="true"/>
366+
<property name="message" value="Don't use S3MetaRequest directly. Use software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper instead"/>
367+
<property name="ignoreComments" value="true"/>
368+
</module>
369+
362370
<!-- Checks that we don't implement AutoCloseable/Closeable -->
363371
<module name="Regexp">
364372
<property name="format" value="(class|interface).*(implements|extends).*[^\w](Closeable|AutoCloseable)[^\w]"/>

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtFileUploadTest.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.crt.s3.ResumeToken;
4141
import software.amazon.awssdk.crt.s3.S3MetaRequest;
4242
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
43+
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestWrapper;
4344
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
4445
import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload;
4546
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot;
@@ -53,7 +54,7 @@ class CrtFileUploadTest {
5354
private static final int NUM_OF_PARTS_COMPLETED = 5;
5455
private static final long PART_SIZE_IN_BYTES = 8 * MB;
5556
private static final String MULTIPART_UPLOAD_ID = "someId";
56-
private S3MetaRequest metaRequest;
57+
private S3MetaRequestPauseObservable observable;
5758
private static FileSystem fileSystem;
5859
private static File file;
5960
private static ResumeToken token;
@@ -77,7 +78,7 @@ public static void tearDown() throws IOException {
7778

7879
@BeforeEach
7980
void setUpBeforeEachTest() {
80-
metaRequest = Mockito.mock(S3MetaRequest.class);
81+
observable = Mockito.mock(S3MetaRequestPauseObservable.class);
8182
}
8283

8384
@Test
@@ -102,17 +103,13 @@ void pause_futureCompleted_shouldReturnNormally() {
102103
.sdkResponse(putObjectResponse)
103104
.transferredBytes(0L)
104105
.build());
105-
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
106-
107106
UploadFileRequest request = uploadFileRequest();
108107

109108
CrtFileUpload fileUpload =
110109
new CrtFileUpload(future, transferProgress, observable, request);
111110

112-
observable.subscribe(metaRequest);
113-
114111
ResumableFileUpload resumableFileUpload = fileUpload.pause();
115-
Mockito.verify(metaRequest, Mockito.never()).pause();
112+
Mockito.verify(observable, Mockito.never()).pause();
116113
assertThat(resumableFileUpload.totalParts()).isEmpty();
117114
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
118115
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
@@ -130,10 +127,7 @@ void pauseTwice_shouldReturnTheSame() {
130127
.transferredBytes(1000L)
131128
.build());
132129
UploadFileRequest request = uploadFileRequest();
133-
134-
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
135-
when(metaRequest.pause()).thenReturn(token);
136-
observable.subscribe(metaRequest);
130+
when(observable.pause()).thenReturn(token);
137131

138132
CrtFileUpload fileUpload =
139133
new CrtFileUpload(future, transferProgress, observable, request);
@@ -154,10 +148,8 @@ void pause_crtThrowException_shouldPropogate() {
154148
.build());
155149
UploadFileRequest request = uploadFileRequest();
156150

157-
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
158151
CrtRuntimeException exception = new CrtRuntimeException("exception");
159-
when(metaRequest.pause()).thenThrow(exception);
160-
observable.subscribe(metaRequest);
152+
when(observable.pause()).thenThrow(exception);
161153

162154
CrtFileUpload fileUpload =
163155
new CrtFileUpload(future, transferProgress, observable, request);
@@ -173,17 +165,14 @@ void pause_futureNotComplete_shouldPause() {
173165
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
174166
.transferredBytes(0L)
175167
.build());
176-
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
177-
when(metaRequest.pause()).thenReturn(token);
168+
when(observable.pause()).thenReturn(token);
178169
UploadFileRequest request = uploadFileRequest();
179170

180171
CrtFileUpload fileUpload =
181172
new CrtFileUpload(future, transferProgress, observable, request);
182173

183-
observable.subscribe(metaRequest);
184-
185174
ResumableFileUpload resumableFileUpload = fileUpload.pause();
186-
Mockito.verify(metaRequest).pause();
175+
Mockito.verify(observable).pause();
187176
assertThat(resumableFileUpload.totalParts()).hasValue(TOTAL_PARTS);
188177
assertThat(resumableFileUpload.partSizeInBytes()).hasValue(PART_SIZE_IN_BYTES);
189178
assertThat(resumableFileUpload.multipartUploadId()).hasValue(MULTIPART_UPLOAD_ID);
@@ -204,17 +193,14 @@ void pause_singlePart_shouldPause() {
204193
.sdkResponse(putObjectResponse)
205194
.transferredBytes(0L)
206195
.build());
207-
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
208-
when(metaRequest.pause()).thenThrow(new CrtRuntimeException(6));
196+
when(observable.pause()).thenThrow(new CrtRuntimeException(6));
209197
UploadFileRequest request = uploadFileRequest();
210198

211199
CrtFileUpload fileUpload =
212200
new CrtFileUpload(future, transferProgress, observable, request);
213201

214-
observable.subscribe(metaRequest);
215-
216202
ResumableFileUpload resumableFileUpload = fileUpload.pause();
217-
Mockito.verify(metaRequest).pause();
203+
Mockito.verify(observable).pause();
218204
assertThat(resumableFileUpload.totalParts()).isEmpty();
219205
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
220206
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ void listeners_reports_ErrorsWhenCancelled(WireMockRuntimeInfo wm) throws Interr
160160
assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class);
161161
assertThat(transferListener.isTransferComplete()).isFalse();
162162
assertThat(transferListener.isTransferInitiated()).isTrue();
163-
assertMockOnFailure(transferListenerMock);
163+
Mockito.verify(transferListenerMock, times(1)).transferFailed(ArgumentMatchers.any());
164+
Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any());
165+
Mockito.verify(transferListenerMock, times(0)).transferComplete(ArgumentMatchers.any());
164166

165167
}
166168

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import software.amazon.awssdk.crt.s3.ResumeToken;
4646
import software.amazon.awssdk.crt.s3.S3Client;
4747
import software.amazon.awssdk.crt.s3.S3ClientOptions;
48-
import software.amazon.awssdk.crt.s3.S3MetaRequest;
4948
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
5049
import software.amazon.awssdk.http.Header;
5150
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
@@ -54,7 +53,6 @@
5453
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5554
import software.amazon.awssdk.regions.Region;
5655
import software.amazon.awssdk.utils.AttributeMap;
57-
import software.amazon.awssdk.utils.Logger;
5856
import software.amazon.awssdk.utils.NumericUtils;
5957
import software.amazon.awssdk.utils.http.SdkHttpUtils;
6058

@@ -64,7 +62,6 @@
6462
*/
6563
@SdkInternalApi
6664
public final class S3CrtAsyncHttpClient implements SdkAsyncHttpClient {
67-
private static final Logger log = Logger.loggerFor(S3CrtAsyncHttpClient.class);
6865

6966
private final S3Client crtS3Client;
7067

@@ -133,10 +130,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
133130
URI uri = asyncRequest.request().getUri();
134131
HttpRequest httpRequest = toCrtRequest(asyncRequest);
135132
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
133+
CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>();
136134
S3CrtResponseHandlerAdapter responseHandler =
137135
new S3CrtResponseHandlerAdapter(executeFuture,
138136
asyncRequest.responseHandler(),
139-
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER));
137+
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
138+
s3MetaRequestFuture);
140139

141140
S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);
142141

@@ -160,16 +159,19 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
160159
.withRequestFilePath(requestFilePath)
161160
.withSigningConfig(signingConfig);
162161

163-
S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions);
164-
S3MetaRequestPauseObservable observable =
165-
httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
162+
try {
163+
S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));
164+
s3MetaRequestFuture.complete(requestWrapper);
166165

167-
responseHandler.metaRequest(s3MetaRequest);
166+
S3MetaRequestPauseObservable observable =
167+
httpExecutionAttributes.getAttribute(METAREQUEST_PAUSE_OBSERVABLE);
168168

169-
if (observable != null) {
170-
observable.subscribe(s3MetaRequest);
169+
if (observable != null) {
170+
observable.subscribe(requestWrapper);
171+
}
172+
} finally {
173+
signingConfig.close();
171174
}
172-
closeResourceCallback(executeFuture, s3MetaRequest, responseHandler, signingConfig);
173175

174176
return executeFuture;
175177
}
@@ -215,23 +217,6 @@ private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequ
215217
return S3MetaRequestOptions.MetaRequestType.DEFAULT;
216218
}
217219

218-
private static void closeResourceCallback(CompletableFuture<Void> executeFuture,
219-
S3MetaRequest s3MetaRequest,
220-
S3CrtResponseHandlerAdapter responseHandler,
221-
AwsSigningConfig signingConfig) {
222-
executeFuture.whenComplete((r, t) -> {
223-
if (executeFuture.isCancelled()) {
224-
log.debug(() -> "The request is cancelled, cancelling meta request");
225-
responseHandler.cancelRequest();
226-
s3MetaRequest.cancel();
227-
signingConfig.close();
228-
} else {
229-
s3MetaRequest.close();
230-
signingConfig.close();
231-
}
232-
});
233-
}
234-
235220
private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
236221
SdkHttpRequest sdkRequest = asyncRequest.request();
237222

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@
2525
import software.amazon.awssdk.crt.CRT;
2626
import software.amazon.awssdk.crt.http.HttpHeader;
2727
import software.amazon.awssdk.crt.s3.S3FinishedResponseContext;
28-
import software.amazon.awssdk.crt.s3.S3MetaRequest;
2928
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
3029
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
31-
import software.amazon.awssdk.http.SdkCancellationException;
3230
import software.amazon.awssdk.http.SdkHttpResponse;
3331
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
3432
import software.amazon.awssdk.utils.Logger;
@@ -46,20 +44,43 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
4644
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();
4745

4846
private final SdkHttpResponse.Builder initialHeadersResponse = SdkHttpResponse.builder();
49-
private volatile S3MetaRequest metaRequest;
47+
private final CompletableFuture<S3MetaRequestWrapper> metaRequestFuture;
5048

5149
private final PublisherListener<S3MetaRequestProgress> progressListener;
5250

5351
private volatile boolean responseHandlingInitiated;
5452

5553
public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture,
5654
SdkAsyncHttpResponseHandler responseHandler,
57-
PublisherListener<S3MetaRequestProgress> progressListener) {
55+
PublisherListener<S3MetaRequestProgress> progressListener,
56+
CompletableFuture<S3MetaRequestWrapper> metaRequestFuture) {
5857
this.resultFuture = executeFuture;
58+
this.metaRequestFuture = metaRequestFuture;
59+
60+
resultFuture.whenComplete((r, t) -> {
61+
S3MetaRequestWrapper s3MetaRequest = s3MetaRequest();
62+
if (s3MetaRequest == null) {
63+
return;
64+
}
65+
66+
if (t != null) {
67+
s3MetaRequest.cancel();
68+
}
69+
s3MetaRequest.close();
70+
});
71+
5972
this.responseHandler = responseHandler;
6073
this.progressListener = progressListener == null ? new NoOpPublisherListener() : progressListener;
6174
}
6275

76+
private S3MetaRequestWrapper s3MetaRequest() {
77+
if (!metaRequestFuture.isDone()) {
78+
return null;
79+
}
80+
81+
return metaRequestFuture.join();
82+
}
83+
6384
@Override
6485
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
6586
// Note, we cannot call responseHandler.onHeaders() here because the response status code and headers may not represent
@@ -87,6 +108,13 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
87108
return;
88109
}
89110

111+
S3MetaRequestWrapper metaRequest = s3MetaRequest();
112+
if (metaRequest == null) {
113+
// should not happen
114+
failResponseHandlerAndFuture(SdkClientException.create("Unexpected exception occurred: s3metaRequest is not "
115+
+ "initialized yet"));
116+
return;
117+
}
90118
metaRequest.incrementReadWindow(bytesReceived);
91119
});
92120

@@ -115,22 +143,10 @@ private void onSuccessfulResponseComplete() {
115143
return;
116144
}
117145
this.progressListener.subscriberOnComplete();
118-
completeFutureAndCloseRequest();
146+
resultFuture.complete(null);
119147
});
120148
}
121149

122-
private void completeFutureAndCloseRequest() {
123-
resultFuture.complete(null);
124-
runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
125-
() -> metaRequest.close());
126-
}
127-
128-
public void cancelRequest() {
129-
SdkCancellationException sdkClientException =
130-
new SdkCancellationException("request is cancelled");
131-
failResponseHandlerAndFuture(sdkClientException);
132-
}
133-
134150
private void handleError(S3FinishedResponseContext context) {
135151
int crtCode = context.getErrorCode();
136152
HttpHeader[] headers = context.getErrorHeaders();
@@ -168,27 +184,21 @@ private void onErrorResponseComplete(byte[] errorPayload) {
168184
failResponseHandlerAndFuture(throwable);
169185
return null;
170186
}
171-
completeFutureAndCloseRequest();
187+
resultFuture.complete(null);
172188
return null;
173189
});
174190
}
175191

176192
private void failResponseHandlerAndFuture(Throwable exception) {
177-
resultFuture.completeExceptionally(exception);
178193
runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring",
179194
() -> responseHandler.onError(exception));
180-
runAndLogError(log.logger(), "Exception thrown in S3MetaRequest#close, ignoring",
181-
() -> metaRequest.close());
195+
resultFuture.completeExceptionally(exception);
182196
}
183197

184198
private static boolean isErrorResponse(int responseStatus) {
185199
return responseStatus != 0;
186200
}
187201

188-
public void metaRequest(S3MetaRequest s3MetaRequest) {
189-
metaRequest = s3MetaRequest;
190-
}
191-
192202
@Override
193203
public void onProgress(S3MetaRequestProgress progress) {
194204
this.progressListener.subscriberOnNext(progress);

0 commit comments

Comments
 (0)