Skip to content

Support upload directory in s3 transfer manager #2743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
<slf4j.version>1.7.30</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<commons.io.version>2.5</commons.io.version>
<equalsverifier.version>3.5</equalsverifier.version>
<equalsverifier.version>3.7.1</equalsverifier.version>
<!-- Update netty-open-ssl-version accordingly whenever we update netty version-->
<!-- https://github.com/netty/netty/blob/4.1/pom.xml search "tcnative.version" -->
<netty.version>4.1.68.Final</netty.version>
Expand Down
10 changes: 10 additions & 0 deletions services-custom/s3-transfer-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.jimfs</groupId>
<artifactId>jimfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.utils.IoUtils.closeQuietly;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.testutils.FileUtils;
import software.amazon.awssdk.utils.Logger;

public class S3TransferManagerUploadDirectoryIntegrationTest extends S3IntegrationTestBase {
private static final Logger log = Logger.loggerFor(S3TransferManagerUploadDirectoryIntegrationTest.class);
private static final String TEST_BUCKET = temporaryBucketName(S3TransferManagerUploadIntegrationTest.class);

private static S3TransferManager tm;
private static Path directory;
private static S3Client s3Client;
private static String randomString;

@BeforeClass
public static void setUp() throws Exception {
S3IntegrationTestBase.setUp();
createBucket(TEST_BUCKET);
randomString = RandomStringUtils.random(100);
directory = createLocalTestDirectory();

tm = S3TransferManager.builder()
.s3ClientConfiguration(b -> b.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
.maxConcurrency(100))
.build();

s3Client = S3Client.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).region(DEFAULT_REGION)
.build();
}

@AfterClass
public static void teardown() {
try {
FileUtils.cleanUpTestDirectory(directory);
} catch (Exception exception) {
log.warn(() -> "Failed to clean up test directory " + directory, exception);
}

try {
deleteBucketAndAllContents(TEST_BUCKET);
} catch (Exception exception) {
log.warn(() -> "Failed to delete s3 bucket " + TEST_BUCKET, exception);
}

closeQuietly(tm, log.logger());
closeQuietly(s3Client, log.logger());
S3IntegrationTestBase.cleanUp();
}

@Test
public void uploadDirectory_filesSentCorrectly() {
String prefix = "yolo";
UploadDirectoryTransfer uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.prefix(prefix)
.overrideConfiguration(o -> o.recursive(true)));
CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
assertThat(completedUploadDirectory.failedUploads()).isEmpty();

List<String> keys =
s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().stream().map(S3Object::key)
.collect(Collectors.toList());

assertThat(keys).containsOnly(prefix + "/bar.txt", prefix + "/foo/1.txt", prefix + "/foo/2.txt");

keys.forEach(k -> verifyContent(k, k.substring(prefix.length() + 1) + randomString));
}

@Test
public void uploadDirectory_withDelimiter_filesSentCorrectly() {
String prefix = "hello";
String delimiter = "0";
UploadDirectoryTransfer uploadDirectory = tm.uploadDirectory(u -> u.sourceDirectory(directory)
.bucket(TEST_BUCKET)
.delimiter(delimiter)
.prefix(prefix)
.overrideConfiguration(o -> o.recursive(true)));
CompletedUploadDirectory completedUploadDirectory = uploadDirectory.completionFuture().join();
assertThat(completedUploadDirectory.failedUploads()).isEmpty();

List<String> keys =
s3Client.listObjectsV2Paginator(b -> b.bucket(TEST_BUCKET).prefix(prefix)).contents().stream().map(S3Object::key)
.collect(Collectors.toList());

assertThat(keys).containsOnly(prefix + "0bar.txt", prefix + "0foo01.txt", prefix + "0foo02.txt");
keys.forEach(k -> {
String path = k.replace(delimiter, "/");
verifyContent(k, path.substring(prefix.length() + 1) + randomString);
});
}

