Skip to content

Commit 8cf0648

Browse files
authored
Fail UploadDirectory if xform throws (#5756)
* Fail UploadDirectory if xform throws This fixes an issue where if the request transformation function given to UploadDirectoryRequest throws an error when it is invoked, the error would be silently swallowed. Now, the completion future will be completed exceptionally if the function throws. * Synchronize cancel * Update
1 parent df11e4d commit 8cf0648

File tree

6 files changed

+124
-5
lines changed

6 files changed

+124
-5
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Fix an issue where if the request transformation function given to UploadDirectoryRequest throws an error when it is invoked, the error would be silently swallowed. Now, the completion future will be completed exceptionally if the function throws."
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
4040
private final int maxConcurrentExecutions;
4141
private final AtomicInteger numRequestsInFlight;
4242
private volatile boolean upstreamDone;
43-
private Subscription subscription;
43+
private volatile Subscription subscription;
4444

4545
private final Set<CompletableFuture<?>> requestsInFlight;
4646

@@ -75,7 +75,18 @@ public void onSubscribe(Subscription subscription) {
7575
@Override
7676
public void onNext(T item) {
7777
numRequestsInFlight.incrementAndGet();
78-
CompletableFuture<?> currentRequest = consumer.apply(item);
78+
CompletableFuture<?> currentRequest;
79+
80+
try {
81+
currentRequest = consumer.apply(item);
82+
} catch (Throwable t) {
83+
synchronized (this) {
84+
subscription.cancel();
85+
}
86+
onError(t);
87+
return;
88+
}
89+
7990
requestsInFlight.add(currentRequest);
8091
currentRequest.whenComplete((r, t) -> {
8192
checkForCompletion(numRequestsInFlight.decrementAndGet());

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelper.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,15 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
107107
iterablePublisher.subscribe(bufferingSubscriber);
108108
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);
109109

110-
allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
111-
.failedTransfers(failedFileUploads)
112-
.build()));
110+
allOfFutures.whenComplete((r, t) -> {
111+
if (t != null) {
112+
returnFuture.completeExceptionally(SdkClientException.create("Failed to send request", t));
113+
return;
114+
}
115+
returnFuture.complete(CompletedDirectoryUpload.builder()
116+
.failedTransfers(failedFileUploads)
117+
.build());
118+
});
113119
}
114120

115121
private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
1923

2024
import io.reactivex.Flowable;
2125
import io.reactivex.Observable;
@@ -36,6 +40,7 @@
3640
import org.junit.jupiter.api.Test;
3741
import org.junit.jupiter.params.ParameterizedTest;
3842
import org.junit.jupiter.params.provider.ValueSource;
43+
import org.reactivestreams.Subscription;
3944
import software.amazon.awssdk.utils.async.SimplePublisher;
4045

4146
class AsyncBufferingSubscriberTest {
@@ -108,4 +113,21 @@ void onErrorInvoked_shouldCompleteFutureExceptionallyAndCancelRequestsFuture() {
108113
assertThat(futures.get(0)).isCancelled();
109114
assertThat(futures.get(1)).isCancelled();
110115
}
116+
117+
@Test
118+
public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExceptionally() {
119+
RuntimeException exception = new RuntimeException("test");
120+
CompletableFuture<Void> future = new CompletableFuture<>();
121+
AsyncBufferingSubscriber<String> subscriber = new AsyncBufferingSubscriber<>(s -> {
122+
throw exception;
123+
}, future, 1);
124+
125+
Subscription mockSubscription = mock(Subscription.class);
126+
127+
subscriber.onSubscribe(mockSubscription);
128+
subscriber.onNext("item");
129+
130+
verify(mockSubscription, times(1)).cancel();
131+
assertThatThrownBy(future::join).hasCause(exception);
132+
}
111133
}

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.CancellationException;
4040
import java.util.concurrent.CompletableFuture;
4141
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Consumer;
4243
import java.util.function.Function;
4344
import java.util.stream.Collectors;
4445
import org.assertj.core.util.Sets;
@@ -471,6 +472,56 @@ void downloadDirectory_notDirectory_shouldCompleteFutureExceptionally(FileSystem
471472
.hasMessageContaining("is not a directory").hasCauseInstanceOf(IllegalArgumentException.class);
472473
}
473474

475+
@Test
476+
void downloadDirectory_withDownloadRequestTransformer_transformerThrows_failsDownload() {
477+
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");
478+
479+
FileDownload fileDownload = newSuccessfulDownload();
480+
FileDownload fileDownload2 = newSuccessfulDownload();
481+
482+
when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);
483+
484+
485+
RuntimeException exception = new RuntimeException("boom");
486+
Consumer<DownloadFileRequest.Builder> downloadFileRequestTransformer = b -> {
487+
throw exception;
488+
};
489+
490+
DirectoryDownload downloadDirectory =
491+
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
492+
.destination(directory)
493+
.bucket("bucket")
494+
.downloadFileRequestTransformer(downloadFileRequestTransformer)
495+
.build());
496+
497+
assertThatThrownBy(downloadDirectory.completionFuture()::join).getCause().hasCause(exception);
498+
}
499+
500+
@Test
501+
void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_failsDownload() {
502+
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");
503+
504+
FileDownload fileDownload = newSuccessfulDownload();
505+
FileDownload fileDownload2 = newSuccessfulDownload();
506+
507+
when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);
508+
509+
510+
RuntimeException exception = new RuntimeException("boom");
511+
Consumer<ListObjectsV2Request.Builder> downloadFileRequestTransformer = b -> {
512+
throw exception;
513+
};
514+
515+
DirectoryDownload downloadDirectory =
516+
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
517+
.destination(directory)
518+
.bucket("bucket")
519+
.listObjectsV2RequestTransformer(downloadFileRequestTransformer)
520+
.build());
521+
522+
assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
523+
}
524+
474525
private static DefaultFileDownload completedDownload() {
475526
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()
476527
.response(GetObjectResponse.builder().build())

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/UploadDirectoryHelperTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,29 @@ void uploadDirectory_notDirectoryFollowSymlinkTrue_shouldCompleteSuccessfully()
436436
assertThat(keys).containsOnly("2.txt");
437437
}
438438

439+
@Test
440+
public void uploadDirectory_requestTransformFunctionThrows_failsUpload() {
441+
when(singleUploadFunction.apply(any())).thenReturn(null);
442+
443+
RuntimeException exception = new RuntimeException("boom");
444+
445+
Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer = r -> {
446+
throw exception;
447+
};
448+
449+
CompletableFuture<CompletedDirectoryUpload> uploadFuture =
450+
uploadDirectoryHelper.uploadDirectory(
451+
UploadDirectoryRequest.builder()
452+
.source(directory)
453+
.bucket("bucket")
454+
.uploadFileRequestTransformer(uploadFileRequestTransformer)
455+
.build())
456+
.completionFuture();
457+
458+
assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception);
459+
}
460+
461+
439462
private DefaultFileUpload completedUpload() {
440463
return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()
441464
.response(PutObjectResponse.builder().build())

0 commit comments

Comments
 (0)