Skip to content

Commit 70929c3

Browse files
authored
Added AsyncResponseTransformer.toBlockingInputStream. (#3562)
* Added AsyncResponseTransformer.toBlockingInputStream, allowing streaming operation responses to be read as if they're an InputStream. * Fix sonarcloud issue.
1 parent 0fd7ea5 commit 70929c3

File tree

11 files changed

+1088
-3
lines changed

11 files changed

+1088
-3
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Added AsyncResponseTransformer.toBlockingInputStream, allowing streaming operation responses to be read as if they're an InputStream."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.core;
1717

18+
import java.io.InputStream;
1819
import software.amazon.awssdk.annotations.SdkPublicApi;
1920
import software.amazon.awssdk.core.io.SdkFilterInputStream;
2021
import software.amazon.awssdk.http.Abortable;
@@ -43,6 +44,12 @@ public ResponseInputStream(ResponseT resp, AbortableInputStream in) {
4344
this.abortable = Validate.paramNotNull(in, "abortableInputStream");
4445
}
4546

47+
public ResponseInputStream(ResponseT resp, InputStream in) {
48+
super(in);
49+
this.response = Validate.paramNotNull(resp, "response");
50+
this.abortable = in instanceof Abortable ? (Abortable) in : null;
51+
}
52+
4653
/**
4754
* @return The unmarshalled POJO response associated with this content.
4855
*/
@@ -52,6 +59,8 @@ public ResponseT response() {
5259

5360
@Override
5461
public void abort() {
55-
abortable.abort();
62+
if (abortable != null) {
63+
abortable.abort();
64+
}
5665
}
5766
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import java.io.File;
19+
import java.io.InputStream;
1920
import java.nio.ByteBuffer;
2021
import java.nio.file.Path;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.function.Consumer;
2324
import software.amazon.awssdk.annotations.SdkPublicApi;
2425
import software.amazon.awssdk.core.FileTransformerConfiguration;
2526
import software.amazon.awssdk.core.ResponseBytes;
27+
import software.amazon.awssdk.core.ResponseInputStream;
2628
import software.amazon.awssdk.core.SdkResponse;
2729
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
2830
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
31+
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
2932
import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer;
3033
import software.amazon.awssdk.utils.Validate;
3134

@@ -232,4 +235,32 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
232235
static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, ResponsePublisher<ResponseT>> toPublisher() {
233236
return new PublisherAsyncResponseTransformer<>();
234237
}
238+
239+
/**
240+
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an
241+
* {@link InputStream}.
242+
* <p>
243+
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will
244+
* be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This
245+
* behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only
246+
* have their {@link CompletableFuture} completed after the entire response body has finished streaming.
247+
* <p>
248+
* You are responsible for performing blocking reads from this input stream and closing the stream when you are
249+
* finished.
250+
* <p>
251+
* Example usage:
252+
* <pre>
253+
* {@code
254+
* CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture =
255+
* s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBlockingInputStream());
256+
* try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.join()) {
257+
* responseStream.transferTo(System.out); // BLOCKS the calling thread
258+
* }
259+
* }
260+
* </pre>
261+
*/
262+
static <ResponseT extends SdkResponse>
263+
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
264+
return new InputStreamResponseTransformer<>();
265+
}
235266
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.concurrent.CompletableFuture;
20+
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.core.ResponseInputStream;
22+
import software.amazon.awssdk.core.SdkResponse;
23+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
24+
import software.amazon.awssdk.core.async.SdkPublisher;
25+
import software.amazon.awssdk.utils.async.InputStreamSubscriber;
26+
27+
/**
28+
* A {@link AsyncResponseTransformer} that allows performing blocking reads on the response data.
29+
* <p>
30+
* Created with {@link AsyncResponseTransformer#toBlockingInputStream()}.
31+
*/
32+
@SdkInternalApi
33+
public class InputStreamResponseTransformer<ResponseT extends SdkResponse>
34+
implements AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> {
35+
36+
private volatile CompletableFuture<ResponseInputStream<ResponseT>> future;
37+
private volatile ResponseT response;
38+
39+
@Override
40+
public CompletableFuture<ResponseInputStream<ResponseT>> prepare() {
41+
CompletableFuture<ResponseInputStream<ResponseT>> result = new CompletableFuture<>();
42+
this.future = result;
43+
return result;
44+
}
45+
46+
@Override
47+
public void onResponse(ResponseT response) {
48+
this.response = response;
49+
}
50+
51+
@Override
52+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
53+
InputStreamSubscriber inputStreamSubscriber = new InputStreamSubscriber();
54+
publisher.subscribe(inputStreamSubscriber);
55+
future.complete(new ResponseInputStream<>(response, inputStreamSubscriber));
56+
}
57+
58+
@Override
59+
public void exceptionOccurred(Throwable error) {
60+
future.completeExceptionally(error);
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.nio.ByteBuffer;
23+
import java.util.concurrent.CompletableFuture;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import software.amazon.awssdk.core.ResponseInputStream;
27+
import software.amazon.awssdk.core.SdkResponse;
28+
import software.amazon.awssdk.core.async.SdkPublisher;
29+
import software.amazon.awssdk.core.protocol.VoidSdkResponse;
30+
import software.amazon.awssdk.utils.async.SimplePublisher;
31+
32+
class InputStreamResponseTransformerTest {
33+
private SimplePublisher<ByteBuffer> publisher;
34+
private InputStreamResponseTransformer<SdkResponse> transformer;
35+
private SdkResponse response;
36+
private CompletableFuture<ResponseInputStream<SdkResponse>> resultFuture;
37+
38+
@BeforeEach
39+
public void setup() {
40+
publisher = new SimplePublisher<>();
41+
transformer = new InputStreamResponseTransformer<>();
42+
resultFuture = transformer.prepare();
43+
response = VoidSdkResponse.builder().build();
44+
45+
transformer.onResponse(response);
46+
47+
assertThat(resultFuture).isNotDone();
48+
49+
transformer.onStream(SdkPublisher.adapt(publisher));
50+
51+
assertThat(resultFuture).isCompleted();
52+
assertThat(resultFuture.join().response()).isEqualTo(response);
53+
}
54+
55+
@Test
56+
public void inputStreamReadsAreFromPublisher() throws IOException {
57+
InputStream stream = resultFuture.join();
58+
59+
publisher.send(ByteBuffer.wrap(new byte[] { 0, 1, 2 }));
60+
publisher.complete();
61+
62+
assertThat(stream.read()).isEqualTo(0);
63+
assertThat(stream.read()).isEqualTo(1);
64+
assertThat(stream.read()).isEqualTo(2);
65+
assertThat(stream.read()).isEqualTo(-1);
66+
}
67+
68+
@Test
69+
public void inputStreamArrayReadsAreFromPublisher() throws IOException {
70+
InputStream stream = resultFuture.join();
71+
72+
publisher.send(ByteBuffer.wrap(new byte[] { 0, 1, 2 }));
73+
publisher.complete();
74+
75+
byte[] data = new byte[3];
76+
assertThat(stream.read(data)).isEqualTo(3);
77+
78+
assertThat(data[0]).isEqualTo((byte) 0);
79+
assertThat(data[1]).isEqualTo((byte) 1);
80+
assertThat(data[2]).isEqualTo((byte) 2);
81+
assertThat(stream.read(data)).isEqualTo(-1);
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services;
17+
18+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.post;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.github.tomakehurst.wiremock.WireMockServer;
24+
import java.io.IOException;
25+
import java.net.URI;
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.concurrent.CompletableFuture;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.Timeout;
31+
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
32+
import software.amazon.awssdk.core.ResponseInputStream;
33+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
34+
import software.amazon.awssdk.regions.Region;
35+
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
36+
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationRequest;
37+
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationResponse;
38+
39+
@Timeout(5)
40+
public class BlockingAsyncRequestResponseBodyTest {
41+
private final WireMockServer wireMock = new WireMockServer(0);
42+
private ProtocolRestJsonAsyncClient client;
43+
44+
@BeforeEach
45+
public void setup() {
46+
wireMock.start();
47+
client = ProtocolRestJsonAsyncClient.builder()
48+
.region(Region.US_WEST_2)
49+
.credentialsProvider(AnonymousCredentialsProvider.create())
50+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
51+
.build();
52+
}
53+
54+
@Test
55+
public void blockingResponseTransformer_readsRightValue() {
56+
wireMock.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody("hello")));
57+
58+
CompletableFuture<ResponseInputStream<StreamingOutputOperationResponse>> responseFuture =
59+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
60+
AsyncResponseTransformer.toBlockingInputStream());
61+
ResponseInputStream<StreamingOutputOperationResponse> responseStream = responseFuture.join();
62+
63+
assertThat(responseStream).asString(StandardCharsets.UTF_8).isEqualTo("hello");
64+
assertThat(responseStream.response().sdkHttpResponse().statusCode()).isEqualTo(200);
65+
}
66+
67+
@Test
68+
public void blockingResponseTransformer_abortCloseDoesNotThrow() throws IOException {
69+
wireMock.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody("hello")));
70+
71+
CompletableFuture<ResponseInputStream<StreamingOutputOperationResponse>> responseFuture =
72+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
73+
AsyncResponseTransformer.toBlockingInputStream());
74+
ResponseInputStream<StreamingOutputOperationResponse> responseStream = responseFuture.join();
75+
responseStream.abort();
76+
responseStream.close();
77+
}
78+
79+
@Test
80+
public void blockingResponseTransformer_closeDoesNotThrow() throws IOException {
81+
wireMock.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody("hello")));
82+
83+
CompletableFuture<ResponseInputStream<StreamingOutputOperationResponse>> responseFuture =
84+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
85+
AsyncResponseTransformer.toBlockingInputStream());
86+
ResponseInputStream<StreamingOutputOperationResponse> responseStream = responseFuture.join();
87+
responseStream.close();
88+
}
89+
}

0 commit comments

Comments
 (0)