Skip to content

Commit 20ca190

Browse files
committed
Implement S3 CRT upload
1 parent c76d19b commit 20ca190

File tree

8 files changed

+891
-17
lines changed

8 files changed

+891
-17
lines changed

services/s3/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
<dependency>
6565
<groupId>software.amazon.awssdk.crt</groupId>
6666
<artifactId>aws-crt</artifactId>
67-
<version>${awscrt.version}</version>
67+
<version>1.0.0-SNAPSHOT</version>
6868
</dependency>
6969
<dependency>
7070
<groupId>software.amazon.awssdk</groupId>
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
20+
import io.reactivex.Flowable;
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.nio.file.Files;
24+
import java.security.NoSuchAlgorithmException;
25+
import java.util.List;
26+
import java.util.Optional;
27+
import java.util.Random;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
30+
import org.junit.After;
31+
import org.junit.AfterClass;
32+
import org.junit.Before;
33+
import org.junit.BeforeClass;
34+
import org.junit.Test;
35+
import org.reactivestreams.Subscriber;
36+
import software.amazon.awssdk.core.ResponseBytes;
37+
import software.amazon.awssdk.core.ResponseInputStream;
38+
import software.amazon.awssdk.core.async.AsyncRequestBody;
39+
import software.amazon.awssdk.core.sync.ResponseTransformer;
40+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
41+
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
42+
import software.amazon.awssdk.testutils.RandomTempFile;
43+
44+
public class CrtClientIntegrationTest extends S3IntegrationTestBase {
45+
private static final String TEST_BUCKET = temporaryBucketName(CrtClientIntegrationTest.class);
46+
private static final String TEST_KEY = "8mib_file.dat";
47+
private static final int OBJ_SIZE = 8 * 1024 * 1024;
48+
49+
private static RandomTempFile testFile;
50+
51+
private S3CrtAsyncClient s3Crt;
52+
53+
@BeforeClass
54+
public static void setup() throws Exception {
55+
S3IntegrationTestBase.setUp();
56+
createBucket(TEST_BUCKET);
57+
58+
testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE);
59+
}
60+
61+
@Before
62+
public void methodSetup() {
63+
s3Crt = S3CrtAsyncClient.builder()
64+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
65+
.region(DEFAULT_REGION)
66+
.build();
67+
}
68+
69+
@After
70+
public void methodTeardown() {
71+
s3Crt.close();
72+
}
73+
74+
@AfterClass
75+
public static void teardown() throws IOException {
76+
deleteBucketAndAllContents(TEST_BUCKET);
77+
Files.delete(testFile.toPath());
78+
}
79+
80+
@Test
81+
public void putObject_fileRequestBody_objectSentCorrectly() throws IOException, NoSuchAlgorithmException {
82+
AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
83+
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();
84+
85+
ResponseInputStream<GetObjectResponse> objContent = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
86+
ResponseTransformer.toInputStream());
87+
88+
byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));
89+
90+
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
91+
}
92+
93+
@Test
94+
public void putObject_byteBufferBody_objectSentCorrectly() throws IOException, NoSuchAlgorithmException {
95+
byte[] data = new byte[16384];
96+
new Random().nextBytes(data);
97+
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
98+
99+
AsyncRequestBody body = AsyncRequestBody.fromByteBuffer(byteBuffer);
100+
101+
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();
102+
103+
ResponseBytes<GetObjectResponse> responseBytes = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
104+
ResponseTransformer.toBytes());
105+
106+
byte[] expectedSum = ChecksumUtils.computeCheckSum(byteBuffer);
107+
108+
assertThat(ChecksumUtils.computeCheckSum(responseBytes.asByteBuffer())).isEqualTo(expectedSum);
109+
}
110+
111+
@Test
112+
public void putObject_customRequestBody_objectSentCorrectly() throws IOException, NoSuchAlgorithmException {
113+
Random rng = new Random();
114+
int bufferSize = 16384;
115+
int nBuffers = 15;
116+
List<ByteBuffer> bodyData = Stream.generate(() -> {
117+
byte[] data = new byte[bufferSize];
118+
rng.nextBytes(data);
119+
return ByteBuffer.wrap(data);
120+
}).limit(nBuffers).collect(Collectors.toList());
121+
122+
long contentLength = bufferSize * nBuffers;
123+
124+
byte[] expectedSum = ChecksumUtils.computeCheckSum(bodyData);
125+
126+
Flowable<ByteBuffer> publisher = Flowable.fromIterable(bodyData);
127+
128+
AsyncRequestBody customRequestBody = new AsyncRequestBody() {
129+
@Override
130+
public Optional<Long> contentLength() {
131+
return Optional.of(contentLength);
132+
}
133+
134+
@Override
135+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
136+
publisher.subscribe(subscriber);
137+
}
138+
};
139+
140+
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), customRequestBody).join();
141+
142+
ResponseInputStream<GetObjectResponse> objContent = s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
143+
ResponseTransformer.toInputStream());
144+
145+
146+
assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
147+
}
148+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/DefaultS3CrtAsyncClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static software.amazon.awssdk.services.s3.internal.S3CrtUtils.createCrtCredentialsProvider;
2020

