Skip to content

Commit 782b49d

Browse files
Add new AsyncResponseTransformer: toPublisher() (#2837)
* Add new AsyncResponseTransformer: toPublisher() ## Description Add new AsyncResponseTransformer: toPublisher(). This transformer makes it more convenient for users to directly consume a streaming-response payload (i.e., S3 GetObject) with async clients. This also allows users of Reactor/RxJava to more easily consume a streaming response (e.g., via Flux#from(Publisher)). ## Motivation and Context AsyncResponseTransformer currently lacks a static factory method that would allow a user to stream the response body data. The only methods currently available are toFile() and toBytes(). This lacks parity when compared to AsyncRequestBody#fromPublisher(), which allows declaring a request body from an arbitrary Publisher<ByteBuffer>. It also lacks parity when compared to the sync client's response equivalent, ResponseTransformer#toInputStream(), which streams a response body to an InputStream.
1 parent 49120c7 commit 782b49d

File tree

26 files changed

+1003
-271
lines changed

26 files changed

+1003
-271
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Add new AsyncResponseTransformer: toPublisher(). This transformer makes it more convenient for users to directly consume a streaming-response payload (i.e., S3 GetObject) with async clients. This also allows users of Reactor/RxJava to more easily consume a streaming response (e.g., via Flux#from(Publisher))."
6+
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.nio.ByteBuffer;
3737
import java.util.Collections;
3838
import java.util.List;
39+
import java.util.concurrent.CompletableFuture;
3940
import java.util.concurrent.Executor;
4041
import java.util.concurrent.ScheduledExecutorService;
4142
import java.util.stream.Collectors;
@@ -64,6 +65,8 @@
6465
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
6566
import software.amazon.awssdk.codegen.poet.model.EventStreamSpecHelper;
6667
import software.amazon.awssdk.core.RequestOverrideConfiguration;
68+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
69+
import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils;
6770
import software.amazon.awssdk.core.async.SdkPublisher;
6871
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
6972
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
@@ -78,6 +81,7 @@
7881
import software.amazon.awssdk.protocols.json.AwsJsonProtocolFactory;
7982
import software.amazon.awssdk.utils.CompletableFutureUtils;
8083
import software.amazon.awssdk.utils.FunctionalUtils;
84+
import software.amazon.awssdk.utils.Pair;
8185

8286
public final class AsyncClientClass extends AsyncClientInterface {
8387
private final IntermediateModel model;
@@ -244,7 +248,31 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
244248
CoreMetric.class, "SERVICE_ID", model.getMetadata().getServiceId());
245249
builder.addStatement("apiCallMetricCollector.reportMetric($T.$L, $S)",
246250
CoreMetric.class, "OPERATION_NAME", opModel.getOperationName());
247-
251+
252+
if (opModel.hasStreamingOutput()) {
253+
ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());
254+
255+
builder.addStatement("$T<$T<$T, ReturnT>, $T<$T>> $N = $T.wrapWithEndOfStreamFuture($N)",
256+
Pair.class,
257+
AsyncResponseTransformer.class,
258+
responseType,
259+
CompletableFuture.class,
260+
Void.class,
261+
"pair",
262+
AsyncResponseTransformerUtils.class,
263+
"asyncResponseTransformer");
264+
265+
builder.addStatement("$N = $N.left()",
266+
"asyncResponseTransformer",
267+
"pair");
268+
269+
builder.addStatement("$T<$T> $N = $N.right()",
270+
CompletableFuture.class,
271+
Void.class,
272+
"endOfStreamFuture",
273+
"pair");
274+
}
275+
248276
if (shouldUseAsyncWithBodySigner(opModel)) {
249277
builder.addCode(applyAsyncWithBodyV4SignerOverride(opModel));
250278
} else {
@@ -312,8 +340,14 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
312340
.beginControlFlow("catch ($T t)", Throwable.class);
313341

314342
// For streaming operations we also want to notify the response handler of any exception.
343+
if (opModel.hasStreamingOutput()) {
344+
ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());
345+
builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer",
346+
AsyncResponseTransformer.class,
347+
responseType);
348+
}
315349
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
316-
String paramName = opModel.hasStreamingOutput() ? "asyncResponseTransformer" : "asyncResponseHandler";
350+
String paramName = opModel.hasStreamingOutput() ? "finalAsyncResponseTransformer" : "asyncResponseHandler";
317351
builder.addStatement("runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\",\n" +
318352
"() -> $N.exceptionOccurred(t))", paramName);
319353
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import software.amazon.awssdk.core.SdkPojoBuilder;
4646
import software.amazon.awssdk.core.SdkResponse;
4747
import software.amazon.awssdk.core.async.AsyncRequestBody;
48+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4849
import software.amazon.awssdk.core.client.handler.AttachHttpMetadataResponseHandler;
4950
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
5051
import software.amazon.awssdk.core.http.HttpResponseHandler;
@@ -223,7 +224,6 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
223224

