diff --git a/.changes/next-release/feature-AWSSDKforJavav2-0e09833.json b/.changes/next-release/feature-AWSSDKforJavav2-0e09833.json new file mode 100644 index 000000000000..8f6427213b26 --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-0e09833.json @@ -0,0 +1,6 @@ +{ + "category": "AWS SDK for Java v2", + "contributor": "", + "type": "feature", + "description": "Add new AsyncResponseTransformer: toPublisher(). This transformer makes it more convenient for users to directly consume a streaming-response payload (i.e., S3 GetObject) with async clients. This also allows users of Reactor/RxJava to more easily consume a streaming response (e.g., via Flux#from(Publisher))." +} diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java index 4849b7227e10..00b8bd66be62 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java @@ -36,6 +36,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; @@ -64,6 +65,8 @@ import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils; import software.amazon.awssdk.codegen.poet.model.EventStreamSpecHelper; import software.amazon.awssdk.core.RequestOverrideConfiguration; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; @@ -78,6 +81,7 @@ import software.amazon.awssdk.protocols.json.AwsJsonProtocolFactory; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.FunctionalUtils; +import software.amazon.awssdk.utils.Pair; public final class AsyncClientClass extends AsyncClientInterface { private final IntermediateModel model; @@ -244,7 +248,31 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation CoreMetric.class, "SERVICE_ID", model.getMetadata().getServiceId()); builder.addStatement("apiCallMetricCollector.reportMetric($T.$L, $S)", CoreMetric.class, "OPERATION_NAME", opModel.getOperationName()); - + + if (opModel.hasStreamingOutput()) { + ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType()); + + builder.addStatement("$T<$T<$T, ReturnT>, $T<$T>> $N = $T.wrapWithEndOfStreamFuture($N)", + Pair.class, + AsyncResponseTransformer.class, + responseType, + CompletableFuture.class, + Void.class, + "pair", + AsyncResponseTransformerUtils.class, + "asyncResponseTransformer"); + + builder.addStatement("$N = $N.left()", + "asyncResponseTransformer", + "pair"); + + builder.addStatement("$T<$T> $N = $N.right()", + CompletableFuture.class, + Void.class, + "endOfStreamFuture", + "pair"); + } + if (shouldUseAsyncWithBodySigner(opModel)) { builder.addCode(applyAsyncWithBodyV4SignerOverride(opModel)); } else { @@ -312,8 +340,14 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation .beginControlFlow("catch ($T t)", Throwable.class); // For streaming operations we also want to notify the response handler of any exception. + if (opModel.hasStreamingOutput()) { + ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType()); + builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer", + AsyncResponseTransformer.class, + responseType); + } if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) { - String paramName = opModel.hasStreamingOutput() ? "asyncResponseTransformer" : "asyncResponseHandler"; + String paramName = opModel.hasStreamingOutput() ? "finalAsyncResponseTransformer" : "asyncResponseHandler"; builder.addStatement("runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\",\n" + "() -> $N.exceptionOccurred(t))", paramName); } diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java index 668c89e3d15c..dbf13ddbed14 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.core.SdkPojoBuilder; import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.client.handler.AttachHttpMetadataResponseHandler; import software.amazon.awssdk.core.client.handler.ClientExecutionParams; import software.amazon.awssdk.core.http.HttpResponseHandler; @@ -223,7 +224,6 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper boolean isStreaming = opModel.hasStreamingOutput() || opModel.hasEventStreamOutput(); String protocolFactory = protocolFactoryLiteral(intermediateModel, opModel); - String customerResponseHandler = opModel.hasEventStreamOutput() ? "asyncResponseHandler" : "asyncResponseTransformer"; TypeName responseType = opModel.hasEventStreamOutput() && !isRestJson ? ClassName.get(SdkResponse.class) : pojoResponseType; TypeName executeFutureValueType = executeFutureValueType(opModel, poetExtensions); @@ -245,7 +245,13 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper .add(".withInput($L)$L);", opModel.getInput().getVariableName(), asyncResponseTransformerVariable(isStreaming, isRestJson, opModel)); - + if (opModel.hasStreamingOutput()) { + builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer", + AsyncResponseTransformer.class, + pojoResponseType); + } + String customerResponseHandler = opModel.hasEventStreamOutput() ? + "asyncResponseHandler" : "finalAsyncResponseTransformer"; String whenComplete = whenCompleteBody(opModel, customerResponseHandler); if (!whenComplete.isEmpty()) { String whenCompletedFutureName = "whenCompleted"; diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/ProtocolSpec.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/ProtocolSpec.java index d052202fa88c..ee0738630584 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/ProtocolSpec.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/ProtocolSpec.java @@ -154,7 +154,9 @@ default String streamingOutputWhenComplete(String responseHandlerName) { + " runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\", () " + "-> %s.exceptionOccurred(e));%n" + " }%n" - + "%s" + + " endOfStreamFuture.whenComplete((r2, e2) -> {%n" + + " %s%n" + + " });" + "})", responseHandlerName, publishMetrics()); } diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/QueryProtocolSpec.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/QueryProtocolSpec.java index f378dc5d2b66..365f5527028c 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/QueryProtocolSpec.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/QueryProtocolSpec.java @@ -29,6 +29,7 @@ import software.amazon.awssdk.codegen.model.intermediate.OperationModel; import software.amazon.awssdk.codegen.poet.PoetExtensions; import software.amazon.awssdk.codegen.poet.client.traits.HttpChecksumRequiredTrait; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.client.handler.ClientExecutionParams; import software.amazon.awssdk.core.http.HttpResponseHandler; import software.amazon.awssdk.protocols.query.AwsQueryProtocolFactory; @@ -151,8 +152,11 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class), executeFutureValueType), whenCompleteFutureName); if (opModel.hasStreamingOutput()) { + builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer", + AsyncResponseTransformer.class, + pojoResponseType); builder.addStatement("$N = executeFuture$L", whenCompleteFutureName, - streamingOutputWhenComplete("asyncResponseTransformer")); + streamingOutputWhenComplete("finalAsyncResponseTransformer")); } else { builder.addStatement("$N = executeFuture$L", whenCompleteFutureName, publishMetricsWhenComplete()); } diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/XmlProtocolSpec.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/XmlProtocolSpec.java index e21ba3a1cc3f..d903b4aa4d8f 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/XmlProtocolSpec.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/XmlProtocolSpec.java @@ -209,7 +209,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper s3ArnableFields(opModel, model).ifPresent(builder::add); builder.add(".withInput($L)", opModel.getInput().getVariableName()); - if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) { + if (opModel.hasEventStreamOutput()) { builder.add(", $N", executionResponseTransformerName); } builder.addStatement(")"); @@ -218,7 +218,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class), executeFutureValueType), whenCompleteFutureName); - if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) { + if (opModel.hasEventStreamOutput()) { builder.addStatement("$N = executeFuture$L", whenCompleteFutureName, whenCompleteBlock(opModel, "asyncResponseHandler", eventStreamTransformFutureName)); diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java index 84b26aad8c69..0b051f75bf0a 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; @@ -106,6 +107,7 @@ import software.amazon.awssdk.services.json.transform.StreamingOutputOperationRequestMarshaller; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.HostnameValidator; +import software.amazon.awssdk.utils.Pair; /** * Internal implementation of {@link JsonAsyncClient}. @@ -998,6 +1000,10 @@ public CompletableFuture streamingInputOutputOperation( try { apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Json Service"); apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingInputOutputOperation"); + Pair, CompletableFuture> pair = + AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer); + asyncResponseTransformer = pair.left(); + CompletableFuture endOfStreamFuture = pair.right(); streamingInputOutputOperationRequest = applySignerOverride(streamingInputOutputOperationRequest, Aws4UnsignedPayloadSigner.create()); JsonOperationMetadata operationMetadata = JsonOperationMetadata.builder().hasStreamingSuccessResponse(true) @@ -1021,18 +1027,22 @@ public CompletableFuture streamingInputOutputOperation( .withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler) .withMetricCollector(apiCallMetricCollector).withAsyncRequestBody(requestBody) .withInput(streamingInputOutputOperationRequest), asyncResponseTransformer); + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; CompletableFuture whenCompleted = executeFuture.whenComplete((r, e) -> { if (e != null) { runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(e)); + () -> finalAsyncResponseTransformer.exceptionOccurred(e)); } - metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + endOfStreamFuture.whenComplete((r2, e2) -> { + metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + }); }); executeFuture = CompletableFutureUtils.forwardExceptionTo(whenCompleted, executeFuture); return executeFuture; } catch (Throwable t) { + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(t)); + () -> finalAsyncResponseTransformer.exceptionOccurred(t)); metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); return CompletableFutureUtils.failedFuture(t); } @@ -1073,6 +1083,10 @@ public CompletableFuture streamingOutputOperation( try { apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Json Service"); apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingOutputOperation"); + Pair, CompletableFuture> pair = + AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer); + asyncResponseTransformer = pair.left(); + CompletableFuture endOfStreamFuture = pair.right(); JsonOperationMetadata operationMetadata = JsonOperationMetadata.builder().hasStreamingSuccessResponse(true) .isPayloadJson(false).build(); @@ -1089,18 +1103,22 @@ public CompletableFuture streamingOutputOperation( .withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler) .withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest), asyncResponseTransformer); + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; CompletableFuture whenCompleted = executeFuture.whenComplete((r, e) -> { if (e != null) { runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(e)); + () -> finalAsyncResponseTransformer.exceptionOccurred(e)); } - metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + endOfStreamFuture.whenComplete((r2, e2) -> { + metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + }); }); executeFuture = CompletableFutureUtils.forwardExceptionTo(whenCompleted, executeFuture); return executeFuture; } catch (Throwable t) { + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(t)); + () -> finalAsyncResponseTransformer.exceptionOccurred(t)); metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); return CompletableFutureUtils.failedFuture(t); } diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-aws-json-async-client-class.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-aws-json-async-client-class.java index 196defe0d4de..f10d49bbce3a 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-aws-json-async-client-class.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-aws-json-async-client-class.java @@ -29,6 +29,7 @@ import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; @@ -105,6 +106,7 @@ import software.amazon.awssdk.services.json.transform.StreamingOutputOperationRequestMarshaller; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.HostnameValidator; +import software.amazon.awssdk.utils.Pair; /** * Internal implementation of {@link JsonAsyncClient}. @@ -988,6 +990,10 @@ public CompletableFuture streamingInputOutputOperation( try { apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Json Service"); apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingInputOutputOperation"); + Pair, CompletableFuture> pair = + AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer); + asyncResponseTransformer = pair.left(); + CompletableFuture endOfStreamFuture = pair.right(); streamingInputOutputOperationRequest = applySignerOverride(streamingInputOutputOperationRequest, Aws4UnsignedPayloadSigner.create()); JsonOperationMetadata operationMetadata = JsonOperationMetadata.builder().hasStreamingSuccessResponse(true) @@ -1011,18 +1017,22 @@ public CompletableFuture streamingInputOutputOperation( .withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler) .withMetricCollector(apiCallMetricCollector).withAsyncRequestBody(requestBody) .withInput(streamingInputOutputOperationRequest), asyncResponseTransformer); + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; CompletableFuture whenCompleted = executeFuture.whenComplete((r, e) -> { if (e != null) { runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(e)); + () -> finalAsyncResponseTransformer.exceptionOccurred(e)); } - metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + endOfStreamFuture.whenComplete((r2, e2) -> { + metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + }); }); executeFuture = CompletableFutureUtils.forwardExceptionTo(whenCompleted, executeFuture); return executeFuture; } catch (Throwable t) { + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(t)); + () -> finalAsyncResponseTransformer.exceptionOccurred(t)); metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); return CompletableFutureUtils.failedFuture(t); } @@ -1063,6 +1073,10 @@ public CompletableFuture streamingOutputOperation( try { apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Json Service"); apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingOutputOperation"); + Pair, CompletableFuture> pair = + AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer); + asyncResponseTransformer = pair.left(); + CompletableFuture endOfStreamFuture = pair.right(); JsonOperationMetadata operationMetadata = JsonOperationMetadata.builder().hasStreamingSuccessResponse(true) .isPayloadJson(false).build(); @@ -1079,18 +1093,22 @@ public CompletableFuture streamingOutputOperation( .withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler) .withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest), asyncResponseTransformer); + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; CompletableFuture whenCompleted = executeFuture.whenComplete((r, e) -> { if (e != null) { runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(e)); + () -> finalAsyncResponseTransformer.exceptionOccurred(e)); } - metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + endOfStreamFuture.whenComplete((r2, e2) -> { + metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + }); }); executeFuture = CompletableFutureUtils.forwardExceptionTo(whenCompleted, executeFuture); return executeFuture; } catch (Throwable t) { + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(t)); + () -> finalAsyncResponseTransformer.exceptionOccurred(t)); metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); return CompletableFutureUtils.failedFuture(t); } diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-query-async-client-class.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-query-async-client-class.java index aed7674458f4..30bc42b75a69 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-query-async-client-class.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-query-async-client-class.java @@ -18,6 +18,7 @@ import software.amazon.awssdk.core.RequestOverrideConfiguration; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; import software.amazon.awssdk.core.client.handler.AsyncClientHandler; @@ -53,6 +54,7 @@ import software.amazon.awssdk.services.query.transform.StreamingOutputOperationRequestMarshaller; import software.amazon.awssdk.services.query.waiters.QueryAsyncWaiter; import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.Pair; /** * Internal implementation of {@link QueryAsyncClient}. @@ -352,6 +354,10 @@ public CompletableFuture streamingOutputOperation( try { apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Query Service"); apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingOutputOperation"); + Pair, CompletableFuture> pair = + AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer); + asyncResponseTransformer = pair.left(); + CompletableFuture endOfStreamFuture = pair.right(); HttpResponseHandler responseHandler = protocolFactory .createResponseHandler(StreamingOutputOperationResponse::builder); @@ -366,17 +372,21 @@ public CompletableFuture streamingOutputOperation( .withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest), asyncResponseTransformer); CompletableFuture whenCompleteFuture = null; + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; whenCompleteFuture = executeFuture.whenComplete((r, e) -> { if (e != null) { runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(e)); + () -> finalAsyncResponseTransformer.exceptionOccurred(e)); } - metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + endOfStreamFuture.whenComplete((r2, e2) -> { + metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + }); }); return CompletableFutureUtils.forwardExceptionTo(whenCompleteFuture, executeFuture); } catch (Throwable t) { + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(t)); + () -> finalAsyncResponseTransformer.exceptionOccurred(t)); metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); return CompletableFutureUtils.failedFuture(t); } diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-xml-async-client-class.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-xml-async-client-class.java index 9976215a4ccd..da87c2838291 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-xml-async-client-class.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-xml-async-client-class.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.core.SdkPojoBuilder; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformerUtils; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; @@ -64,6 +65,7 @@ import software.amazon.awssdk.services.xml.transform.StreamingInputOperationRequestMarshaller; import software.amazon.awssdk.services.xml.transform.StreamingOutputOperationRequestMarshaller; import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.Pair; /** * Internal implementation of {@link XmlAsyncClient}. @@ -438,6 +440,10 @@ public CompletableFuture streamingOutputOperation( try { apiCallMetricCollector.reportMetric(CoreMetric.SERVICE_ID, "Xml Service"); apiCallMetricCollector.reportMetric(CoreMetric.OPERATION_NAME, "StreamingOutputOperation"); + Pair, CompletableFuture> pair = + AsyncResponseTransformerUtils.wrapWithEndOfStreamFuture(asyncResponseTransformer); + asyncResponseTransformer = pair.left(); + CompletableFuture endOfStreamFuture = pair.right(); HttpResponseHandler responseHandler = protocolFactory.createResponseHandler( StreamingOutputOperationResponse::builder, new XmlOperationMetadata().withHasStreamingSuccessResponse(true)); @@ -452,17 +458,21 @@ public CompletableFuture streamingOutputOperation( .withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest), asyncResponseTransformer); CompletableFuture whenCompleteFuture = null; + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; whenCompleteFuture = executeFuture.whenComplete((r, e) -> { if (e != null) { runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(e)); + () -> finalAsyncResponseTransformer.exceptionOccurred(e)); } - metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + endOfStreamFuture.whenComplete((r2, e2) -> { + metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); + }); }); return CompletableFutureUtils.forwardExceptionTo(whenCompleteFuture, executeFuture); } catch (Throwable t) { + AsyncResponseTransformer finalAsyncResponseTransformer = asyncResponseTransformer; runAndLogError(log, "Exception thrown in exceptionOccurred callback, ignoring", - () -> asyncResponseTransformer.exceptionOccurred(t)); + () -> finalAsyncResponseTransformer.exceptionOccurred(t)); metricPublishers.forEach(p -> p.publish(apiCallMetricCollector.collect())); return CompletableFutureUtils.failedFuture(t); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java index b096143d776b..c93b8a069db5 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java @@ -21,8 +21,10 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; +import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer; /** * Callback interface to handle a streaming asynchronous response. @@ -141,4 +143,35 @@ static AsyncResponseTransformer toFile(File fi static AsyncResponseTransformer> toBytes() { return new ByteArrayAsyncResponseTransformer<>(); } + + /** + * Creates an {@link AsyncResponseTransformer} that publishes the response body content through a {@link ResponsePublisher}, + * which is an {@link SdkPublisher} that also contains a reference to the {@link SdkResponse} returned by the service. + *

+ * When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed + * once the {@link SdkResponse} is available and the response body begins streaming. This behavior differs from some + * other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture} + * completed after the entire response body has finished streaming. + *

