Skip to content

Commit e5d8d08

Browse files
committed
Implement s3 transfermanager upload object
1 parent 028fe92 commit e5d8d08

File tree

24 files changed

+1345
-158
lines changed

24 files changed

+1345
-158
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import software.amazon.awssdk.annotations.SdkInternalApi;
3131
import software.amazon.awssdk.core.async.AsyncRequestBody;
3232
import software.amazon.awssdk.core.internal.util.NoopSubscription;
33+
import software.amazon.awssdk.utils.Logger;
34+
import software.amazon.awssdk.utils.Validate;
3335
import software.amazon.awssdk.utils.builder.SdkBuilder;
3436

3537
/**
@@ -40,6 +42,7 @@
4042
*/
4143
@SdkInternalApi
4244
public final class FileAsyncRequestBody implements AsyncRequestBody {
45+
private static final Logger log = Logger.loggerFor(FileAsyncRequestBody.class);
4346

4447
/**
4548
* Default size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
@@ -56,9 +59,12 @@ public final class FileAsyncRequestBody implements AsyncRequestBody {
5659
*/
5760
private final int chunkSizeInBytes;
5861

62+
private final long position;
63+
5964
private FileAsyncRequestBody(DefaultBuilder builder) {
6065
this.path = builder.path;
6166
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
67+
this.position = builder.position == null ? 0 : Validate.isNotNegative(builder.position, "position");
6268
}
6369

6470
@Override
@@ -78,7 +84,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
7884
// We need to synchronize here because the subscriber could call
7985
// request() from within onSubscribe which would potentially
8086
// trigger onNext before onSubscribe is finished.
81-
Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes);
87+
Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes, position);
8288
synchronized (subscription) {
8389
s.onSubscribe(subscription);
8490
}
@@ -122,12 +128,21 @@ public interface Builder extends SdkBuilder<Builder, FileAsyncRequestBody> {
122128
*/
123129
Builder chunkSizeInBytes(Integer chunkSize);
124130

131+
/**
132+
* Sets the file position at which the request body begins.
133+
*
134+
* @param position the position of the file
135+
* @return The builder for method chaining.
136+
*/
137+
Builder position(Long position);
138+
125139
}
126140