224225
boolean isStreaming = opModel.hasStreamingOutput() || opModel.hasEventStreamOutput();
225226
String protocolFactory = protocolFactoryLiteral(intermediateModel, opModel);
226-
String customerResponseHandler = opModel.hasEventStreamOutput() ? "asyncResponseHandler" : "asyncResponseTransformer";
227227
TypeName responseType = opModel.hasEventStreamOutput() && !isRestJson ? ClassName.get(SdkResponse.class)
228228
: pojoResponseType;
229229
TypeName executeFutureValueType = executeFutureValueType(opModel, poetExtensions);
@@ -245,7 +245,13 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
245245
.add(".withInput($L)$L);",
246246
opModel.getInput().getVariableName(), asyncResponseTransformerVariable(isStreaming, isRestJson, opModel));
247247

248-
248+
if (opModel.hasStreamingOutput()) {
249+
builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer",
250+
AsyncResponseTransformer.class,
251+
pojoResponseType);
252+
}
253+
String customerResponseHandler = opModel.hasEventStreamOutput() ?
254+
"asyncResponseHandler" : "finalAsyncResponseTransformer";
249255
String whenComplete = whenCompleteBody(opModel, customerResponseHandler);
250256
if (!whenComplete.isEmpty()) {
251257
String whenCompletedFutureName = "whenCompleted";

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/ProtocolSpec.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ default String streamingOutputWhenComplete(String responseHandlerName) {
154154
+ " runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\", () "
155155
+ "-> %s.exceptionOccurred(e));%n"
156156
+ " }%n"
157-
+ "%s"
157+
+ " endOfStreamFuture.whenComplete((r2, e2) -> {%n"
158+
+ " %s%n"
159+
+ " });"
158160
+ "})", responseHandlerName, publishMetrics());
159161

160162
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/QueryProtocolSpec.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.awssdk.codegen.model.intermediate.OperationModel;
3030
import software.amazon.awssdk.codegen.poet.PoetExtensions;
3131
import software.amazon.awssdk.codegen.poet.client.traits.HttpChecksumRequiredTrait;
32+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3233
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
3334
import software.amazon.awssdk.core.http.HttpResponseHandler;
3435
import software.amazon.awssdk.protocols.query.AwsQueryProtocolFactory;
@@ -151,8 +152,11 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
151152
builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class),
152153
executeFutureValueType), whenCompleteFutureName);
153154
if (opModel.hasStreamingOutput()) {
155+
builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer",
156+
AsyncResponseTransformer.class,
157+
pojoResponseType);
154158
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName,
155-
streamingOutputWhenComplete("asyncResponseTransformer"));
159+
streamingOutputWhenComplete("finalAsyncResponseTransformer"));
156160
} else {
157161
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName, publishMetricsWhenComplete());
158162
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/XmlProtocolSpec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
209209
s3ArnableFields(opModel, model).ifPresent(builder::add);
210210

211211
builder.add(".withInput($L)", opModel.getInput().getVariableName());
212-
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
212+
if (opModel.hasEventStreamOutput()) {
213213
builder.add(", $N", executionResponseTransformerName);
214214
}
215215
builder.addStatement(")");
@@ -218,7 +218,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
218218
builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class),
219219
executeFutureValueType), whenCompleteFutureName);
220220

221-
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
221+
if (opModel.hasEventStreamOutput()) {
222222
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName,
223223
whenCompleteBlock(opModel, "asyncResponseHandler",
224224
eventStreamTransformFutureName));

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.core.SdkResponse;
3131
import software.amazon.awssdk.core.async.AsyncRequestBody;
3232
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
33+
import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils;
3334
import software.amazon.awssdk.core.async.SdkPublisher;
3435
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
3536
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
@@ -106,6 +107,7 @@
106107
import software.amazon.awssdk.services.json.transform.StreamingOutputOperationRequestMarshaller;
107108
import software.amazon.awssdk.utils.CompletableFutureUtils;
108109
import software.amazon.awssdk.utils.HostnameValidator;
110+
import software.amazon.awssdk.utils.Pair;
109111

