Skip to content

Commit 0342a87

Browse files
committed
Plain transplant of existing AsyncResponseTransformer
1 parent eb84c15 commit 0342a87

File tree

3 files changed

+119
-2
lines changed

3 files changed

+119
-2
lines changed

src/main/java/com/madgag/aws/sdk/async/responsebytes/Main.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import com.google.common.hash.HashCode;
44
import com.google.common.hash.Hashing;
5+
import com.madgag.aws.sdk.async.responsebytes.awssdk.core.async.AsyncResponseTransformerAlternative;
56
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
67
import software.amazon.awssdk.core.ResponseBytes;
7-
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
88
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
99
import software.amazon.awssdk.services.s3.S3AsyncClient;
1010
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -23,7 +23,7 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
2323
KnownS3Object knownS3Object = KnownS3Object.BIG;
2424
ResponseBytes<GetObjectResponse> responseBytes = s3Client.getObject(
2525
GetObjectRequest.builder().bucket(knownS3Object.bucket()).key(knownS3Object.key()).build(),
26-
AsyncResponseTransformer.toBytes()
26+
AsyncResponseTransformerAlternative.toBytes()
2727
).get();
2828
System.out.println(responseBytes.response().contentLength());
2929
HashCode hashOfDownloadedData = Hashing.sha256().hashBytes(responseBytes.asByteBuffer());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.madgag.aws.sdk.async.responsebytes.awssdk.core.async;
2+
3+
import com.madgag.aws.sdk.async.responsebytes.awssdk.core.internal.async.ByteArrayAsyncResponseTransformerAlternative;
4+
import software.amazon.awssdk.core.ResponseBytes;
5+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
6+
7+
public class AsyncResponseTransformerAlternative {
8+
public static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes() {
9+
return new ByteArrayAsyncResponseTransformerAlternative<>();
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.madgag.aws.sdk.async.responsebytes.awssdk.core.internal.async;
17+
18+
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
19+
20+
import java.io.ByteArrayOutputStream;
21+
import java.nio.ByteBuffer;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.reactivestreams.Subscriber;
24+
import org.reactivestreams.Subscription;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.core.ResponseBytes;
27+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
28+
import software.amazon.awssdk.core.async.SdkPublisher;
29+
import software.amazon.awssdk.utils.BinaryUtils;
30+
31+
/**
32+
* Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further
33+
* conversions into types, like strings.
34+
*
35+
* This can be created with static methods on {@link AsyncResponseTransformer}.
36+
*
37+
* @param <ResponseT> Pojo response type.
38+
* @see AsyncResponseTransformer#toBytes()
39+
*/
40+
@SdkInternalApi
41+
public final class ByteArrayAsyncResponseTransformerAlternative<ResponseT> implements
42+
AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> {
43+
44+
private volatile CompletableFuture<byte[]> cf;
45+
private volatile ResponseT response;
46+
47+
@Override
48+
public CompletableFuture<ResponseBytes<ResponseT>> prepare() {
49+
cf = new CompletableFuture<>();
50+
return cf.thenApply(arr -> ResponseBytes.fromByteArray(response, arr));
51+
}
52+
53+
@Override
54+
public void onResponse(ResponseT response) {
55+
this.response = response;
56+
}
57+
58+
@Override
59+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
60+
publisher.subscribe(new BaosSubscriber(cf));
61+
}
62+
63+
@Override
64+
public void exceptionOccurred(Throwable throwable) {
65+
cf.completeExceptionally(throwable);
66+
}
67+
68+
static class BaosSubscriber implements Subscriber<ByteBuffer> {
69+
private final CompletableFuture<byte[]> resultFuture;
70+
71+
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
72+
73+
private Subscription subscription;
74+
75+
BaosSubscriber(CompletableFuture<byte[]> resultFuture) {
76+
this.resultFuture = resultFuture;
77+
}
78+
79+
@Override
80+
public void onSubscribe(Subscription s) {
81+
if (this.subscription != null) {
82+
s.cancel();
83+
return;
84+
}
85+
this.subscription = s;
86+
subscription.request(Long.MAX_VALUE);
87+
}
88+
89+
@Override
90+
public void onNext(ByteBuffer byteBuffer) {
91+
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
92+
subscription.request(1);
93+
}
94+
95+
@Override
96+
public void onError(Throwable throwable) {
97+
baos = null;
98+
resultFuture.completeExceptionally(throwable);
99+
}
100+
101+
@Override
102+
public void onComplete() {
103+
resultFuture.complete(baos.toByteArray());
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)