21+
import com.amazonaws.s3.RequestDataSupplier;
2122
import com.amazonaws.s3.S3NativeClient;
2223
import java.util.concurrent.CompletableFuture;
2324
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -26,6 +27,11 @@
2627
import software.amazon.awssdk.services.s3.S3CrtAsyncClient;
2728
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2829
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
30+
import com.amazonaws.s3.model.PutObjectOutput;
31+
import software.amazon.awssdk.core.async.AsyncRequestBody;
32+
import software.amazon.awssdk.services.s3.internal.s3crt.RequestDataSupplierAdapter;
33+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
34+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
2935

3036
@SdkInternalApi
3137
public final class DefaultS3CrtAsyncClient implements S3CrtAsyncClient {
@@ -74,6 +80,20 @@ public <ReturnT> CompletableFuture<ReturnT> getObject(
7480
return future;
7581
}
7682

83+
public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) {
84+
com.amazonaws.s3.model.PutObjectRequest adaptedRequest = S3CrtUtils.toCrtPutObjectRequest(putObjectRequest);
85+
86+
if (adaptedRequest.contentLength() == null && requestBody.contentLength().isPresent()) {
87+
adaptedRequest = adaptedRequest.toBuilder().contentLength(requestBody.contentLength().get())
88+
.build();
89+
}
90+
91+
CompletableFuture<PutObjectOutput> putObjectOutputCompletableFuture = s3NativeClient.putObject(adaptedRequest,
92+
adaptToDataSupplier(requestBody));
93+
94+
return putObjectOutputCompletableFuture.thenApply(S3CrtUtils::fromCrtPutObjectOutput);
95+
}
96+
7797
@Override
7898
public String serviceName() {
7999
return "s3";
@@ -84,4 +104,8 @@ public void close() {
84104
s3NativeClient.close();
85105
configuration.close();
86106
}
107+
108+
private static RequestDataSupplier adaptToDataSupplier(AsyncRequestBody requestBody) {
109+
return new RequestDataSupplierAdapter(requestBody);
110+
}
87111
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/S3CrtUtils.java

Lines changed: 86 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515

1616
package software.amazon.awssdk.services.s3.internal;
1717

18+
import com.amazonaws.s3.model.ObjectCannedACL;
19+
import com.amazonaws.s3.model.ObjectLockLegalHoldStatus;
20+
import com.amazonaws.s3.model.ObjectLockMode;
21+
import com.amazonaws.s3.model.PutObjectOutput;
22+
import com.amazonaws.s3.model.RequestPayer;
23+
import com.amazonaws.s3.model.ServerSideEncryption;
24+
import com.amazonaws.s3.model.StorageClass;
1825
import software.amazon.awssdk.annotations.SdkInternalApi;
1926
import software.amazon.awssdk.auth.credentials.AwsCredentials;
2027
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -23,6 +30,8 @@
2330
import software.amazon.awssdk.crt.auth.credentials.StaticCredentialsProvider;
2431
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
2532
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
33+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
34+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
2635

2736
@SdkInternalApi
2837
public final class S3CrtUtils {
@@ -64,21 +73,82 @@ public static com.amazonaws.s3.model.GetObjectRequest adaptGetObjectRequest(GetO
6473
// TODO: codegen
6574
public static GetObjectResponse adaptGetObjectOutput(com.amazonaws.s3.model.GetObjectOutput response) {
6675
return GetObjectResponse.builder()
67-
.bucketKeyEnabled(response.bucketKeyEnabled())
68-
.acceptRanges(response.acceptRanges())
69-
.contentDisposition(response.contentDisposition())
70-
.cacheControl(response.cacheControl())
71-
.contentEncoding(response.contentEncoding())
72-
.contentLanguage(response.contentLanguage())
73-
.contentRange(response.contentRange())
74-
.contentLength(response.contentLength())
75-
.contentType(response.contentType())
76-
.deleteMarker(response.deleteMarker())
77-
.eTag(response.eTag())
78-
.expiration(response.expiration())
79-
.expires(response.expires())
80-
.lastModified(response.lastModified())
81-
.metadata(response.metadata())
82-
.build();
76+
.bucketKeyEnabled(response.bucketKeyEnabled())
77+
.acceptRanges(response.acceptRanges())
78+
.contentDisposition(response.contentDisposition())
79+
.cacheControl(response.cacheControl())
80+
.contentEncoding(response.contentEncoding())
81+
.contentLanguage(response.contentLanguage())
82+
.contentRange(response.contentRange())
83+
.contentLength(response.contentLength())
84+
.contentType(response.contentType())
85+
.deleteMarker(response.deleteMarker())
86+
.eTag(response.eTag())
87+
.expiration(response.expiration())
88+
.expires(response.expires())
89+
.lastModified(response.lastModified())
90+
.metadata(response.metadata())
91+
.build();
92+
}
93+
94+
//TODO: codegen
95+
public static com.amazonaws.s3.model.PutObjectRequest toCrtPutObjectRequest(PutObjectRequest sdkPutObject) {
96+
return com.amazonaws.s3.model.PutObjectRequest.builder()
97+
.contentLength(sdkPutObject.contentLength())
98+
.aCL(ObjectCannedACL.fromValue(sdkPutObject.aclAsString()))
99+
.bucket(sdkPutObject.bucket())
100+
.key(sdkPutObject.key())
101+
.bucketKeyEnabled(sdkPutObject.bucketKeyEnabled())
102+
.cacheControl(sdkPutObject.cacheControl())
103+
.contentDisposition(sdkPutObject.contentDisposition())
104+
.contentEncoding(sdkPutObject.contentEncoding())
105+
.contentLanguage(sdkPutObject.contentLanguage())
106+
.contentMD5(sdkPutObject.contentMD5())
107+
.contentType(sdkPutObject.contentType())
108+
.expectedBucketOwner(sdkPutObject.expectedBucketOwner())
109+
.expires(sdkPutObject.expires())
110+
.grantFullControl(sdkPutObject.grantFullControl())
111+
.grantRead(sdkPutObject.grantRead())
112+
.grantReadACP(sdkPutObject.grantReadACP())
113+
.grantWriteACP(sdkPutObject.grantWriteACP())
114+
.metadata(sdkPutObject.metadata())
115+
.objectLockLegalHoldStatus(ObjectLockLegalHoldStatus.fromValue(sdkPutObject.objectLockLegalHoldStatusAsString()))
116+
.objectLockMode(ObjectLockMode.fromValue(sdkPutObject.objectLockModeAsString()))
117+
.objectLockRetainUntilDate(sdkPutObject.objectLockRetainUntilDate())
118+
.requestPayer(RequestPayer.fromValue(sdkPutObject.requestPayerAsString()))
119+
.serverSideEncryption(ServerSideEncryption.fromValue(sdkPutObject.requestPayerAsString()))
120+
.sSECustomerAlgorithm(sdkPutObject.sseCustomerAlgorithm())
121+
.sSECustomerKey(sdkPutObject.sseCustomerKey())
122+
.sSECustomerKeyMD5(sdkPutObject.sseCustomerKeyMD5())
123+
.sSEKMSEncryptionContext(sdkPutObject.ssekmsEncryptionContext())
124+
.sSEKMSKeyId(sdkPutObject.ssekmsKeyId())
125+
.storageClass(StorageClass.fromValue(sdkPutObject.storageClassAsString()))
126+
.tagging(sdkPutObject.tagging())
127+
.websiteRedirectLocation(sdkPutObject.websiteRedirectLocation())
128+
.build();
129+
}
130+
131+
//TODO: codegen
132+
public static PutObjectResponse fromCrtPutObjectOutput(PutObjectOutput crtPutObjectOutput) {
133+
// TODO: Provide the HTTP request-level data (e.g. response metadata, HTTP response)
134+
PutObjectResponse.Builder builder = PutObjectResponse.builder()
135+
.bucketKeyEnabled(crtPutObjectOutput.bucketKeyEnabled())
136+
.eTag(crtPutObjectOutput.eTag())
137+
.expiration(crtPutObjectOutput.expiration())
138+
.sseCustomerAlgorithm(crtPutObjectOutput.sSECustomerAlgorithm())
139+
.sseCustomerKeyMD5(crtPutObjectOutput.sSECustomerKeyMD5())
140+
.ssekmsEncryptionContext(crtPutObjectOutput.sSEKMSEncryptionContext())
141+
.ssekmsKeyId(crtPutObjectOutput.sSEKMSKeyId())
142+
.versionId(crtPutObjectOutput.versionId());
143+
144+
if (crtPutObjectOutput.requestCharged() != null) {
145+
builder.requestCharged(crtPutObjectOutput.requestCharged().value());
146+
}
147+
148+
if (crtPutObjectOutput.serverSideEncryption() != null) {
149+
builder.serverSideEncryption(crtPutObjectOutput.serverSideEncryption().value());
150+
}
151+
152+
return builder.build();
83153
}
84154
}

0 commit comments

Comments
 (0)