Skip to content

Updated crt http client to use latest crt api, which uses byte[] for … #1498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
77fc5aa
Amazon Pinpoint Update: This release of the Amazon Pinpoint API intro…
Nov 1, 2019
84875f9
AWS Database Migration Service Update: This release contains task tim…
Nov 1, 2019
c089a5f
AWS CloudTrail Update: This release adds two new APIs, GetTrail and L…
Nov 1, 2019
51468b6
Release 2.10.7. Updated CHANGELOG.md, README.md and all pom.xml.
Nov 1, 2019
e8c5504
Merge pull request #657 from aws/staging/388a3a8b-bd1c-4999-82b9-975d…
aws-sdk-java-automation Nov 1, 2019
879d14c
Update to next snapshot version: 2.10.8-SNAPSHOT
Nov 1, 2019
be6229e
Amazon DynamoDB Accelerator (DAX) Update: Documentation updates for dax
Nov 4, 2019
f79e252
AWS RoboMaker Update: RoboMaker Fleet Management launch a feature to …
Nov 4, 2019
f8fd5cf
Amazon Elastic Compute Cloud Update: Documentation updates for ec2
Nov 4, 2019
68618ad
Release 2.10.8. Updated CHANGELOG.md, README.md and all pom.xml.
Nov 4, 2019
691ca79
Merge pull request #658 from aws/staging/d62065eb-2729-42ac-b74e-3da1…
aws-sdk-java-automation Nov 4, 2019
500a2d3
Update to next snapshot version: 2.10.9-SNAPSHOT
Nov 4, 2019
4122966
Updated crt http client to use latest crt api, which uses byte[] for …
JonathanHenson Nov 4, 2019
d4974d7
Removed old comment.
JonathanHenson Nov 4, 2019
39d9894
Merge branch 'master' into crt-api-update
JonathanHenson Nov 4, 2019
ec5a086
Fixed build failures.
JonathanHenson Nov 5, 2019
446cb74
Fixed benchmark builds.
JonathanHenson Nov 5, 2019
7713022
Removed unused import.
JonathanHenson Nov 5, 2019
f4c106b
Fixed SPI test.
JonathanHenson Nov 5, 2019
3757da1
Hopefully fix the build this time.
JonathanHenson Nov 5, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.3.35</version>
<version>0.4.1</version>
</dependency>

<!--SDK dependencies-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
Validate.notNull(uri, "URI must not be null");
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);

return new HttpClientConnectionManager(bootstrap, socketOptions, tlsContext, uri,
HttpClientConnectionManager.DEFAULT_MAX_BUFFER_SIZE, windowSize,
return new HttpClientConnectionManager(bootstrap, socketOptions, tlsContext, uri, windowSize,
maxConnectionsPerEndpoint);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void onResponseHeadersDone(HttpStream stream, int headerType) {
}

@Override
public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
initRespBodyPublisherIfNeeded(stream);

if (respBodyPublisher == null) {
Expand All @@ -99,10 +99,10 @@ public int onResponseBody(HttpStream stream, ByteBuffer bodyBytesIn) {

// Queue a Deep Copy since bodyBytesIn is only guaranteed to contain valid memory for the lifetime of this
// function call, and it's memory can be reused once this function returns.
respBodyPublisher.queueBuffer(deepCopy(bodyBytesIn));
respBodyPublisher.queueBuffer(bodyBytesIn);
respBodyPublisher.publishToSubscribers();

if (bodyBytesIn.remaining() != 0) {
if (bodyBytesIn.length != 0) {
throw new IllegalStateException("Unprocessed bytes remain in bodyBytesIn Buffer!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class AwsCrtResponseBodyPublisher implements Publisher<ByteBuffer> {
private final AtomicInteger mutualRecursionDepth = new AtomicInteger(0);
private final AtomicInteger queuedBytes = new AtomicInteger(0);
private final AtomicReference<Subscriber<? super ByteBuffer>> subscriberRef = new AtomicReference<>(null);
private final Queue<ByteBuffer> queuedBuffers = new ConcurrentLinkedQueue<>();
private final Queue<byte[]> queuedBuffers = new ConcurrentLinkedQueue<>();
private final AtomicReference<Throwable> error = new AtomicReference<>(null);

/**
Expand Down Expand Up @@ -97,18 +97,18 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
* Adds a Buffer to the Queue to be published to any Subscribers
* @param buffer The Buffer to be queued.
*/
public void queueBuffer(ByteBuffer buffer) {
public void queueBuffer(byte[] buffer) {
Validate.notNull(buffer, "ByteBuffer must not be null");

if (isCancelled.get()) {
// Immediately open HttpStream's IO window so it doesn't see any IO Back-pressure.
// AFAIK there's no way to abort an in-progress HttpStream, only free it's memory by calling close()
stream.incrementWindow(buffer.remaining());
stream.incrementWindow(buffer.length);
return;
}

queuedBuffers.add(buffer);
int totalBytesQueued = queuedBytes.addAndGet(buffer.remaining());
int totalBytesQueued = queuedBytes.addAndGet(buffer.length);

if (totalBytesQueued > windowSize) {
throw new IllegalStateException("Queued more than Window Size: queued=" + totalBytesQueued
Expand Down Expand Up @@ -230,10 +230,10 @@ protected synchronized void publishToSubscribers() {
int totalAmountTransferred = 0;

while (outstandingRequests.get() > 0 && queuedBuffers.size() > 0) {
ByteBuffer buffer = queuedBuffers.poll();
byte[] buffer = queuedBuffers.poll();
outstandingRequests.getAndUpdate(DECREMENT_IF_GREATER_THAN_ZERO);
int amount = buffer.remaining();
publishWithoutMutualRecursion(subscriberRef.get(), buffer);
int amount = buffer.length;
publishWithoutMutualRecursion(subscriberRef.get(), ByteBuffer.wrap(buffer));
totalAmountTransferred += amount;
}

Expand Down