Skip to content

Commit 5edc44a

Browse files
authored
aws-crt-client refactor to use new 'utils' publishers and subscribers (#3507)
* Updated aws-crt-client to use new 'utils' publishers for its request body logic. Added two new publishers to 'utils': 1. StoringSubscriber - A subscriber that stores the events it receives in memory for retrieval. 2. ByteBufferStoringSubscriber - A subscriber that stores byte buffers it recevies in memory for retrieval. * Updated aws-crt-client to use new 'utils' publishers for its response body logic. (#3490) * Updated aws-crt-client to use new 'utils' publishers for its response body logic. Added a new publisher to 'utils': SimplePublisher - A publisher that exposes simple `write`, `complete` and `error` methods, and avoids the caller needing to be concerned about backpressure or following the reactive streams specification. * Addressed comments.
1 parent 1d2aa77 commit 5edc44a

18 files changed

+2239
-669
lines changed

build-tools/src/main/resources/software/amazon/awssdk/spotbugs-suppressions.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@
206206
<Bug pattern="URF_UNREAD_FIELD, RV_RETURN_VALUE_IGNORED"/>
207207
</Match>
208208

209+
<Match>
210+
<Class name="software.amazon.awssdk.utils.async.StoringSubscriber"/>
211+
<Method name="drop"/>
212+
<Bug pattern="RV_RETURN_VALUE_IGNORED"/>
213+
</Match>
214+
215+
209216
<Match>
210217
<Class name="software.amazon.awssdk.http.crt.internal.response.CrtResponseBodyPublisher" />
211218
<Method name="subscribe"/>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
322322
*
323323
* @param readBufferSize The number of bytes that can be buffered
324324
* @return The builder of the method chaining.
325+
*
326+
* TODO: This is also used for the write buffer size. Should we rename it?
325327
*/
326328
Builder readBufferSize(int readBufferSize);
327329

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public CompletableFuture<Void> execute(CrtRequestContext executionContext) {
5252

5353
HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
5454
HttpStreamResponseHandler crtResponseHandler =
55-
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, executionContext);
55+
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());
5656

5757
// Submit the request on the connection
5858
try {

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,27 @@
1919
import software.amazon.awssdk.annotations.SdkInternalApi;
2020
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
2121
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
22-
import software.amazon.awssdk.utils.Validate;
22+
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
23+
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
2324

24-
/**
25-
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
26-
*/
2725
@SdkInternalApi
2826
final class CrtRequestBodyAdapter implements HttpRequestBodyStream {
29-
private final int windowSize;
30-
private final CrtRequestBodySubscriber requestBodySubscriber;
27+
private final SdkHttpContentPublisher requestPublisher;
28+
private final ByteBufferStoringSubscriber requestBodySubscriber;
3129

32-
CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, int windowSize) {
33-
this.windowSize = Validate.isPositive(windowSize, "windowSize is <= 0");
34-
this.requestBodySubscriber = new CrtRequestBodySubscriber(windowSize);
30+
CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, int readLimit) {
31+
this.requestPublisher = requestPublisher;
32+
this.requestBodySubscriber = new ByteBufferStoringSubscriber(readLimit);
3533
requestPublisher.subscribe(requestBodySubscriber);
3634
}
3735

3836
@Override
3937
public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
40-
return requestBodySubscriber.transferRequestBody(bodyBytesOut);
38+
return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM;
39+
}
40+
41+
@Override
42+
public long getLength() {
43+
return requestPublisher.contentLength().orElse(0L);
4144
}
4245
}

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodySubscriber.java

Lines changed: 0 additions & 132 deletions
This file was deleted.

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java

Lines changed: 78 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.http.crt.internal.response;
1717

18+
import java.nio.ByteBuffer;
1819
import java.util.concurrent.CompletableFuture;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
2021
import software.amazon.awssdk.crt.CRT;
@@ -26,10 +27,10 @@
2627
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
2728
import software.amazon.awssdk.http.HttpStatusFamily;
2829
import software.amazon.awssdk.http.SdkHttpResponse;
29-
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
30-
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
30+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
3131
import software.amazon.awssdk.utils.Logger;
3232
import software.amazon.awssdk.utils.Validate;
33+
import software.amazon.awssdk.utils.async.SimplePublisher;
3334

