Skip to content

Add new AsyncResponseTransformer: toPublisher() #2837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a8d8804
Add new AsyncResponseTransformer: toPublisher()
Nov 10, 2021
c7412e3
Fix grammar
Nov 10, 2021
1fbd8cd
Merge branch 'master' into response-publisher
Bennett-Lynch Nov 10, 2021
5c6167f
Make classes final
Nov 10, 2021
b0e140e
Merge branch 'master' into response-publisher
Bennett-Lynch Jan 6, 2022
738f53b
Migrate unit test to JUnit 5
Jan 6, 2022
618ae30
Update generated clients to record metrics based on end-of-stream
Jan 24, 2022
a109db6
Use response content length in integ tests
Jan 24, 2022
200aab1
Remove unnecessary assertions
Jan 24, 2022
f3d946a
Merge branch 'master' into response-publisher
Bennett-Lynch Jan 24, 2022
ca6c65d
Add change log entry
Jan 24, 2022
1b36bc2
Update change log entry
Jan 24, 2022
41a333e
Revert unrelated changes
Jan 24, 2022
02185c2
Refactor & reorganize listener interfaces
Jan 25, 2022
37c1c83
Remove default wrapWithListener methods & make SdkProtectedApi
Jan 25, 2022
2c0b034
Incorporate feedback
Jan 26, 2022
a8ec3f8
Merge branch 'master' into response-publisher
Bennett-Lynch Jan 26, 2022
102d544
Create AsyncResponseTransformerUtils and fix misc. check style
Jan 26, 2022
d15c0f2
Merge branch 'master' into response-publisher
Bennett-Lynch Jan 26, 2022
c5c50f2
Create AsyncRequestBodyListener & invoke listeners before delegating
Jan 27, 2022
0793b52
Preserve EventListeningSubscriber usage for do-after semantics
Jan 27, 2022
35ef88c
Revert formatting changes to generated class tests
Jan 27, 2022
d07c4ce
Use effectively final var for test class expectations
Jan 27, 2022
47a6edf
Fix JsonProtocolSpec code gen
Jan 27, 2022
302ccab
Prevent unnecessary string concatenation
Jan 28, 2022
68a2f6d
Merge branch 'master' into response-publisher
Bennett-Lynch Jan 28, 2022
fe2b126
Use SDK logger & add equals/hashcode tests
Jan 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-0e09833.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "AWS SDK for Java v2",
"contributor": "",
"type": "feature",
"description": "Add new AsyncResponseTransformer: toPublisher(). This transformer allows users to directly consume a streaming-response payload (i.e., S3 GetObject) without having to buffer to memory or disk. This also allows users of Reactor/RxJava to more easily consume a streaming response (e.g., via Flux#from(Publisher))."
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -64,6 +65,8 @@
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
import software.amazon.awssdk.codegen.poet.model.EventStreamSpecHelper;
import software.amazon.awssdk.core.RequestOverrideConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.NotifyingAsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
Expand All @@ -78,6 +81,7 @@
import software.amazon.awssdk.protocols.json.AwsJsonProtocolFactory;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.awssdk.utils.Pair;

public final class AsyncClientClass extends AsyncClientInterface {
private final IntermediateModel model;
Expand Down Expand Up @@ -244,7 +248,31 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
CoreMetric.class, "SERVICE_ID", model.getMetadata().getServiceId());
builder.addStatement("apiCallMetricCollector.reportMetric($T.$L, $S)",
CoreMetric.class, "OPERATION_NAME", opModel.getOperationName());


if (opModel.hasStreamingOutput()) {
ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());

builder.addStatement("$T<$T<$T, ReturnT>, $T<$T>> $N = $T.wrapWithEndOfStreamFuture($N)",
Pair.class,
AsyncResponseTransformer.class,
responseType,
CompletableFuture.class,
Void.class,
"pair",
NotifyingAsyncResponseTransformer.class,
"asyncResponseTransformer");

builder.addStatement("$N = $N.left()",
"asyncResponseTransformer",
"pair");

builder.addStatement("$T<$T> $N = $N.right()",
CompletableFuture.class,
Void.class,
"endOfStreamFuture",
"pair");
}

if (shouldUseAsyncWithBodySigner(opModel)) {
builder.addCode(applyAsyncWithBodyV4SignerOverride(opModel));
} else {
Expand Down Expand Up @@ -312,8 +340,14 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
.beginControlFlow("catch ($T t)", Throwable.class);

// For streaming operations we also want to notify the response handler of any exception.
if (opModel.hasStreamingOutput()) {
ClassName responseType = poetExtensions.getModelClass(opModel.getReturnType().getReturnType());
builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer",
AsyncResponseTransformer.class,
responseType);
}
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
String paramName = opModel.hasStreamingOutput() ? "asyncResponseTransformer" : "asyncResponseHandler";
String paramName = opModel.hasStreamingOutput() ? "finalAsyncResponseTransformer" : "asyncResponseHandler";
builder.addStatement("runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\",\n" +
"() -> $N.exceptionOccurred(t))", paramName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ default String streamingOutputWhenComplete(String responseHandlerName) {
+ " runAndLogError(log, \"Exception thrown in exceptionOccurred callback, ignoring\", () "
+ "-> %s.exceptionOccurred(e));%n"
+ " }%n"
+ "%s"
+ " endOfStreamFuture.whenComplete((r2, e2) -> {%n"
+ " %s%n"
+ " });"
+ "})", responseHandlerName, publishMetrics());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import software.amazon.awssdk.codegen.model.intermediate.OperationModel;
import software.amazon.awssdk.codegen.poet.PoetExtensions;
import software.amazon.awssdk.codegen.poet.client.traits.HttpChecksumRequiredTrait;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.protocols.query.AwsQueryProtocolFactory;
Expand Down Expand Up @@ -151,8 +152,11 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class),
executeFutureValueType), whenCompleteFutureName);
if (opModel.hasStreamingOutput()) {
builder.addStatement("$T<$T, ReturnT> finalAsyncResponseTransformer = asyncResponseTransformer",
AsyncResponseTransformer.class,
pojoResponseType);
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName,
streamingOutputWhenComplete("asyncResponseTransformer"));
streamingOutputWhenComplete("finalAsyncResponseTransformer"));
} else {
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName, publishMetricsWhenComplete());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
s3ArnableFields(opModel, model).ifPresent(builder::add);

builder.add(".withInput($L)", opModel.getInput().getVariableName());
if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
if (opModel.hasEventStreamOutput()) {
builder.add(", $N", executionResponseTransformerName);
}
builder.addStatement(")");
Expand All @@ -218,7 +218,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
builder.addStatement("$T $N = null", ParameterizedTypeName.get(ClassName.get(CompletableFuture.class),
executeFutureValueType), whenCompleteFutureName);

if (opModel.hasStreamingOutput() || opModel.hasEventStreamOutput()) {
if (opModel.hasEventStreamOutput()) {
builder.addStatement("$N = executeFuture$L", whenCompleteFutureName,
whenCompleteBlock(opModel, "asyncResponseHandler",
eventStreamTransformFutureName));
Expand Down
Loading