Skip to content

Commit c76d19b

Browse files
committed
Implement getObject in S3CrtAsyncClient
1 parent 0102460 commit c76d19b

File tree

11 files changed

+893
-7
lines changed

11 files changed

+893
-7
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
<rxjava.version>2.1.9</rxjava.version>
113113
<commons-codec.verion>1.10</commons-codec.verion>
114114
<jmh.version>1.21</jmh.version>
115-
<awscrt.version>0.10.9</awscrt.version>
115+
<awscrt.version>1.0.0-SNAPSHOT</awscrt.version>
116116

117117
<!-- Test dependencies -->
118118
<junit.version>4.12</junit.version>
@@ -163,7 +163,7 @@
163163
<unitils.version>3.3</unitils.version>
164164

165165
<!-- Reactive Streams version -->
166-
<reactive-streams.version>1.0.2</reactive-streams.version>
166+
<reactive-streams.version>1.0.3</reactive-streams.version>
167167

168168
<skip.unit.tests>${skipTests}</skip.unit.tests>
169169
</properties>

services/s3/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,10 @@
111111
<artifactId>wiremock</artifactId>
112112
<scope>test</scope>
113113
</dependency>
114+
<dependency>
115+
<groupId>org.reactivestreams</groupId>
116+
<artifactId>reactive-streams-tck</artifactId>
117+
<scope>test</scope>
118+
</dependency>
114119
</dependencies>
115120
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal;
17+
18+
import com.amazonaws.s3.ResponseDataConsumer;
19+
import com.amazonaws.s3.model.GetObjectOutput;
20+
import java.nio.ByteBuffer;
21+
import java.util.concurrent.CompletableFuture;
22+
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.core.SdkStandardLogger;
24+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
25+
import software.amazon.awssdk.crt.CrtRuntimeException;
26+
import software.amazon.awssdk.crt.http.HttpHeader;
27+
import software.amazon.awssdk.http.HttpStatusFamily;
28+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
29+
import software.amazon.awssdk.utils.Logger;
30+
31+
/**
32+
* Adapt the SDK API {@link AsyncResponseTransformer} to the CRT API {@link ResponseDataConsumer}.
33+
*/
34+
@SdkInternalApi
35+
public final class CrtResponseDataConsumerAdapter<ReturnT> implements ResponseDataConsumer<GetObjectOutput> {
36+
private static final Logger log = Logger.loggerFor(CrtResponseDataConsumerAdapter.class);
37+
private final AsyncResponseTransformer<GetObjectResponse, ReturnT> transformer;
38+
private final CompletableFuture<ReturnT> future;
39+
private final S3CrtDataPublisher publisher;
40+
41+
public CrtResponseDataConsumerAdapter(AsyncResponseTransformer<GetObjectResponse, ReturnT> transformer) {
42+
this.transformer = transformer;
43+
this.future = transformer.prepare();
44+
this.publisher = new S3CrtDataPublisher();
45+
}
46+
47+
public CompletableFuture<ReturnT> transformerFuture() {
48+
return future;
49+
}
50+
51+
@Override
52+
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
53+
if (HttpStatusFamily.of(statusCode) == HttpStatusFamily.SUCCESSFUL) {
54+
SdkStandardLogger.REQUEST_LOGGER.debug(() -> "Received successful response: " + statusCode);
55+
} else {
56+
SdkStandardLogger.REQUEST_LOGGER.debug(() -> "Received error response: " + statusCode);
57+
}
58+
}
59+
60+
@Override
61+
public void onResponse(GetObjectOutput output) {
62+
GetObjectResponse response = S3CrtUtils.adaptGetObjectOutput(output);
63+
transformer.onResponse(response);
64+
transformer.onStream(publisher);
65+
}
66+
67+
@Override
68+
public void onResponseData(ByteBuffer byteBuffer) {
69+
log.debug(() -> "Received data of size " + byteBuffer.remaining());
70+
publisher.deliverData(byteBuffer);
71+
}
72+
73+
@Override
74+
public void onException(CrtRuntimeException e) {
75+
log.debug(() -> "An error occurred ", e);
76+
transformer.exceptionOccurred(e);
77+
publisher.notifyError(e);
78+
}
79+
80+
@Override
81+
public void onFinished() {
82+
log.debug(() -> "Finished streaming ");
83+
publisher.notifyStreamingFinished();
84+
}
85+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/DefaultS3CrtAsyncClient.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,26 @@
1919
import static software.amazon.awssdk.services.s3.internal.S3CrtUtils.createCrtCredentialsProvider;
2020

2121
import com.amazonaws.s3.S3NativeClient;
22+
import java.util.concurrent.CompletableFuture;
2223
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2325
import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider;
2426
import software.amazon.awssdk.services.s3.S3CrtAsyncClient;
27+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
28+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2529

2630
@SdkInternalApi
2731
public final class DefaultS3CrtAsyncClient implements S3CrtAsyncClient {
2832
private final S3NativeClient s3NativeClient;
2933
private final S3NativeClientConfiguration configuration;
3034

31-
private final CredentialsProvider credentialsProvider;
32-
33-
3435
public DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
35-
this.credentialsProvider = createCrtCredentialsProvider(builder.credentialsProvider());
36+
CredentialsProvider credentialsProvider = builder.credentialsProvider() == null ? null :
37+
createCrtCredentialsProvider(builder.credentialsProvider());
3638

3739
this.configuration = S3NativeClientConfiguration.builder()
3840
.credentialsProvider(credentialsProvider)
39-
.signingRegion(builder.region().id())
41+
.signingRegion(builder.region() == null ? null : builder.region().id())
4042
.partSizeBytes(builder.partSizeBytes())
4143
.maxThroughputGbps(builder.maxThroughputGbps())
4244
.build();
@@ -48,7 +50,29 @@ public DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
4850
configuration.maxThroughputGbps());
4951
}
5052

53+
@Override
54+
public <ReturnT> CompletableFuture<ReturnT> getObject(
55+
GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ReturnT> asyncResponseTransformer) {
56+
57+
CompletableFuture<ReturnT> future = new CompletableFuture<>();
58+
com.amazonaws.s3.model.GetObjectRequest crtGetObjectRequest = S3CrtUtils.adaptGetObjectRequest(getObjectRequest);
59+
CrtResponseDataConsumerAdapter<ReturnT> adapter = new CrtResponseDataConsumerAdapter<>(asyncResponseTransformer);
5160

61+
CompletableFuture<ReturnT> adapterFuture = adapter.transformerFuture();
62+
63+
s3NativeClient.getObject(crtGetObjectRequest, adapter);
64+
65+
adapterFuture.whenComplete((r, t) -> {
66+
if (t == null) {
67+
future.complete(r);
68+
} else {
69+
future.completeExceptionally(t);
70+
}
71+
// TODO: Offload to future completion thread
72+
});
73+
74+
return future;
75+
}
5276

5377
@Override
5478
public String serviceName() {

0 commit comments

Comments
 (0)