110112
/**
111113
* Internal implementation of {@link JsonAsyncClient}.
@@ -998,6 +1000,10 @@ public <ReturnT> CompletableFuture<ReturnT> streamingInputOutputOperation(
9981000
try {
9991001
apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Json Service");
10001002
apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingInputOutputOperation");
1003+
Pair<AsyncResponseTransformer<StreamingInputOutputOperationResponse, ReturnT>, CompletableFuture<Void>> pair =
1004+
AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer);
1005+
asyncResponseTransformer = pair.left();
1006+
CompletableFuture<Void> endOfStreamFuture = pair.right();
10011007
streamingInputOutputOperationRequest = applySignerOverride(streamingInputOutputOperationRequest,
10021008
Aws4UnsignedPayloadSigner.create());
10031009
JsonOperationMetadata operationMetadata = JsonOperationMetadata.builder().hasStreamingSuccessResponse(true)
@@ -1021,18 +1027,22 @@ public <ReturnT> CompletableFuture<ReturnT> streamingInputOutputOperation(
10211027
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
10221028
.withMetricCollector(apiCallMetricCollector).withAsyncRequestBody(requestBody)
10231029
.withInput(streamingInputOutputOperationRequest), asyncResponseTransformer);
1030+
AsyncResponseTransformer<StreamingInputOutputOperationResponse, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer;
10241031
CompletableFuture<ReturnT> whenCompleted = executeFuture.whenComplete((r, e) -> {
10251032
if (e != null) {
10261033
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring",
1027-
() -> asyncResponseTransformer.exceptionOccurred(e));
1034+
() -> finalAsyncResponseTransformer.exceptionOccurred(e));
10281035
}
1029-
metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
1036+
endOfStreamFuture.whenComplete((r2, e2) -> {
1037+
metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
1038+
});
10301039
});
10311040
executeFuture = CompletableFutureUtils.forwardExceptionTo(whenCompleted, executeFuture);
10321041
return executeFuture;
10331042
} catch (Throwable t) {
1043+
AsyncResponseTransformer<StreamingInputOutputOperationResponse, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer;
10341044
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring",
1035-
() -> asyncResponseTransformer.exceptionOccurred(t));
1045+
() -> finalAsyncResponseTransformer.exceptionOccurred(t));
10361046
metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
10371047
return CompletableFutureUtils.failedFuture(t);
10381048
}
@@ -1073,6 +1083,10 @@ public <ReturnT> CompletableFuture<ReturnT> streamingOutputOperation(
10731083
try {
10741084
apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Json Service");
10751085
apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingOutputOperation");
1086+
Pair<AsyncResponseTransformer<StreamingOutputOperationResponse, ReturnT>, CompletableFuture<Void>> pair =
1087+
AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer);
1088+
asyncResponseTransformer = pair.left();
1089+
CompletableFuture<Void> endOfStreamFuture = pair.right();
10761090
JsonOperationMetadata operationMetadata = JsonOperationMetadata.builder().hasStreamingSuccessResponse(true)
10771091
.isPayloadJson(false).build();
10781092

@@ -1089,18 +1103,22 @@ public <ReturnT> CompletableFuture<ReturnT> streamingOutputOperation(
10891103
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
10901104
.withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest),
10911105
asyncResponseTransformer);
1106+
AsyncResponseTransformer<StreamingOutputOperationResponse, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer;
10921107
CompletableFuture<ReturnT> whenCompleted = executeFuture.whenComplete((r, e) -> {
10931108
if (e != null) {
10941109
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring",
1095-
() -> asyncResponseTransformer.exceptionOccurred(e));
1110+
() -> finalAsyncResponseTransformer.exceptionOccurred(e));
10961111
}
1097-
metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
1112+
endOfStreamFuture.whenComplete((r2, e2) -> {
1113+
metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
1114+
});
10981115
});
10991116
executeFuture = CompletableFutureUtils.forwardExceptionTo(whenCompleted, executeFuture);
11001117
return executeFuture;
11011118
} catch (Throwable t) {
1119+
AsyncResponseTransformer<StreamingOutputOperationResponse, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer;
11021120
runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring",
1103-
() -> asyncResponseTransformer.exceptionOccurred(t));
1121+
() -> finalAsyncResponseTransformer.exceptionOccurred(t));
11041122
metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect()));
11051123
return CompletableFutureUtils.failedFuture(t);
11061124
}

0 commit comments

Comments
 (0)