Skip to content

Commit 6cf6e30

Browse files
JonathanHensonrccarper
authored andcommitted
Updated crt http client to use latest crt api, which uses byte[] for response body.
1 parent a3ba0de commit 6cf6e30

File tree

4 files changed

+12
-13
lines changed

4 files changed

+12
-13
lines changed

http-clients/aws-crt-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<dependency>
3434
<groupId>software.amazon.awssdk.crt</groupId>
3535
<artifactId>aws-crt</artifactId>
36-
<version>0.3.35</version>
36+
<version>0.4.1</version>
3737
</dependency>
3838

3939
<!--SDK dependencies-->

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
135135
Validate.notNull(uri, "URI must not be null");
136136
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);
137137

138-
return new HttpClientConnectionManager(bootstrap, socketOptions, tlsContext, uri,
139-
HttpClientConnectionManager.DEFAULT_MAX_BUFFER_SIZE, windowSize,
138+
return new HttpClientConnectionManager(bootstrap, socketOptions, tlsContext, uri, windowSize,
140139
maxConnectionsPerEndpoint);
141140
}
142141

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtAsyncHttpStreamAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void onResponseHeadersDone(HttpStream stream, int headerType) {
8989
}
9090

9191
@Override
92-
public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {
92+
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
9393
initRespBodyPublisherIfNeeded(stream);
9494

9595
if (respBodyPublisher == null) {
@@ -99,10 +99,10 @@ public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {
9999

100100
// Queue a Deep Copy since bodyBytesIn is only guaranteed to contain valid memory for the lifetime of this
101101
// function call, and it's memory can be reused once this function returns.
102-
respBodyPublisher.queueBuffer(deepCopy(bodyBytesIn));
102+
respBodyPublisher.queueBuffer(bodyBytesIn);
103103
respBodyPublisher.publishToSubscribers();
104104

105-
if (bodyBytesIn.remaining() != 0) {
105+
if (bodyBytesIn.length != 0) {
106106
throw new IllegalStateException("Unprocessed bytes remain in bodyBytesIn Buffer!");
107107
}
108108

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/AwsCrtResponseBodyPublisher.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
5353
private final AtomicInteger mutualRecursionDepth = new AtomicInteger(0);
5454
private final AtomicInteger queuedBytes = new AtomicInteger(0);
5555
private final AtomicReference<Subscriber<? super ByteBuffer>> subscriberRef = new AtomicReference<>(null);
56-
private final Queue<ByteBuffer> queuedBuffers = new ConcurrentLinkedQueue<>();
56+
private final Queue<byte[]> queuedBuffers = new ConcurrentLinkedQueue<>();
5757
private final AtomicReference<Throwable> error = new AtomicReference<>(null);
5858

5959
/**
@@ -97,18 +97,18 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
9797
* Adds a Buffer to the Queue to be published to any Subscribers
9898
* @param buffer The Buffer to be queued.
9999
*/
100-
public void queueBuffer(ByteBuffer buffer) {
100+
public void queueBuffer(byte[] buffer) {
101101
Validate.notNull(buffer, "ByteBuffer must not be null");
102102

103103
if (isCancelled.get()) {
104104
// Immediately open HttpStream's IO window so it doesn't see any IO Back-pressure.
105105
// AFAIK there's no way to abort an in-progress HttpStream, only free it's memory by calling close()
106-
stream.incrementWindow(buffer.remaining());
106+
stream.incrementWindow(buffer.length);
107107
return;
108108
}
109109

110110
queuedBuffers.add(buffer);
111-
int totalBytesQueued = queuedBytes.addAndGet(buffer.remaining());
111+
int totalBytesQueued = queuedBytes.addAndGet(buffer.length);
112112

113113
if (totalBytesQueued > windowSize) {
114114
throw new IllegalStateException("Queued more than Window Size: queued=" + totalBytesQueued
@@ -230,10 +230,10 @@ protected synchronized void publishToSubscribers() {
230230
int totalAmountTransferred = 0;
231231

232232
while (outstandingRequests.get() > 0 && queuedBuffers.size() > 0) {
233-
ByteBuffer buffer = queuedBuffers.poll();
233+
byte[] buffer = queuedBuffers.poll();
234234
outstandingRequests.getAndUpdate(DECREMENT_IF_GREATER_THAN_ZERO);
235-
int amount = buffer.remaining();
236-
publishWithoutMutualRecursion(subscriberRef.get(), buffer);
235+
int amount = buffer.length;
236+
publishWithoutMutualRecursion(subscriberRef.get(), ByteBuffer.wrap(buffer));
237237
totalAmountTransferred += amount;
238238
}
239239

0 commit comments

Comments
 (0)