Skip to content

Stop publishing after Content-Length bytes #539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Netty NIO Async HTTP Client",
"type": "bugfix",
"description": "Fix the Netty async client to stop publishing to the request stream once `Content-Length` is reached."
}
13 changes: 11 additions & 2 deletions http-clients/netty-nio-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,22 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${reactive-streams.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -120,4 +129,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.util.concurrent.Future;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -277,44 +278,96 @@ public String toString() {
/**
* Decorator around {@link StreamedHttpRequest} to adapt a publisher of {@link ByteBuffer} (i.e. {@link
* software.amazon.awssdk.http.async.SdkHttpRequestProvider}) to a publisher of {@link HttpContent}.
* <p />
* This publisher also prevents the adapted publisher from publishing more content to the subscriber than
* the specified 'Content-Length' of the request.
*/
private static class StreamedRequest extends DelegateHttpRequest implements StreamedHttpRequest {

private final Publisher<ByteBuffer> publisher;
private final Channel channel;
private final Optional<Long> requestContentLength;
private long written = 0L;
private boolean done;
private Subscription subscription;

StreamedRequest(HttpRequest request, Publisher<ByteBuffer> publisher, Channel channel) {
super(request);
this.publisher = publisher;
this.channel = channel;
this.requestContentLength = contentLength(request);
}

@Override
public void subscribe(Subscriber<? super HttpContent> subscriber) {
publisher.subscribe(new Subscriber<ByteBuffer>() {
@Override
public void onSubscribe(Subscription subscription) {
StreamedRequest.this.subscription = subscription;
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
if (done) {
return;
}

int newLimit = clampedBufferLimit(byteBuffer.remaining());
byteBuffer.limit(newLimit);
ByteBuf buffer = channel.alloc().buffer(byteBuffer.remaining());
buffer.writeBytes(byteBuffer);
HttpContent content = new DefaultHttpContent(buffer);

subscriber.onNext(content);
written += newLimit;

if (!shouldContinuePublishing()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to cancel or can we just call on complete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also cancel the subscription to so the adapted publisher can do any cleanup if it wanted to

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can just do that in onComplete right?

If a Publisher signals either onError or onComplete on a Subscriber, that Subscriber’s Subscription MUST be considered cancelled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I might be confused but since our adapter sits between the streamed request subscriber and the user supplied request content publisher, we need to cancel the subscription to the content publisher and call onComplete the streamed request right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was reading again and found this so this looks right to me.

The intent of this rule is to establish that Subscribers cannot just throw Subscriptions away when they are no longer needed, they have to call cancel so that resources held by that Subscription can be safely, and timely, reclaimed. An example of this would be a Subscriber which is only interested in a specific element, which would then cancel its Subscription to signal its completion to the Publisher.

done = true;
subscription.cancel();
subscriber.onComplete();
}
}

@Override
public void onError(Throwable t) {
subscriber.onError(t);
if (!done) {
done = true;
subscriber.onError(t);

}
}

@Override
public void onComplete() {
subscriber.onComplete();
if (!done) {
done = true;
subscriber.onComplete();
}
}
});
}

private int clampedBufferLimit(int bufLen) {
return requestContentLength.map(cl ->
(int) Math.min(cl - written, bufLen)
).orElse(bufLen);
}

private boolean shouldContinuePublishing() {
return requestContentLength.map(cl -> written < cl).orElse(true);
}

private static Optional<Long> contentLength(HttpRequest request) {
String value = request.headers().get("Content-Length");
if (value != null) {
try {
return Optional.of(Long.parseLong(value));
} catch (NumberFormatException e) {
log.warn("Unable to parse 'Content-Length' header. Treating it as non existent.");
}
}
return Optional.empty();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,31 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;

import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.assertj.core.api.Condition;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -76,14 +84,23 @@
@RunWith(MockitoJUnitRunner.class)
public class NettyNioAsyncHttpClientWireMockTest {

private final RecordingNetworkTrafficListener wiremockTrafficListener = new RecordingNetworkTrafficListener();

@Rule
public WireMockRule mockServer = new WireMockRule(wireMockConfig().dynamicPort().dynamicHttpsPort());
public WireMockRule mockServer = new WireMockRule(wireMockConfig()
.dynamicPort()
.dynamicHttpsPort()
.networkTrafficListener(wiremockTrafficListener));

@Mock
private SdkRequestContext requestContext;

private static SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder()
.buildWithDefaults(mapWithTrustAllCerts());
private static SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder().buildWithDefaults(mapWithTrustAllCerts());

@Before
public void methodSetup() {
wiremockTrafficListener.reset();
}

@AfterClass
public static void tearDown() throws Exception {
Expand Down Expand Up @@ -227,6 +244,30 @@ public void canSendContentAndGetThatContentBack() throws Exception {
assertThat(recorder.fullResponseAsString()).isEqualTo(reverse(body));
}

@Test
public void requestContentOnlyEqualToContentLengthHeaderFromProvider() throws InterruptedException, ExecutionException, TimeoutException, IOException {
final String content = randomAlphabetic(32);
final String streamContent = content + reverse(content);
stubFor(any(urlEqualTo("/echo?reversed=true"))
.withRequestBody(equalTo(content))
.willReturn(aResponse().withBody(reverse(content))));
URI uri = URI.create("http://localhost:" + mockServer.port());

SdkHttpFullRequest request = createRequest(uri, "/echo", streamContent, SdkHttpMethod.POST, singletonMap("reversed", "true"));
request = request.toBuilder().header("Content-Length", Integer.toString(content.length())).build();
RecordingResponseHandler recorder = new RecordingResponseHandler();


client.prepareRequest(request, requestContext, createProvider(streamContent), recorder).run();

recorder.completeFuture.get(5, TimeUnit.SECONDS);

// HTTP servers will stop processing the request as soon as it reads
// bytes equal to 'Content-Length' so we need to inspect the raw
// traffic to ensure that there wasn't anything after that.
assertThat(wiremockTrafficListener.requests.toString()).endsWith(content);
}

private void assertCanReceiveBasicRequest(URI uri, String body) throws Exception {
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withHeader("Some-Header", "With Value").withBody(body)));

Expand Down Expand Up @@ -275,11 +316,11 @@ public void cancel() {
};
}

private SdkHttpRequest createRequest(URI uri) {
private SdkHttpFullRequest createRequest(URI uri) {
return createRequest(uri, "/", null, SdkHttpMethod.GET, emptyMap());
}

private SdkHttpRequest createRequest(URI uri,
private SdkHttpFullRequest createRequest(URI uri,
String resourcePath,
String body,
SdkHttpMethod method,
Expand Down Expand Up @@ -379,4 +420,33 @@ private static AttributeMap mapWithTrustAllCerts() {
.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true)
.build();
}

private static class RecordingNetworkTrafficListener implements WiremockNetworkTrafficListener {
private final StringBuilder requests = new StringBuilder();


@Override
public void opened(Socket socket) {

}

@Override
public void incoming(Socket socket, ByteBuffer byteBuffer) {
requests.append(StandardCharsets.UTF_8.decode(byteBuffer));
}

@Override
public void outgoing(Socket socket, ByteBuffer byteBuffer) {

}

@Override
public void closed(Socket socket) {

}

public void reset() {
requests.setLength(0);
}
}
}