diff --git a/.changes/next-release/bugfix-NettyNIOHTTPClient-dd0a119.json b/.changes/next-release/bugfix-NettyNIOHTTPClient-dd0a119.json new file mode 100644 index 000000000000..44a4572b0506 --- /dev/null +++ b/.changes/next-release/bugfix-NettyNIOHTTPClient-dd0a119.json @@ -0,0 +1,5 @@ +{ + "category": "Netty NIO HTTP Client", + "type": "bugfix", + "description": "Fix a bug that could pollute non SDK threads with `ThreadLocal`'s when allocating memory. See [#1133](https://github.com/aws/aws-sdk-java-v2/issues/1133)" +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java index 5cde5eaeb973..3a751ee48cdb 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java @@ -26,6 +26,7 @@ import com.typesafe.netty.http.HttpStreamsClientHandler; import com.typesafe.netty.http.StreamedHttpRequest; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -170,8 +171,7 @@ private void writeRequest(HttpRequest request) { channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(), TimeUnit.MILLISECONDS)); StreamedRequest streamedRequest = new StreamedRequest(request, - context.executeRequest().requestContentPublisher(), - channel); + context.executeRequest().requestContentPublisher()); channel.writeAndFlush(streamedRequest) .addListener(wireCall -> { // Done writing so remove the idle write timeout handler @@ -410,16 +410,14 @@ public String toString() { private static class StreamedRequest extends DelegateHttpRequest implements StreamedHttpRequest { private final Publisher publisher; - private final Channel channel; private final Optional requestContentLength; private long written = 0L; private boolean done; private Subscription subscription; - StreamedRequest(HttpRequest request, Publisher publisher, Channel channel) { + StreamedRequest(HttpRequest request, Publisher publisher) { super(request); this.publisher = publisher; - this.channel = channel; this.requestContentLength = contentLength(request); } @@ -433,17 +431,16 @@ public void onSubscribe(Subscription subscription) { } @Override - public void onNext(ByteBuffer byteBuffer) { + public void onNext(ByteBuffer contentBytes) { if (done) { return; } try { - int newLimit = clampedBufferLimit(byteBuffer.remaining()); - byteBuffer.limit(newLimit); - ByteBuf buffer = channel.alloc().buffer(byteBuffer.remaining()); - buffer.writeBytes(byteBuffer); - HttpContent content = new DefaultHttpContent(buffer); + int newLimit = clampedBufferLimit(contentBytes.remaining()); + contentBytes.limit(newLimit); + ByteBuf contentByteBuf = Unpooled.wrappedBuffer(contentBytes); + HttpContent content = new DefaultHttpContent(contentByteBuf); subscriber.onNext(content); written += newLimit;