3435
/**
3536
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
@@ -39,97 +40,110 @@ public final class CrtResponseAdapter implements HttpStreamResponseHandler {
3940
private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class);
4041

4142
private final HttpClientConnection connection;
42-
private final CompletableFuture<Void> responseComplete;
43-
private final AsyncExecuteRequest sdkRequest;
44-
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
45-
private final int windowSize;
46-
private CrtResponseBodyPublisher respBodyPublisher;
43+
private final CompletableFuture<Void> completionFuture;
44+
private final SdkAsyncHttpResponseHandler responseHandler;
45+
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();
4746

48-
private CrtResponseAdapter(HttpClientConnection connection,
49-
CompletableFuture<Void> responseComplete,
50-
AsyncExecuteRequest sdkRequest,
51-
int windowSize) {
52-
this.connection = Validate.notNull(connection, "HttpConnection is null");
53-
this.responseComplete = Validate.notNull(responseComplete, "reqComplete Future is null");
54-
this.sdkRequest = Validate.notNull(sdkRequest, "AsyncExecuteRequest Future is null");
55-
this.windowSize = Validate.isPositive(windowSize, "windowSize is <= 0");
56-
}
47+
private final SdkHttpResponse.Builder responseBuilder = SdkHttpResponse.builder();
5748

58-
public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection connection,
59-
CompletableFuture<Void> responseComplete,
60-
CrtRequestContext request) {
61-
return new CrtResponseAdapter(connection, responseComplete, request.sdkRequest(), request.readBufferSize());
49+
private CrtResponseAdapter(HttpClientConnection connection,
50+
CompletableFuture<Void> completionFuture,
51+
SdkAsyncHttpResponseHandler responseHandler) {
52+
this.connection = Validate.paramNotNull(connection, "connection");
53+
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
54+
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
6255
}
6356

64-
private void initRespBodyPublisherIfNeeded(HttpStream stream) {
65-
if (respBodyPublisher == null) {
66-
respBodyPublisher = new CrtResponseBodyPublisher(connection, stream, responseComplete, windowSize);
67-
}
57+
public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
58+
CompletableFuture<Void> requestFuture,
59+
SdkAsyncHttpResponseHandler responseHandler) {
60+
return new CrtResponseAdapter(crtConn, requestFuture, responseHandler);
6861
}
6962

7063
@Override
71-
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
72-
initRespBodyPublisherIfNeeded(stream);
73-
74-
for (HttpHeader h : nextHeaders) {
75-
respBuilder.appendHeader(h.getName(), h.getValue());
64+
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
65+
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
66+
for (HttpHeader h : nextHeaders) {
67+
responseBuilder.appendHeader(h.getName(), h.getValue());
68+
}
7669
}
7770
}
7871

7972
@Override
8073
public void onResponseHeadersDone(HttpStream stream, int headerType) {
8174
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
82-
initRespBodyPublisherIfNeeded(stream);
83-
84-
respBuilder.statusCode(stream.getResponseStatusCode());
85-
sdkRequest.responseHandler().onHeaders(respBuilder.build());
86-
sdkRequest.responseHandler().onStream(respBodyPublisher);
75+
responseBuilder.statusCode(stream.getResponseStatusCode());
76+
responseHandler.onHeaders(responseBuilder.build());
77+
responseHandler.onStream(responsePublisher);
8778
}
8879
}
8980

9081
@Override
9182
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
92-
initRespBodyPublisherIfNeeded(stream);
83+
CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));
84+
85+
if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
86+
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
87+
return bodyBytesIn.length;
88+
}
9389

94-
respBodyPublisher.queueBuffer(bodyBytesIn);
95-
respBodyPublisher.publishToSubscribers();
90+
writeFuture.whenComplete((result, failure) -> {
91+
if (failure != null) {
92+
failResponseHandlerAndFuture(stream, failure);
93+
return;
94+
}
95+
96+
stream.incrementWindow(bodyBytesIn.length);
97+
});
9698

97-
/*
98-
* Intentionally zero. We manually manage the crt stream's window within the body publisher by updating with
99-
* the exact amount we were able to push to the subcriber.
100-
*
101-
* See the call to stream.incrementWindow() in AwsCrtResponseBodyPublisher.
102-
*/
10399
return 0;
104100
}
105101

106102
@Override
107103
public void onResponseComplete(HttpStream stream, int errorCode) {
108-
initRespBodyPublisherIfNeeded(stream);
109-
110-
if (HttpStatusFamily.of(respBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
111-
connection.shutdown();
112-
}
113-
114104
if (errorCode == CRT.AWS_CRT_SUCCESS) {
115-
log.debug(() -> "Response Completed Successfully");
116-
respBodyPublisher.setQueueComplete();
117-
respBodyPublisher.publishToSubscribers();
105+
onSuccessfulResponseComplete(stream);
118106
} else {
119-
HttpException error = new HttpException(errorCode);
120-
log.error(() -> "Response Encountered an Error.", error);
121-
122-
// Invoke Error Callback on SdkAsyncHttpResponseHandler
123-
try {
124-
sdkRequest.responseHandler().onError(error);
125-
} catch (Exception e) {
126-
log.error(() -> String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError: %s",
127-
sdkRequest.responseHandler(), e));
107+
onFailedResponseComplete(stream, new HttpException(errorCode));
108+
}
109+
}
110+
111+
private void onSuccessfulResponseComplete(HttpStream stream) {
112+
responsePublisher.complete().whenComplete((result, failure) -> {
113+
if (failure != null) {
114+
failResponseHandlerAndFuture(stream, failure);
115+
return;
116+
}
117+
118+
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
119+
connection.shutdown();
128120
}
129121

130-
// Invoke Error Callback on any Subscriber's of the Response Body
131-
respBodyPublisher.setError(error);
132-
respBodyPublisher.publishToSubscribers();
122+
connection.close();
123+
stream.close();
124+
completionFuture.complete(null);
125+
});
126+
}
127+
128+
private void onFailedResponseComplete(HttpStream stream, HttpException error) {
129+
log.error(() -> "HTTP response encountered an error.", error);
130+
responsePublisher.error(error);
131+
failResponseHandlerAndFuture(stream, error);
132+
}
133+
134+
private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) {
135+
callResponseHandlerOnError(error);
136+
completionFuture.completeExceptionally(error);
137+
connection.shutdown();
138+
connection.close();
139+
stream.close();
140+
}
141+
142+
private void callResponseHandlerOnError(Throwable error) {
143+
try {
144+
responseHandler.onError(error);
145+
} catch (RuntimeException e) {
146+
log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
133147
}
134148
}
135149
}

0 commit comments

Comments
 (0)