private static Path createLocalTestDirectory() throws IOException {
Path directory = Files.createTempDirectory("test");

String directoryName = directory.toString();

Files.createDirectory(Paths.get(directory + "/foo"));
Files.write(Paths.get(directoryName, "bar.txt"), ("bar.txt" + randomString).getBytes(StandardCharsets.UTF_8));
Files.write(Paths.get(directoryName, "foo/1.txt"), ("foo/1.txt" + randomString).getBytes(StandardCharsets.UTF_8));
Files.write(Paths.get(directoryName, "foo/2.txt"), ("foo/2.txt" + randomString).getBytes(StandardCharsets.UTF_8));

return directory;
}

private static void verifyContent(String key, String expectedContent) {
String actualContent = s3.getObject(r -> r.bucket(TEST_BUCKET).key(key),
ResponseTransformer.toBytes()).asUtf8String();

assertThat(actualContent).isEqualTo(expectedContent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,95 @@

import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.utils.Validate;

/**
* A completed download transfer.
* Represents a completed download transfer from Amazon S3. It can be used to track
* the underlying {@link GetObjectResponse}
*
* @see S3TransferManager#download(DownloadRequest)
*/
@SdkPublicApi
@SdkPreviewApi
public interface CompletedDownload extends CompletedTransfer {
public final class CompletedDownload implements CompletedTransfer {
private final GetObjectResponse response;

private CompletedDownload(DefaultBuilder builder) {
this.response = Validate.paramNotNull(builder.response, "response");
}

/**
* Returns the API response from the {@link S3TransferManager#download(DownloadRequest)}
* @return the response
*/
GetObjectResponse response();
public GetObjectResponse response() {
return response;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

CompletedDownload that = (CompletedDownload) o;

return response.equals(that.response);
}

@Override
public int hashCode() {
return response.hashCode();
}

public static Builder builder() {
return new DefaultBuilder();
}

public interface Builder {
/**
* Specify the {@link GetObjectResponse} from {@link S3AsyncClient#getObject}
*
* @param response the response
* @return This builder for method chaining.
*/
Builder response(GetObjectResponse response);

/**
* Builds a {@link CompletedUpload} based on the properties supplied to this builder
* @return An initialized {@link CompletedUpload}
*/
CompletedDownload build();
}

private static final class DefaultBuilder implements Builder {
private GetObjectResponse response;

private DefaultBuilder() {
}

@Override
public Builder response(GetObjectResponse response) {
this.response = response;
return this;
}

public void setResponse(GetObjectResponse response) {
response(response);
}

public GetObjectResponse getResponse() {
return response;
}

@Override
public CompletedDownload build() {
return new CompletedDownload(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,98 @@

import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;

/**
* A completed upload transfer.
* Represents a completed upload transfer to Amazon S3. It can be used to track
* the underlying {@link PutObjectResponse}
*
* @see S3TransferManager#upload(UploadRequest)
*/
@SdkPublicApi
@SdkPreviewApi
public interface CompletedUpload extends CompletedTransfer {
public final class CompletedUpload implements CompletedTransfer {
private final PutObjectResponse response;

private CompletedUpload(DefaultBuilder builder) {
this.response = Validate.paramNotNull(builder.response, "response");
}

/**
* Returns the API response from the {@link S3TransferManager#upload(UploadRequest)}
* @return the response
*/
PutObjectResponse response();
public PutObjectResponse response() {
return response;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

CompletedUpload that = (CompletedUpload) o;

return response.equals(that.response);
}

@Override
public int hashCode() {
return response.hashCode();
}

@Override
public String toString() {
return ToString.builder("CompletedUpload")
.add("response", response)
.build();
}

/**
* Creates a default builder for {@link CompletedUpload}.
*/
public static Builder builder() {
return new DefaultBuilder();
}

public interface Builder {
/**
* Specify the {@link PutObjectResponse} from {@link S3AsyncClient#putObject}
*
* @param response the response
* @return This builder for method chaining.
*/
Builder response(PutObjectResponse response);

/**
* Builds a {@link CompletedUpload} based on the properties supplied to this builder
* @return An initialized {@link CompletedUpload}
*/
CompletedUpload build();
}

private static class DefaultBuilder implements Builder {
private PutObjectResponse response;

private DefaultBuilder() {
}

@Override
public Builder response(PutObjectResponse response) {
this.response = response;
return this;
}

@Override
public CompletedUpload build() {
return new CompletedUpload(this);
}
}
}
Loading