127141
private static final class DefaultBuilder implements Builder {
128142

129143
private Path path;
130144
private Integer chunkSizeInBytes;
145+
private Long position;
131146

132147
@Override
133148
public Builder path(Path path) {
@@ -149,6 +164,17 @@ public void setChunkSizeInBytes(Integer chunkSizeInBytes) {
149164
chunkSizeInBytes(chunkSizeInBytes);
150165
}
151166

167+
168+
@Override
169+
public Builder position(Long position) {
170+
this.position = position;
171+
return this;
172+
}
173+
174+
public void setPosition(Long position) {
175+
position(position);
176+
}
177+
152178
@Override
153179
public FileAsyncRequestBody build() {
154180
return new FileAsyncRequestBody(this);
@@ -163,15 +189,19 @@ private static final class FileSubscription implements Subscription {
163189
private final Subscriber<? super ByteBuffer> subscriber;
164190
private final int chunkSize;
165191

166-
private long position = 0;
192+
private long position;
167193
private AtomicLong outstandingDemand = new AtomicLong(0);
168194
private boolean writeInProgress = false;
169195
private volatile boolean done = false;
170196

171-
private FileSubscription(AsynchronousFileChannel inputChannel, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
197+
private FileSubscription(AsynchronousFileChannel inputChannel,
198+
Subscriber<? super ByteBuffer> subscriber,
199+
int chunkSize,
200+
long position) {
172201
this.inputChannel = inputChannel;
173202
this.subscriber = subscriber;
174203
this.chunkSize = chunkSize;
204+
this.position = position;
175205
}
176206

177207
@Override

services-custom/s3-transfermanager/src/it/java/software/amazon/awssdk/custom/s3/transfer/DownloadIntegrationTest.java

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
package software.amazon.awssdk.custom.s3.transfer;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.custom.s3.transfer.TransferManagerTestUtils.computeMd5;
20+
import static software.amazon.awssdk.custom.s3.transfer.TransferManagerTestUtils.tryDeleteFiles;
21+
1922
import java.io.IOException;
20-
import java.io.InputStream;
21-
import java.io.UncheckedIOException;
2223
import java.nio.file.Files;
2324
import java.nio.file.Path;
2425
import java.nio.file.Paths;
25-
import java.nio.file.StandardOpenOption;
2626
import java.security.MessageDigest;
2727
import java.security.NoSuchAlgorithmException;
2828
import org.junit.AfterClass;
@@ -31,7 +31,6 @@
3131
import software.amazon.awssdk.custom.s3.transfer.util.SizeConstant;
3232
import software.amazon.awssdk.testutils.RandomTempFile;
3333
import software.amazon.awssdk.testutils.service.S3BucketUtils;
34-
import software.amazon.awssdk.utils.BinaryUtils;
3534

3635
/**
3736
* Integration test for TransferManager downloads.
@@ -74,8 +73,8 @@ public static void setup() throws Exception {
7473
testFile8KiB = new RandomTempFile(8 * SizeConstant.KiB).toPath();
7574
testFile16MiB = new RandomTempFile(16 * SizeConstant.MiB).toPath();
7675

77-
testFile8KiBDigest = computeMd5(testFile8KiB);
78-
testFile16MiBDigest = computeMd5(testFile16MiB);
76+
testFile8KiBDigest = computeMd5(testFile8KiB, MD5_DIGEST);
77+
testFile16MiBDigest = computeMd5(testFile16MiB, MD5_DIGEST);
7978

8079
createBucket(BUCKET);
8180
putFile(KEY_8KiB, testFile8KiB);
@@ -106,7 +105,7 @@ private static void downloadTest(String key, String expectedMd5) throws IOExcept
106105
Path tempFile = createTempPath();
107106
try {
108107
transferManager.download(BUCKET, key, tempFile).completionFuture().join();
109-
String downloadedFileMd5 = computeMd5(tempFile);
108+
String downloadedFileMd5 = computeMd5(tempFile, MD5_DIGEST);
110109
assertThat(downloadedFileMd5).isEqualTo(expectedMd5);
111110
} finally {
112111
Files.deleteIfExists(tempFile);
@@ -117,27 +116,5 @@ private static Path createTempPath() {
117116
return TMP_DIR.resolve(DownloadIntegrationTest.class.getSimpleName() + "-" + System.currentTimeMillis());
118117
}
119118

120-
private static String computeMd5(Path file) {
121-
try (InputStream is = Files.newInputStream(file, StandardOpenOption.READ)) {
122-
MD5_DIGEST.reset();
123-
byte[] buff = new byte[4096];
124-
int read;
125-
while ((read = is.read(buff)) != -1) {
126-
MD5_DIGEST.update(buff, 0, read);
127-
}
128-
return BinaryUtils.toBase64(MD5_DIGEST.digest());
129-
} catch (IOException e) {
130-
throw new UncheckedIOException(e);
131-
}
132-
}
133119

134-
private static void tryDeleteFiles(Path... files) {
135-
for (Path file : files) {
136-
try {
137-
Files.deleteIfExists(file);
138-
} catch (IOException e) {
139-
System.err.println("Could not delete file " + file);
140-
}
141-
}
142-
}
143120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.custom.s3.transfer;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.io.UncheckedIOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.StandardOpenOption;
24+
import java.security.MessageDigest;
25+
import software.amazon.awssdk.utils.BinaryUtils;
26+
27+
public final class TransferManagerTestUtils {
28+
29+
30+
private TransferManagerTestUtils() {
31+
32+
}
33+
34+
public static void tryDeleteFiles(Path... files) {
35+
for (Path file : files) {
36+
try {
37+
Files.deleteIfExists(file);
38+
} catch (IOException e) {
39+
System.err.println("Could not delete file " + file);
40+
}
41+
}
42+
}
43+
44+
public static String computeMd5(Path file, MessageDigest messageDigest) {
45+
try (InputStream is = Files.newInputStream(file, StandardOpenOption.READ)) {
46+
messageDigest.reset();
47+
byte[] buff = new byte[4096];
48+
int read;
49+
while ((read = is.read(buff)) != -1) {
50+
messageDigest.update(buff, 0, read);
51+
}
52+
return BinaryUtils.toBase64(messageDigest.digest());
53+
} catch (IOException e) {
54+
throw new UncheckedIOException(e);
55+
}
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.custom.s3.transfer;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.custom.s3.transfer.TransferManagerTestUtils.computeMd5;
20+
import static software.amazon.awssdk.custom.s3.transfer.TransferManagerTestUtils.tryDeleteFiles;
21+
22+
import java.io.IOException;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.security.MessageDigest;
26+
import java.security.NoSuchAlgorithmException;
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
31+
import software.amazon.awssdk.custom.s3.transfer.util.SizeConstant;
32+
import software.amazon.awssdk.testutils.RandomTempFile;
33+
import software.amazon.awssdk.testutils.service.S3BucketUtils;
34+
35+
/**
36+
* Integration test for TransferManager uploads.
37+
*/
38+
public class UploadIntegrationTest extends S3TransferManagerIntegrationTestBase {
39+
private static S3TransferManager transferManager;
40+
private static final String BUCKET = S3BucketUtils.temporaryBucketName(UploadIntegrationTest.class);
41+
private static final String KEY_8KiB = "8kb_test_file.dat";
42+
private static final String KEY_16MiB = "16mb_test_file.dat";
43+
private static Path testFile8KiB;
44+
private static Path testFile16MiB;
45+
private static final MessageDigest MD5_DIGEST;
46+
private static String testFile8KiBDigest;
47+
private static String testFile16MiBDigest;
48+
49+
static {
50+
try {
51+
MD5_DIGEST = MessageDigest.getInstance("MD5");
52+
} catch (NoSuchAlgorithmException e) {
53+
throw new RuntimeException("Could not instantiate MD5 digest");
54+
}
55+
}
56+
57+
@BeforeClass
58+
public static void setup() throws Exception {
59+
S3TransferManagerIntegrationTestBase.setUp();
60+
61+
transferManager = S3TransferManager.builder()
62+
.s3client(s3Async)
63+
.multipartUploadConfiguration(b -> b.enableMultipartUploads(true)
64+
.multipartUploadThreshold(16 * SizeConstant.MiB - 1))
65+
.build();
66+
67+
testFile8KiB = new RandomTempFile(8 * SizeConstant.KiB).toPath();
68+
testFile16MiB = new RandomTempFile(16 * SizeConstant.MiB).toPath();
69+
70+
testFile8KiBDigest = computeMd5(testFile8KiB, MD5_DIGEST);
71+
testFile16MiBDigest = computeMd5(testFile16MiB, MD5_DIGEST);
72+
73+
createBucket(BUCKET);
74+
}
75+
76+
@AfterClass
77+
public static void teardown() {
78+
deleteBucketAndAllContents(BUCKET);
79+
tryDeleteFiles(testFile8KiB, testFile16MiB);
80+
}
81+
82+
@Test
83+
public void singlePartUpload_apiRequest() throws IOException {
84+
transferManager.upload(BUCKET, KEY_8KiB, testFile8KiB).completionFuture().join();
85+
verify(KEY_8KiB, testFile8KiBDigest);
86+
}
87+
88+
89+
@Test
90+
public void multiPartUpload_apiRequest() throws IOException {
91+
transferManager.upload(BUCKET, KEY_16MiB, testFile16MiB).completionFuture().join();
92+
verify(KEY_16MiB, testFile16MiBDigest);
93+
}
94+
95+
private static void verify(String key, String expectedMd5) throws IOException {
96+
Path tempFile = RandomTempFile.randomUncreatedFile().toPath();
97+
try {
98+
s3Async.getObject(b -> b.bucket(BUCKET).key(key), AsyncResponseTransformer.toFile(tempFile)).join();
99+
String downloadedFileMd5 = computeMd5(tempFile, MD5_DIGEST);
100+
assertThat(downloadedFileMd5).isEqualTo(expectedMd5);
101+
} finally {
102+
Files.deleteIfExists(tempFile);
103+
}
104+
}
105+
}

services-custom/s3-transfermanager/src/main/java/software/amazon/awssdk/custom/s3/transfer/DownloadObjectSpecification.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,14 @@
1515

1616
package software.amazon.awssdk.custom.s3.transfer;
1717

18-
import java.net.URL;
1918
import software.amazon.awssdk.annotations.SdkPublicApi;
2019
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2120

2221
/**
2322
* Union of the various ways to specify how to download an object from S3.
2423
*/
2524
@SdkPublicApi
26-
public abstract class DownloadObjectSpecification {
27-
28-
DownloadObjectSpecification() {
29-
}
30-
31-
/**
32-
* @return {@code true} if this is a presigned URL, {@code false} otherwise.
33-
*/
34-
public boolean isPresignedUrl() {
35-
return false;
36-
}
37-
38-
/**
39-
* @return {@code true} if this is an API request, {@code false} otherwise.
40-
*/
41-
public boolean isApiRequest() {
42-
return false;
43-
}
44-
45-
/**
46-
* @return This specification as a presigned URL.
47-
* @throws IllegalStateException If this is not a presigned URL.
48-
*/
49-
public URL asPresignedUrl() {
50-
throw new IllegalStateException("Not a presigned URL");
51-
}
25+
public abstract class DownloadObjectSpecification implements TransferSpecification {
5226

5327
/**
5428
* @return This specification as an API request.

services-custom/s3-transfermanager/src/main/java/software/amazon/awssdk/custom/s3/transfer/DownloadRequest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Collection;
1919
import java.util.Objects;
20+
import java.util.Optional;
2021
import software.amazon.awssdk.annotations.SdkPublicApi;
2122
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2223
import software.amazon.awssdk.utils.Validate;
@@ -47,8 +48,8 @@ public DownloadObjectSpecification downloadSpecification() {
4748
* downloads are enabled, this allows the Transfer Manager to omit a call
4849
* to S3 to get the object size.
4950
*/
50-
public Long size() {
51-
return size;
51+
public Optional<Long> size() {
52+
return Optional.ofNullable(size);
5253
}
5354

5455
@Override

0 commit comments

Comments
 (0)