+ * You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this + * transformer is only recommended for advanced use cases. + *

+ * Example usage: + *

+     * {@code
+     *     CompletableFuture> responseFuture =
+     *         s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toPublisher());
+     *     ResponsePublisher responsePublisher = responseFuture.join();
+     *     System.out.println(responsePublisher.response());
+     *     CompletableFuture drainPublisherFuture = responsePublisher.subscribe(System.out::println);
+     *     drainPublisherFuture.join();
+     * }
+     * 
+ * + * @param Pojo response type. + * @return AsyncResponseTransformer instance. + */ + static AsyncResponseTransformer> toPublisher() { + return new PublisherAsyncResponseTransformer<>(); + } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformerUtils.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformerUtils.java new file mode 100644 index 000000000000..251d0ba78f0e --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformerUtils.java @@ -0,0 +1,56 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener; +import software.amazon.awssdk.utils.Pair; + +@SdkProtectedApi +public final class AsyncResponseTransformerUtils { + + private AsyncResponseTransformerUtils() { + } + + /** + * Wrap a {@link AsyncResponseTransformer} and associate it with a future that is completed upon end-of-stream, regardless of + * whether the transformer is configured to complete its future upon end-of-response or end-of-stream. + */ + public static Pair, CompletableFuture> + wrapWithEndOfStreamFuture(AsyncResponseTransformer responseTransformer) { + CompletableFuture future = new CompletableFuture<>(); + AsyncResponseTransformer wrapped = AsyncResponseTransformerListener.wrap( + responseTransformer, + new AsyncResponseTransformerListener() { + @Override + public void transformerExceptionOccurred(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void subscriberOnError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void subscriberOnComplete() { + future.complete(null); + } + }); + return Pair.of(wrapped, future); + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java new file mode 100644 index 000000000000..d8f87899f785 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java @@ -0,0 +1,87 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import java.nio.ByteBuffer; +import java.util.Objects; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.utils.ToString; +import software.amazon.awssdk.utils.Validate; + +/** + * An {@link SdkPublisher} that publishes response body content and also contains a reference to the {@link SdkResponse} returned + * by the service. + * + * @param Pojo response type. + * @see AsyncResponseTransformer#toPublisher() + */ +@SdkPublicApi +public final class ResponsePublisher implements SdkPublisher { + + private final ResponseT response; + private final SdkPublisher publisher; + + public ResponsePublisher(ResponseT response, SdkPublisher publisher) { + this.response = Validate.paramNotNull(response, "response"); + this.publisher = Validate.paramNotNull(publisher, "publisher"); + } + + /** + * @return the unmarshalled response object from the service. + */ + public ResponseT response() { + return response; + } + + @Override + public void subscribe(Subscriber subscriber) { + publisher.subscribe(subscriber); + } + + @Override + public String toString() { + return ToString.builder("ResponsePublisher") + .add("response", response) + .add("publisher", publisher) + .build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ResponsePublisher that = (ResponsePublisher) o; + + if (!Objects.equals(response, that.response)) { + return false; + } + return Objects.equals(publisher, that.publisher); + } + + @Override + public int hashCode() { + int result = response != null ? response.hashCode() : 0; + result = 31 * result + (publisher != null ? publisher.hashCode() : 0); + return result; + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java new file mode 100644 index 000000000000..883a40fd7516 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncRequestBodyListener.java @@ -0,0 +1,80 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async.listener; + +import java.nio.ByteBuffer; +import java.util.Optional; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * Listener interface that invokes callbacks associated with a {@link AsyncRequestBody} and any resulting {@link Subscriber}. + * + * @see PublisherListener + * @see SubscriberListener + */ +@SdkProtectedApi +public interface AsyncRequestBodyListener extends PublisherListener { + + /** + * Wrap a {@link AsyncRequestBody} with a new one that will notify a {@link AsyncRequestBodyListener} of important events + * occurring. + */ + static AsyncRequestBody wrap(AsyncRequestBody delegate, AsyncRequestBodyListener listener) { + return new NotifyingAsyncRequestBody(delegate, listener); + } + + @SdkInternalApi + final class NotifyingAsyncRequestBody implements AsyncRequestBody { + private static final Logger log = Logger.loggerFor(NotifyingAsyncRequestBody.class); + + private final AsyncRequestBody delegate; + private final AsyncRequestBodyListener listener; + + NotifyingAsyncRequestBody(AsyncRequestBody delegate, AsyncRequestBodyListener listener) { + this.delegate = Validate.notNull(delegate, "delegate"); + this.listener = Validate.notNull(listener, "listener"); + } + + @Override + public Optional contentLength() { + return delegate.contentLength(); + } + + @Override + public String contentType() { + return delegate.contentType(); + } + + @Override + public void subscribe(Subscriber s) { + invoke(() -> listener.publisherSubscribe(s), "publisherSubscribe"); + delegate.subscribe(SubscriberListener.wrap(s, listener)); + } + + static void invoke(Runnable runnable, String callbackName) { + try { + runnable.run(); + } catch (Exception e) { + log.error(() -> callbackName + " callback failed. This exception will be dropped.", e); + } + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncResponseTransformerListener.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncResponseTransformerListener.java new file mode 100644 index 000000000000..b189d51ec7ef --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/AsyncResponseTransformerListener.java @@ -0,0 +1,110 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async.listener; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * Listener interface that invokes callbacks associated with a {@link AsyncResponseTransformer} and any resulting {@link + * SdkPublisher} and {@link Subscriber}. + * + * @see PublisherListener + * @see SubscriberListener + */ +@SdkProtectedApi +public interface AsyncResponseTransformerListener extends PublisherListener { + + /** + * Invoked before {@link AsyncResponseTransformer#onResponse(Object)} + */ + default void transformerOnResponse(ResponseT response) { + } + + /** + * Invoked before {@link AsyncResponseTransformer#onStream(SdkPublisher)} + */ + default void transformerOnStream(SdkPublisher publisher) { + } + + /** + * Invoked before {@link AsyncResponseTransformer#exceptionOccurred(Throwable)} + */ + default void transformerExceptionOccurred(Throwable t) { + } + + /** + * Wrap a {@link AsyncResponseTransformer} with a new one that will notify a {@link AsyncResponseTransformerListener} of + * important events occurring. + */ + static AsyncResponseTransformer wrap( + AsyncResponseTransformer delegate, + AsyncResponseTransformerListener listener) { + return new NotifyingAsyncResponseTransformer<>(delegate, listener); + } + + @SdkInternalApi + final class NotifyingAsyncResponseTransformer implements AsyncResponseTransformer { + private static final Logger log = Logger.loggerFor(NotifyingAsyncResponseTransformer.class); + + private final AsyncResponseTransformer delegate; + private final AsyncResponseTransformerListener listener; + + NotifyingAsyncResponseTransformer(AsyncResponseTransformer delegate, + AsyncResponseTransformerListener listener) { + this.delegate = Validate.notNull(delegate, "delegate"); + this.listener = Validate.notNull(listener, "listener"); + } + + @Override + public CompletableFuture prepare() { + return delegate.prepare(); + } + + @Override + public void onResponse(ResponseT response) { + invoke(() -> listener.transformerOnResponse(response), "transformerOnResponse"); + delegate.onResponse(response); + } + + @Override + public void onStream(SdkPublisher publisher) { + invoke(() -> listener.transformerOnStream(publisher), "transformerOnStream"); + delegate.onStream(PublisherListener.wrap(publisher, listener)); + } + + @Override + public void exceptionOccurred(Throwable error) { + invoke(() -> listener.transformerExceptionOccurred(error), "transformerExceptionOccurred"); + delegate.exceptionOccurred(error); + } + + static void invoke(Runnable runnable, String callbackName) { + try { + runnable.run(); + } catch (Exception e) { + log.error(() -> callbackName + " callback failed. This exception will be dropped.", e); + } + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java new file mode 100644 index 000000000000..bbee3e203447 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/PublisherListener.java @@ -0,0 +1,75 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async.listener; + + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * Listener interface that invokes callbacks associated with a {@link Publisher} and any resulting {@link Subscriber}. + * + * @see AsyncResponseTransformerListener + * @see SubscriberListener + */ +@SdkProtectedApi +public interface PublisherListener extends SubscriberListener { + /** + * Invoked before {@link Publisher#subscribe(Subscriber)} + */ + default void publisherSubscribe(Subscriber subscriber) { + } + + /** + * Wrap a {@link SdkPublisher} with a new one that will notify a {@link PublisherListener} of important events occurring. + */ + static SdkPublisher wrap(SdkPublisher delegate, PublisherListener listener) { + return new NotifyingPublisher<>(delegate, listener); + } + + @SdkInternalApi + final class NotifyingPublisher implements SdkPublisher { + private static final Logger log = Logger.loggerFor(NotifyingPublisher.class); + + private final SdkPublisher delegate; + private final PublisherListener listener; + + NotifyingPublisher(SdkPublisher delegate, + PublisherListener listener) { + this.delegate = Validate.notNull(delegate, "delegate"); + this.listener = Validate.notNull(listener, "listener"); + } + + @Override + public void subscribe(Subscriber s) { + invoke(() -> listener.publisherSubscribe(s), "publisherSubscribe"); + delegate.subscribe(SubscriberListener.wrap(s, listener)); + } + + static void invoke(Runnable runnable, String callbackName) { + try { + runnable.run(); + } catch (Exception e) { + log.error(() -> callbackName + " callback failed. This exception will be dropped.", e); + } + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/SubscriberListener.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/SubscriberListener.java new file mode 100644 index 000000000000..ee16f2208dd0 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/listener/SubscriberListener.java @@ -0,0 +1,128 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async.listener; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * Listener interface that invokes callbacks associated with a {@link Subscriber}. + * + * @see AsyncResponseTransformerListener + * @see PublisherListener + */ +@SdkProtectedApi +public interface SubscriberListener { + /** + * Invoked before {@link Subscriber#onNext(Object)} + */ + default void subscriberOnNext(T t) { + } + + /** + * Invoked before {@link Subscriber#onComplete()} + */ + default void subscriberOnComplete() { + } + + /** + * Invoked before {@link Subscriber#onError(Throwable)} + */ + default void subscriberOnError(Throwable t) { + } + + /** + * Invoked before {@link Subscription#cancel()} + */ + default void subscriptionCancel() { + } + + /** + * Wrap a {@link Subscriber} with a new one that will notify a {@link SubscriberListener} of important events occurring. + */ + static Subscriber wrap(Subscriber delegate, SubscriberListener listener) { + return new NotifyingSubscriber<>(delegate, listener); + } + + @SdkInternalApi + final class NotifyingSubscriber implements Subscriber { + private static final Logger log = Logger.loggerFor(NotifyingSubscriber.class); + + private final Subscriber delegate; + private final SubscriberListener listener; + + NotifyingSubscriber(Subscriber delegate, + SubscriberListener listener) { + this.delegate = Validate.notNull(delegate, "delegate"); + this.listener = Validate.notNull(listener, "listener"); + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(new NotifyingSubscription(s)); + } + + @Override + public void onNext(T t) { + invoke(() -> listener.subscriberOnNext(t), "subscriberOnNext"); + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + invoke(() -> listener.subscriberOnError(t), "subscriberOnError"); + delegate.onError(t); + } + + @Override + public void onComplete() { + invoke(listener::subscriberOnComplete, "subscriberOnComplete"); + delegate.onComplete(); + } + + static void invoke(Runnable runnable, String callbackName) { + try { + runnable.run(); + } catch (Exception e) { + log.error(() -> callbackName + " callback failed. This exception will be dropped.", e); + } + } + + @SdkInternalApi + final class NotifyingSubscription implements Subscription { + private final Subscription delegateSubscription; + + NotifyingSubscription(Subscription delegateSubscription) { + this.delegateSubscription = Validate.notNull(delegateSubscription, "delegateSubscription"); + } + + @Override + public void request(long n) { + delegateSubscription.request(n); + } + + @Override + public void cancel() { + invoke(listener::subscriptionCancel, "subscriptionCancel"); + delegateSubscription.cancel(); + } + } + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java new file mode 100644 index 000000000000..d5448a5addcd --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformer.java @@ -0,0 +1,60 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.ResponsePublisher; +import software.amazon.awssdk.core.async.SdkPublisher; + +/** + * Transforms a {@link ResponseT} and {@link ByteBuffer} {@link SdkPublisher} into a {@link ResponsePublisher}. + * + * @param Pojo response type. + * @see AsyncResponseTransformer#toPublisher() + */ +@SdkInternalApi +public final class PublisherAsyncResponseTransformer + implements AsyncResponseTransformer> { + + private volatile CompletableFuture> future; + private volatile ResponseT response; + + @Override + public CompletableFuture> prepare() { + CompletableFuture> f = new CompletableFuture<>(); + this.future = f; + return f; + } + + @Override + public void onResponse(ResponseT response) { + this.response = response; + } + + @Override + public void onStream(SdkPublisher publisher) { + future.complete(new ResponsePublisher<>(response, publisher)); + } + + @Override + public void exceptionOccurred(Throwable error) { + future.completeExceptionally(error); + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ResponsePublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ResponsePublisherTest.java new file mode 100644 index 000000000000..0cb76b6dc78f --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/ResponsePublisherTest.java @@ -0,0 +1,29 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.async; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.jupiter.api.Test; + +class ResponsePublisherTest { + + @Test + void equalsAndHashcode() { + EqualsVerifier.forClass(ResponsePublisher.class) + .withNonnullFields("response", "publisher") + .verify(); + } +} \ No newline at end of file diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java new file mode 100644 index 000000000000..321993f63838 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/PublisherAsyncResponseTransformerTest.java @@ -0,0 +1,91 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.ResponsePublisher; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer.BaosSubscriber; + +class PublisherAsyncResponseTransformerTest { + + private PublisherAsyncResponseTransformer transformer; + private SdkResponse response; + private String publisherStr; + private SdkPublisher publisher; + + @BeforeEach + public void setUp() throws Exception { + transformer = new PublisherAsyncResponseTransformer<>(); + response = Mockito.mock(SdkResponse.class); + publisherStr = UUID.randomUUID().toString(); + publisher = AsyncRequestBody.fromString(publisherStr); + } + + @Test + void successfulResponseAndStream_returnsResponsePublisher() throws Exception { + CompletableFuture> responseFuture = transformer.prepare(); + transformer.onResponse(response); + assertThat(responseFuture.isDone()).isFalse(); + transformer.onStream(publisher); + assertThat(responseFuture.isDone()).isTrue(); + ResponsePublisher responsePublisher = responseFuture.get(); + assertThat(responsePublisher.response()).isEqualTo(response); + String resultStr = drainPublisherToStr(responsePublisher); + assertThat(resultStr).isEqualTo(publisherStr); + } + + @Test + void failedResponse_completesExceptionally() { + CompletableFuture> responseFuture = transformer.prepare(); + assertThat(responseFuture.isDone()).isFalse(); + transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes")); + assertThat(responseFuture.isDone()).isTrue(); + assertThatThrownBy(responseFuture::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + @Test + void failedStream_completesExceptionally() { + CompletableFuture> responseFuture = transformer.prepare(); + transformer.onResponse(response); + assertThat(responseFuture.isDone()).isFalse(); + transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes")); + assertThat(responseFuture.isDone()).isTrue(); + assertThatThrownBy(responseFuture::get) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(RuntimeException.class); + } + + private static String drainPublisherToStr(SdkPublisher publisher) throws Exception { + CompletableFuture bodyFuture = new CompletableFuture<>(); + publisher.subscribe(new BaosSubscriber(bodyFuture)); + byte[] body = bodyFuture.get(); + return new String(body); + } +} \ No newline at end of file diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java index 124117637779..71f6f7b4707a 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadIntegrationTest.java @@ -20,12 +20,15 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.ResponsePublisher; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.testutils.RandomTempFile; @@ -87,4 +90,20 @@ public void download_toBytes() throws Exception { assertThat(Md5Utils.md5AsBase64(result.asByteArray())).isEqualTo(Md5Utils.md5AsBase64(file)); assertThat(result.response().responseMetadata().requestId()).isNotNull(); } + + @Test + public void download_toPublisher() throws Exception { + Download> download = + tm.download(DownloadRequest.builder() + .getObjectRequest(b -> b.bucket(BUCKET).key(KEY)) + .responseTransformer(AsyncResponseTransformer.toPublisher()) + .overrideConfiguration(b -> b.addListener(LoggingTransferListener.create())) + .build()); + CompletedDownload> completedDownload = download.completionFuture().join(); + ResponsePublisher responsePublisher = completedDownload.result(); + ByteBuffer buf = ByteBuffer.allocate(Math.toIntExact(responsePublisher.response().contentLength())); + CompletableFuture drainPublisherFuture = responsePublisher.subscribe(buf::put); + drainPublisherFuture.join(); + assertThat(Md5Utils.md5AsBase64(buf.array())).isEqualTo(Md5Utils.md5AsBase64(file)); + } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncRequestBody.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncRequestBody.java deleted file mode 100644 index 86d4ac9f60cb..000000000000 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncRequestBody.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.transfer.s3.internal.progress; - -import java.nio.ByteBuffer; -import java.util.Optional; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.async.AsyncRequestBody; - -@SdkInternalApi -public class NotifyingAsyncRequestBody implements AsyncRequestBody { - - public interface AsyncRequestBodyListener { - default void beforeSubscribe(Subscriber subscriber) { - } - - default void beforeOnNext(ByteBuffer byteBuffer) { - } - } - - private final AsyncRequestBody delegate; - private final AsyncRequestBodyListener listener; - - public NotifyingAsyncRequestBody(AsyncRequestBody delegate, - AsyncRequestBodyListener listener) { - this.delegate = delegate; - this.listener = listener; - } - - @Override - public Optional contentLength() { - return delegate.contentLength(); - } - - @Override - public String contentType() { - return delegate.contentType(); - } - - @Override - public void subscribe(Subscriber subscriber) { - listener.beforeSubscribe(subscriber); - delegate.subscribe(new NotifyingSubscriber(subscriber, listener)); - } - - @SdkInternalApi - private static final class NotifyingSubscriber implements Subscriber { - private final Subscriber delegate; - private final AsyncRequestBodyListener listener; - - NotifyingSubscriber(Subscriber delegate, - AsyncRequestBodyListener listener) { - this.delegate = delegate; - this.listener = listener; - } - - @Override - public void onSubscribe(Subscription s) { - delegate.onSubscribe(s); - } - - @Override - public void onNext(ByteBuffer byteBuffer) { - listener.beforeOnNext(byteBuffer); - delegate.onNext(byteBuffer); - } - - @Override - public void onError(Throwable t) { - delegate.onError(t); - } - - @Override - public void onComplete() { - delegate.onComplete(); - } - } -} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncResponseTransformer.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncResponseTransformer.java deleted file mode 100644 index 2af6d4c6750b..000000000000 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/NotifyingAsyncResponseTransformer.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.transfer.s3.internal.progress; - -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.async.SdkPublisher; - -@SdkInternalApi -public class NotifyingAsyncResponseTransformer implements AsyncResponseTransformer { - - public interface AsyncResponseTransformerListener { - default void beforeOnResponse(ResponseT response) { - } - - default void beforeSubscribe(Subscriber subscriber) { - } - - default void beforeOnNext(ByteBuffer byteBuffer) { - } - } - - private final AsyncResponseTransformer delegate; - private final AsyncResponseTransformerListener listener; - - public NotifyingAsyncResponseTransformer(AsyncResponseTransformer delegate, - AsyncResponseTransformerListener listener) { - this.delegate = delegate; - this.listener = listener; - } - - @Override - public CompletableFuture prepare() { - return delegate.prepare(); - } - - @Override - public void onResponse(ResponseT response) { - listener.beforeOnResponse(response); - delegate.onResponse(response); - } - - public void onStream(SdkPublisher publisher) { - delegate.onStream(new NotifyingPublisher<>(publisher, listener)); - } - - @Override - public void exceptionOccurred(Throwable error) { - delegate.exceptionOccurred(error); - } - - @SdkInternalApi - private static final class NotifyingPublisher implements SdkPublisher { - private final SdkPublisher delegate; - private final AsyncResponseTransformerListener listener; - - NotifyingPublisher(SdkPublisher delegate, - AsyncResponseTransformerListener listener) { - this.delegate = delegate; - this.listener = listener; - } - - @Override - public void subscribe(Subscriber s) { - listener.beforeSubscribe(s); - delegate.subscribe(new NotifyingSubscriber<>(s, listener)); - } - } - - @SdkInternalApi - private static final class NotifyingSubscriber implements Subscriber { - private final Subscriber delegate; - private final AsyncResponseTransformerListener listener; - - NotifyingSubscriber(Subscriber delegate, - AsyncResponseTransformerListener listener) { - this.delegate = delegate; - this.listener = listener; - } - - @Override - public void onSubscribe(Subscription s) { - delegate.onSubscribe(s); - } - - @Override - public void onNext(ByteBuffer byteBuffer) { - listener.beforeOnNext(byteBuffer); - delegate.onNext(byteBuffer); - } - - @Override - public void onError(Throwable t) { - delegate.onError(t); - } - - @Override - public void onComplete() { - delegate.onComplete(); - } - } -} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java index 183592bb54de..33dd4cc6f756 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferListenerInvoker.java @@ -18,6 +18,7 @@ import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.transfer.s3.progress.TransferListener; @@ -32,7 +33,10 @@ @SdkInternalApi public class TransferListenerInvoker implements TransferListener { private static final Logger log = Logger.loggerFor(TransferListener.class); + private final List listeners; + private final AtomicBoolean initiated = new AtomicBoolean(); + private final AtomicBoolean complete = new AtomicBoolean(); public TransferListenerInvoker(List listeners) { this.listeners = Validate.paramNotNull(listeners, "listeners"); @@ -40,7 +44,9 @@ public TransferListenerInvoker(List listeners) { @Override public void transferInitiated(Context.TransferInitiated context) { - forEach(listener -> listener.transferInitiated(context)); + if (!initiated.getAndSet(true)) { + forEach(listener -> listener.transferInitiated(context)); + } } @Override @@ -50,12 +56,16 @@ public void bytesTransferred(Context.BytesTransferred context) { @Override public void transferComplete(Context.TransferComplete context) { - forEach(listener -> listener.transferComplete(context)); + if (!complete.getAndSet(true)) { + forEach(listener -> listener.transferComplete(context)); + } } @Override public void transferFailed(Context.TransferFailed context) { - forEach(listener -> listener.transferFailed(context)); + if (!complete.getAndSet(true)) { + forEach(listener -> listener.transferFailed(context)); + } } private void forEach(Consumer action) { diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index a1114a96ad5b..79f3f9c4b86f 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -23,12 +23,12 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.listener.AsyncRequestBodyListener; +import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.CompletedObjectTransfer; import software.amazon.awssdk.transfer.s3.TransferObjectRequest; import software.amazon.awssdk.transfer.s3.TransferRequestOverrideConfiguration; -import software.amazon.awssdk.transfer.s3.internal.progress.NotifyingAsyncRequestBody.AsyncRequestBodyListener; -import software.amazon.awssdk.transfer.s3.internal.progress.NotifyingAsyncResponseTransformer.AsyncResponseTransformerListener; import software.amazon.awssdk.transfer.s3.progress.TransferListener; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; @@ -42,6 +42,7 @@ public class TransferProgressUpdater { private final DefaultTransferProgress progress; private final TransferListenerContext context; private final TransferListenerInvoker listeners; + private final CompletableFuture endOfStreamFuture; public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody requestBody) { DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); @@ -55,6 +56,7 @@ public TransferProgressUpdater(TransferObjectRequest request, AsyncRequestBody r listeners = new TransferListenerInvoker(request.overrideConfiguration() .map(TransferRequestOverrideConfiguration::listeners) .orElseGet(Collections::emptyList)); + endOfStreamFuture = new CompletableFuture<>(); } public TransferProgress progress() { @@ -66,69 +68,114 @@ public void transferInitiated() { } public AsyncRequestBody wrapRequestBody(AsyncRequestBody requestBody) { - return new NotifyingAsyncRequestBody( + return AsyncRequestBodyListener.wrap( requestBody, new AsyncRequestBodyListener() { @Override - public void beforeSubscribe(Subscriber subscriber) { - progress.updateAndGet(b -> b.bytesTransferred(0)); + public void publisherSubscribe(Subscriber subscriber) { + resetBytesTransferred(); } @Override - public void beforeOnNext(ByteBuffer byteBuffer) { - TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { - b.bytesTransferred(b.getBytesTransferred() + byteBuffer.limit()); - }); - listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + public void subscriberOnNext(ByteBuffer byteBuffer) { + incrementBytesTransferred(byteBuffer.limit()); + } + + @Override + public void subscriberOnError(Throwable t) { + transferFailed(t); + } + + @Override + public void subscriberOnComplete() { + endOfStreamFuture.complete(null); } }); } public AsyncResponseTransformer wrapResponseTransformer( AsyncResponseTransformer responseTransformer) { - return new NotifyingAsyncResponseTransformer<>( + return AsyncResponseTransformerListener.wrap( responseTransformer, - new AsyncResponseTransformerListener() { + new AsyncResponseTransformerListener() { @Override - public void beforeOnResponse(GetObjectResponse response) { + public void transformerOnResponse(GetObjectResponse response) { if (response.contentLength() != null) { progress.updateAndGet(b -> b.transferSizeInBytes(response.contentLength())); } } @Override - public void beforeSubscribe(Subscriber subscriber) { - progress.updateAndGet(b -> b.bytesTransferred(0)); + public void transformerExceptionOccurred(Throwable t) { + transferFailed(t); } @Override - public void beforeOnNext(ByteBuffer byteBuffer) { - TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { - b.bytesTransferred(b.getBytesTransferred() + byteBuffer.limit()); - }); - listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + public void publisherSubscribe(Subscriber subscriber) { + resetBytesTransferred(); + } + + @Override + public void subscriberOnNext(ByteBuffer byteBuffer) { + incrementBytesTransferred(byteBuffer.limit()); + } + + @Override + public void subscriberOnError(Throwable t) { + transferFailed(t); + } + + @Override + public void subscriberOnComplete() { + endOfStreamFuture.complete(null); } }); } + private void resetBytesTransferred() { + progress.updateAndGet(b -> b.bytesTransferred(0)); + } + + private void incrementBytesTransferred(int numBytes) { + TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { + b.bytesTransferred(b.getBytesTransferred() + numBytes); + }); + listeners.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + } + public void registerCompletion(CompletableFuture future) { future.whenComplete((r, t) -> { if (t == null) { - listeners.transferComplete(context.copy(b -> { - b.progressSnapshot(progress.snapshot()); - b.completedTransfer(r); - })); + endOfStreamFuture.whenComplete((r2, t2) -> { + if (t2 == null) { + transferComplete(r); + } else { + transferFailed(t2); + } + }); } else { - listeners.transferFailed(TransferListenerFailedContext.builder() - .transferContext(context.copy(b -> { - b.progressSnapshot(progress.snapshot()); - })) - .exception(t) - .build()); + transferFailed(t); } }); } + private void transferComplete(CompletedObjectTransfer r) { + listeners.transferComplete(context.copy(b -> { + TransferProgressSnapshot snapshot = progress.snapshot(); + b.progressSnapshot(snapshot); + b.completedTransfer(r); + })); + } + + private void transferFailed(Throwable t) { + listeners.transferFailed(TransferListenerFailedContext.builder() + .transferContext(context.copy(b -> { + b.progressSnapshot(progress.snapshot()); + })) + .exception(t) + .build()); + } + private static Optional getContentLengthSafe(AsyncRequestBody requestBody) { if (requestBody == null) { return Optional.empty(); diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/GetObjectAsyncIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/GetObjectAsyncIntegrationTest.java index db48fbe37a7c..5a632d4071e3 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/GetObjectAsyncIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/GetObjectAsyncIntegrationTest.java @@ -31,6 +31,7 @@ import org.junit.Test; import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.ResponsePublisher; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.interceptor.Context; @@ -97,6 +98,16 @@ public void toByteArray() throws IOException { byte[] returned = s3Async.getObject(getObjectRequest, AsyncResponseTransformer.toBytes()).join().asByteArray(); assertThat(returned).isEqualTo(Files.readAllBytes(file.toPath())); } + + @Test + public void toPublisher() throws IOException { + ResponsePublisher responsePublisher = + s3Async.getObject(getObjectRequest, AsyncResponseTransformer.toPublisher()).join(); + ByteBuffer buf = ByteBuffer.allocate(Math.toIntExact(responsePublisher.response().contentLength())); + CompletableFuture drainPublisherFuture = responsePublisher.subscribe(buf::put); + drainPublisherFuture.join(); + assertThat(buf.array()).isEqualTo(Files.readAllBytes(file.toPath())); + } @Test public void customResponseHandler_InterceptorRecievesResponsePojo() throws Exception {