Skip to content

Commit a3998eb

Browse files
committed
Attach SdkHttpResponse to the responses of event streaming opeartions
1 parent 2fa64ca commit a3998eb

File tree

6 files changed

+127
-63
lines changed

6 files changed

+127
-63
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "bugfix",
4+
"description": "Attach `SdkHttpResponse` to the responses of event streaming operations."
5+
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import software.amazon.awssdk.codegen.poet.PoetExtensions;
4444
import software.amazon.awssdk.codegen.poet.eventstream.EventStreamUtils;
4545
import software.amazon.awssdk.core.SdkResponse;
46+
import software.amazon.awssdk.core.client.handler.AttachHttpMetadataResponseHandler;
4647
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
4748
import software.amazon.awssdk.core.http.HttpResponseHandler;
4849
import software.amazon.awssdk.core.internal.protocol.json.VoidJsonUnmarshaller;
@@ -112,51 +113,20 @@ public CodeBlock responseHandler(IntermediateModel model, OperationModel opModel
112113

113114
// TODO remove this once kinesis supports CBOR for event streaming
114115
String protocolFactory = opModel.hasEventStreamOutput() ? "jsonProtocolFactory" : "protocolFactory";
115-
CodeBlock.Builder builder = CodeBlock
116-
.builder()
117-
.add("\n\n$T<$T> responseHandler = $L.createResponseHandler(new $T()" +
118-
" .withPayloadJson($L)" +
119-
" .withHasStreamingSuccessResponse($L), new $T());",
120-
HttpResponseHandler.class,
121-
pojoResponseType,
122-
protocolFactory,
123-
JsonOperationMetadata.class,
124-
!opModel.getHasBlobMemberAsPayload(),
125-
opModel.hasStreamingOutput(),
126-
unmarshaller);
116+
CodeBlock.Builder builder = CodeBlock.builder();
127117
if (opModel.hasEventStreamOutput()) {
128-
builder.add("\n\n$T<$T> voidResponseHandler = $L.createResponseHandler(new $T()" +
129-
" .withPayloadJson(false)" +
130-
" .withHasStreamingSuccessResponse(true), new $T());",
118+
responseHandlersForEventStreaming(opModel, unmarshaller, pojoResponseType, protocolFactory, builder);
119+
} else {
120+
builder.add("\n\n$T<$T> responseHandler = $L.createResponseHandler(new $T()" +
121+
" .withPayloadJson($L)" +
122+
" .withHasStreamingSuccessResponse($L), new $T());",
131123
HttpResponseHandler.class,
132-
SdkResponse.class,
124+
pojoResponseType,
133125
protocolFactory,
134126
JsonOperationMetadata.class,
135-
VoidJsonUnmarshaller.class);
136-
EventStreamUtils eventStreamUtils = EventStreamUtils.create(poetExtensions, opModel);
137-
ClassName eventStreamBaseClass = eventStreamUtils.eventStreamBaseClass();
138-
builder
139-
.add("\n\n$T<$T> eventResponseHandler = $L.createResponseHandler(new $T()" +
140-
" .withPayloadJson($L)" +
141-
" .withHasStreamingSuccessResponse($L), "
142-
+ "$T.builder()",
143-
HttpResponseHandler.class,
144-
WildcardTypeName.subtypeOf(eventStreamBaseClass),
145-
protocolFactory,
146-
JsonOperationMetadata.class,
147-
true,
148-
false,
149-
ClassName.get(EventStreamTaggedUnionJsonUnmarshaller.class));
150-
151-
eventStreamUtils.getEventStreamMembers()
152-
.forEach(m -> {
153-
String unmarshallerClassName = m.getShape().getVariable().getVariableType() + "Unmarshaller";
154-
builder.add(".addUnmarshaller(\"$L\", $T.getInstance())\n",
155-
m.getC2jName(),
156-
poetExtensions.getTransformClass(unmarshallerClassName));
157-
});
158-
builder.add(".defaultUnmarshaller((in) -> $T.UNKNOWN)\n"
159-
+ ".build());\n", eventStreamUtils.eventStreamBaseClass());
127+
!opModel.getHasBlobMemberAsPayload(),
128+
opModel.hasStreamingOutput(),
129+
unmarshaller);
160130
}
161131
return builder.build();
162132
}
@@ -375,4 +345,56 @@ private ClassName baseExceptionClassName(IntermediateModel model) {
375345

376346
return ClassName.get(exceptionPath, model.getSdkModeledExceptionBaseClassName());
377347
}
348+
349+
350+
/**
351+
* Add responseHandlers for event streaming operations
352+
*/
353+
private void responseHandlersForEventStreaming(OperationModel opModel, ClassName unmarshaller, TypeName pojoResponseType,
354+
String protocolFactory, CodeBlock.Builder builder) {
355+
builder.add("\n\n$T<$T> responseHandler = new $T($L.createResponseHandler(new $T()" +
356+
" .withPayloadJson($L)" +
357+
" .withHasStreamingSuccessResponse($L), new $T()));",
358+
HttpResponseHandler.class,
359+
pojoResponseType,
360+
AttachHttpMetadataResponseHandler.class,
361+
protocolFactory,
362+
JsonOperationMetadata.class,
363+
!opModel.getHasBlobMemberAsPayload(),
364+
opModel.hasStreamingOutput(),
365+
unmarshaller);
366+
367+
builder.add("\n\n$T<$T> voidResponseHandler = $L.createResponseHandler(new $T()" +
368+
" .withPayloadJson(false)" +
369+
" .withHasStreamingSuccessResponse(true), new $T());",
370+
HttpResponseHandler.class,
371+
SdkResponse.class,
372+
protocolFactory,
373+
JsonOperationMetadata.class,
374+
VoidJsonUnmarshaller.class);
375+
EventStreamUtils eventStreamUtils = EventStreamUtils.create(poetExtensions, opModel);
376+
ClassName eventStreamBaseClass = eventStreamUtils.eventStreamBaseClass();
377+
builder
378+
.add("\n\n$T<$T> eventResponseHandler = $L.createResponseHandler(new $T()" +
379+
" .withPayloadJson($L)" +
380+
" .withHasStreamingSuccessResponse($L), "
381+
+ "$T.builder()",
382+
HttpResponseHandler.class,
383+
WildcardTypeName.subtypeOf(eventStreamBaseClass),
384+
protocolFactory,
385+
JsonOperationMetadata.class,
386+
true,
387+
false,
388+
ClassName.get(EventStreamTaggedUnionJsonUnmarshaller.class));
389+
390+
eventStreamUtils.getEventStreamMembers()
391+
.forEach(m -> {
392+
String unmarshallerClassName = m.getShape().getVariable().getVariableType() + "Unmarshaller";
393+
builder.add(".addUnmarshaller(\"$L\", $T.getInstance())\n",
394+
m.getC2jName(),
395+
poetExtensions.getTransformClass(unmarshallerClassName));
396+
});
397+
builder.add(".defaultUnmarshaller((in) -> $T.UNKNOWN)\n"
398+
+ ".build());\n", eventStreamUtils.eventStreamBaseClass());
399+
}
378400
}

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-async-client-class.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2525
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
2626
import software.amazon.awssdk.core.client.handler.AsyncClientHandler;
27+
import software.amazon.awssdk.core.client.handler.AttachHttpMetadataResponseHandler;
2728
import software.amazon.awssdk.core.client.handler.ClientExecutionParams;
2829
import software.amazon.awssdk.core.http.HttpResponseHandler;
2930
import software.amazon.awssdk.core.internal.client.config.SdkClientConfiguration;
@@ -216,9 +217,9 @@ public CompletableFuture<Void> eventStreamOperation(EventStreamOperationRequest
216217
EventStreamOperationResponseHandler asyncResponseHandler) {
217218
try {
218219

219-
HttpResponseHandler<EventStreamOperationResponse> responseHandler = jsonProtocolFactory.createResponseHandler(
220-
new JsonOperationMetadata().withPayloadJson(true).withHasStreamingSuccessResponse(false),
221-
new EventStreamOperationResponseUnmarshaller());
220+
HttpResponseHandler<EventStreamOperationResponse> responseHandler = new AttachHttpMetadataResponseHandler(
221+
jsonProtocolFactory.createResponseHandler(new JsonOperationMetadata().withPayloadJson(true)
222+
.withHasStreamingSuccessResponse(false), new EventStreamOperationResponseUnmarshaller()));
222223

223224
HttpResponseHandler<SdkResponse> voidResponseHandler = jsonProtocolFactory.createResponseHandler(
224225
new JsonOperationMetadata().withPayloadJson(false).withHasStreamingSuccessResponse(true),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.client.handler;
17+
18+
import software.amazon.awssdk.annotations.SdkProtectedApi;
19+
import software.amazon.awssdk.core.SdkResponse;
20+
import software.amazon.awssdk.core.http.HttpResponseHandler;
21+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
22+
import software.amazon.awssdk.http.SdkHttpFullResponse;
23+
import software.amazon.awssdk.http.SdkHttpResponse;
24+
25+
/**
26+
* Decorate {@link HttpResponseHandler} to attach {@link SdkHttpResponse} to the response object.
27+
*/
28+
@SdkProtectedApi
29+
public final class AttachHttpMetadataResponseHandler<T extends SdkResponse> implements HttpResponseHandler<T> {
30+
31+
private final HttpResponseHandler<T> delegate;
32+
33+
public AttachHttpMetadataResponseHandler(HttpResponseHandler<T> delegate) {
34+
this.delegate = delegate;
35+
}
36+
37+
@Override
38+
@SuppressWarnings("unchecked")
39+
public T handle(SdkHttpFullResponse response, ExecutionAttributes executionAttributes) throws Exception {
40+
return (T) delegate.handle(response, executionAttributes)
41+
.toBuilder()
42+
.sdkHttpResponse(response)
43+
.build();
44+
}
45+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/client/handler/BaseClientHandler.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import software.amazon.awssdk.core.internal.interceptor.ExecutionInterceptorChain;
3434
import software.amazon.awssdk.core.internal.interceptor.InterceptorContext;
3535
import software.amazon.awssdk.http.SdkHttpFullRequest;
36-
import software.amazon.awssdk.http.SdkHttpResponse;
3736

3837
@SdkProtectedApi
3938
public abstract class BaseClientHandler {
@@ -117,21 +116,6 @@ private static <OutputT extends SdkResponse> OutputT runAfterUnmarshallingInterc
117116
return (OutputT) interceptorContext.response();
118117
}
119118

120-
/**
121-
* Add {@link SdkHttpResponse} to SdkResponse.
122-
*/
123-
@SuppressWarnings("unchecked")
124-
private static <OutputT extends SdkResponse> HttpResponseHandler<OutputT> addHttpResponseMetadataResponseHandler(
125-
HttpResponseHandler<OutputT> delegate) {
126-
return (response, executionAttributes) -> {
127-
OutputT sdkResponse = delegate.handle(response, executionAttributes);
128-
129-
return (OutputT) sdkResponse.toBuilder()
130-
.sdkHttpResponse(response)
131-
.build();
132-
};
133-
}
134-
135119
static <OutputT extends SdkResponse> HttpResponseHandler<OutputT> interceptorCalling(
136120
HttpResponseHandler<OutputT> delegate, ExecutionContext context) {
137121
return (response, executionAttributes) ->
@@ -182,6 +166,6 @@ protected boolean isCalculateCrc32FromCompressedData() {
182166
<OutputT extends SdkResponse> HttpResponseHandler<OutputT> decorateResponseHandlers(
183167
HttpResponseHandler<OutputT> delegate, ExecutionContext executionContext) {
184168
HttpResponseHandler<OutputT> interceptorCallingResponseHandler = interceptorCalling(delegate, executionContext);
185-
return addHttpResponseMetadataResponseHandler(interceptorCallingResponseHandler);
169+
return new AttachHttpMetadataResponseHandler<>(interceptorCallingResponseHandler);
186170
}
187171
}

services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.core.SdkBytes;
4141
import software.amazon.awssdk.core.async.SdkPublisher;
4242
import software.amazon.awssdk.http.SdkCancellationException;
43+
import software.amazon.awssdk.http.SdkHttpResponse;
4344
import software.amazon.awssdk.regions.Region;
4445
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
4546
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
@@ -106,13 +107,13 @@ public void subscribeToShard_ReceivesAllData() {
106107
SubscribeToShardResponseHandler.builder()
107108
.onEventStream(p -> p.filter(SubscribeToShardEvent.class)
108109
.subscribe(eventConsumer))
110+
.onResponse(this::verifyHttpMetadata)
109111
.build())
110112
.join();
111113
producer.shutdown();
112114
// Make sure we all the data we received was data we published, we may have published more
113115
// if the producer isn't shutdown immediately after we finish subscribing.
114116
assertThat(producedData).containsSequence(receivedData);
115-
116117
}
117118

118119
@Test
@@ -126,7 +127,7 @@ public void cancelledSubscription_DoesNotCallTerminalMethods() {
126127
new SubscribeToShardResponseHandler() {
127128
@Override
128129
public void responseReceived(SubscribeToShardResponse response) {
129-
130+
verifyHttpMetadata(response);
130131
}
131132

132133
@Override
@@ -222,4 +223,10 @@ private Optional<SdkBytes> putRecord() {
222223
}
223224
}
224225

226+
private void verifyHttpMetadata(SubscribeToShardResponse response) {
227+
SdkHttpResponse sdkHttpResponse = response.sdkHttpResponse();
228+
assertThat(sdkHttpResponse).isNotNull();
229+
assertThat(sdkHttpResponse.isSuccessful()).isTrue();
230+
assertThat(sdkHttpResponse.headers()).isNotEmpty();
231+
}
225232
}

0 commit comments

Comments
 (0)