Skip to content

Refactor S3TransferManager to support non-file-based transfers #2817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-2a00881.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon S3",
"contributor": "",
"type": "feature",
"description": "Refactor S3TransferManager to support non-file-based transfers"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -25,13 +31,6 @@
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

public class CrtExceptionTransformationIntegrationTest extends S3IntegrationTestBase {

private static final String BUCKET = temporaryBucketName(CrtExceptionTransformationIntegrationTest.class);
Expand Down Expand Up @@ -87,21 +86,21 @@ public void getObjectNoSuchBucket() throws IOException {
@Test
public void transferManagerDownloadObjectWithNoSuchKey() throws IOException {
String randomBaseDirectory = Files.createTempDirectory(getClass().getSimpleName()).toString();
assertThatThrownBy(() -> transferManager.download(DownloadRequest.builder()
.getObjectRequest(GetObjectRequest.builder().bucket(BUCKET).key("randomKey").build())
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
.build()).completionFuture().join())
assertThatThrownBy(() -> transferManager.downloadFile(DownloadFileRequest.builder()
.getObjectRequest(GetObjectRequest.builder().bucket(BUCKET).key("randomKey").build())
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
.build()).completionFuture().join())
.hasCauseInstanceOf(NoSuchKeyException.class)
.hasMessageContaining("software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist");
}

@Test
public void transferManagerDownloadObjectWithNoSuchBucket() throws IOException {
String randomBaseDirectory = Files.createTempDirectory(getClass().getSimpleName()).toString();
assertThatThrownBy(() -> transferManager.download(DownloadRequest.builder()
.getObjectRequest(GetObjectRequest.builder().bucket("nonExistingTestBucket").key(KEY).build())
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
.build()).completionFuture().join())
assertThatThrownBy(() -> transferManager.downloadFile(DownloadFileRequest.builder()
.getObjectRequest(GetObjectRequest.builder().bucket("nonExistingTestBucket").key(KEY).build())
.destination(Paths.get(randomBaseDirectory).resolve("testFile"))
.build()).completionFuture().join())
.hasCauseInstanceOf(NoSuchBucketException.class)
.hasMessageContaining("software.amazon.awssdk.services.s3.model.NoSuchBucketException: The specified bucket does not exist");
}
Expand All @@ -127,10 +126,10 @@ public void putObjectNoSuchBucket() throws IOException {

@Test
public void transferManagerUploadObjectWithNoSuchObject() throws IOException{
assertThatThrownBy(() -> transferManager.upload(UploadRequest.builder()
.putObjectRequest(PutObjectRequest.builder().bucket("nonExistingTestBucket").key("someKey").build())
.source(testFile.toPath())
.build()).completionFuture().join())
assertThatThrownBy(() -> transferManager.uploadFile(UploadFileRequest.builder()
.putObjectRequest(PutObjectRequest.builder().bucket("nonExistingTestBucket").key("someKey").build())
.source(testFile.toPath())
.build()).completionFuture().join())
.hasCauseInstanceOf(NoSuchBucketException.class)
.hasMessageContaining("software.amazon.awssdk.services.s3.model.NoSuchBucketException: The specified bucket does not exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
Expand All @@ -46,7 +49,7 @@ public static void setup() throws IOException {
.build(), file.toPath());
tm = S3TransferManager.builder()
.s3ClientConfiguration(b -> b.region(DEFAULT_REGION)
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
.build();
}

Expand All @@ -58,15 +61,30 @@ public static void cleanup() {
}

@Test
public void download_shouldWork() throws IOException {
public void download_toFile() throws IOException {
Path path = RandomTempFile.randomUncreatedFile().toPath();
Download download = tm.download(DownloadRequest.builder()
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.destination(path)
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());
CompletedDownload completedDownload = download.completionFuture().join();
FileDownload download =
tm.downloadFile(DownloadFileRequest.builder()
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.destination(path)
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());
CompletedFileDownload completedFileDownload = download.completionFuture().join();
assertThat(Md5Utils.md5AsBase64(path.toFile())).isEqualTo(Md5Utils.md5AsBase64(file));
assertThat(completedDownload.response().responseMetadata().requestId()).isNotNull();
assertThat(completedFileDownload.response().responseMetadata().requestId()).isNotNull();
}

@Test
public void download_toBytes() throws Exception {
Download<ResponseBytes<GetObjectResponse>> download =
tm.download(DownloadRequest.builder()
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.responseTransformer(AsyncResponseTransformer.toBytes())
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());
CompletedDownload<ResponseBytes<GetObjectResponse>> completedDownload = download.completionFuture().join();
ResponseBytes<GetObjectResponse> result = completedDownload.result();
assertThat(Md5Utils.md5AsBase64(result.asByteArray())).isEqualTo(Md5Utils.md5AsBase64(file));
assertThat(result.response().responseMetadata().requestId()).isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ public static void teardown() {
@Test
public void uploadDirectory_filesSentCorrectly() {
String prefix = "yolo";
UploadDirectoryTransfer uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.prefix(prefix)
.overrideConfiguration(o -> o.recursive(true)));
CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
assertThat(completedUploadDirectory.failedUploads()).isEmpty();
DirectoryUpload uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.prefix(prefix)
.overrideConfiguration(o -> o.recursive(true)));
CompletedDirectoryUpload completedDirectoryUpload = uploadDirectory.completionFuture().join();
assertThat(completedDirectoryUpload.failedTransfers()).isEmpty();

List<String> keys =
s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().stream().map(S3Object::key)
Expand All @@ -105,13 +105,13 @@ public void uploadDirectory_filesSentCorrectly() {
public void uploadDirectory_withDelimiter_filesSentCorrectly() {
String prefix = "hello";
String delimiter = "0";
UploadDirectoryTransfer uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.delimiter(delimiter)
.prefix(prefix)
.overrideConfiguration(o -> o.recursive(true)));
CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
assertThat(completedUploadDirectory.failedUploads()).isEmpty();
DirectoryUpload uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.delimiter(delimiter)
.prefix(prefix)
.overrideConfiguration(o -> o.recursive(true)));
CompletedDirectoryUpload completedDirectoryUpload = uploadDirectory.completionFuture().join();
assertThat(completedDirectoryUpload.failedTransfers()).isEmpty();

List<String> keys =
s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().stream().map(S3Object::key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.UUID;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.testutils.RandomTempFile;
Expand Down Expand Up @@ -62,16 +65,17 @@ public static void teardown() throws IOException {
}

@Test
public void upload_fileSentCorrectly() throws IOException {
Upload upload = tm.upload(UploadRequest.builder()
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
.source(testFile.toPath())
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());
public void upload_file_SentCorrectly() throws IOException {
FileUpload fileUpload =
tm.uploadFile(UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
.source(testFile.toPath())
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());

CompletedUpload completedUpload = upload.completionFuture().join();
assertThat(completedUpload.response().responseMetadata().requestId()).isNotNull();
assertThat(completedUpload.response().sdkHttpResponse()).isNotNull();
CompletedFileUpload completedFileUpload = fileUpload.completionFuture().join();
assertThat(completedFileUpload.response().responseMetadata().requestId()).isNotNull();
assertThat(completedFileUpload.response().sdkHttpResponse()).isNotNull();

ResponseInputStream<GetObjectResponse> obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
ResponseTransformer.toInputStream());
Expand All @@ -80,4 +84,27 @@ public void upload_fileSentCorrectly() throws IOException {
.isEqualTo(ChecksumUtils.computeCheckSum(obj));
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
}

@Test
public void upload_asyncRequestBody_SentCorrectly() throws IOException {
String content = UUID.randomUUID().toString();

Upload upload =
tm.upload(UploadRequest.builder()
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
.requestBody(AsyncRequestBody.fromString(content))
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());

CompletedUpload completedUpload = upload.completionFuture().join();
assertThat(completedUpload.response().responseMetadata().requestId()).isNotNull();
assertThat(completedUpload.response().sdkHttpResponse()).isNotNull();

ResponseInputStream<GetObjectResponse> obj = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
ResponseTransformer.toInputStream());

assertThat(ChecksumUtils.computeCheckSum(content.getBytes(StandardCharsets.UTF_8)))
.isEqualTo(ChecksumUtils.computeCheckSum(obj));
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3;

import java.util.Collection;
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;

/**
* A completed directory-based transfer.
*
* @see CompletedDirectoryUpload
*/
@SdkPublicApi
@SdkPreviewApi
public interface CompletedDirectoryTransfer extends CompletedTransfer {

/**
* An immutable collection of failed transfers with error details, request metadata about each file that is failed to
* transfer.
*
* <p>
* Failed single object transfers can be retried by calling {@link S3TransferManager#uploadFile(UploadFileRequest)} or
* {@link S3TransferManager#downloadFile(DownloadFileRequest)}.
*
* <pre>
* {@code
* // Retrying failed uploads if the exception is retryable
* List<CompletableFuture<CompletedUpload>> futures =
* completedDirectoryUpload.failedTransfers()
* .stream()
* .filter(failedSingleFileUpload -> isRetryable(failedSingleFileUpload.exception()))
* .map(failedSingleFileUpload ->
* tm.upload(failedSingleFileUpload.request()).completionFuture())
* .collect(Collectors.toList());
* CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
* }
* </pre>
*
* @return a list of failed transfers
*/
Collection<? extends FailedObjectTransfer> failedTransfers();
}
Loading