Skip to content

Commit 62485f9

Browse files
committed
Use Unpooled in StreamedRequest#onNext
the onNext method can be called from a non SDK owned thread, and if we allocate the ByteBuf using the pooling allocator, then we pollute the customer's thread with a ThreadLocal. Using Unpooled#wrappedBuffer avoids actually copying the arrays so this should be more performant as well. Fixes #1133
1 parent 9f5ae80 commit 62485f9

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"type": "bugfix",
4+
"description": "Fix a bug that could pollute non SDK threads with `ThreadLocal`'s when allocating memory. See [#1133](https://github.com/aws/aws-sdk-java-v2/issues/1133)"
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.typesafe.netty.http.HttpStreamsClientHandler;
2727
import com.typesafe.netty.http.StreamedHttpRequest;
2828
import io.netty.buffer.ByteBuf;
29+
import io.netty.buffer.Unpooled;
2930
import io.netty.channel.Channel;
3031
import io.netty.channel.ChannelOption;
3132
import io.netty.channel.ChannelPipeline;
@@ -170,8 +171,7 @@ private void writeRequest(HttpRequest request) {
170171
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
171172
TimeUnit.MILLISECONDS));
172173
StreamedRequest streamedRequest = new StreamedRequest(request,
173-
context.executeRequest().requestContentPublisher(),
174-
channel);
174+
context.executeRequest().requestContentPublisher());
175175
channel.writeAndFlush(streamedRequest)
176176
.addListener(wireCall -> {
177177
// Done writing so remove the idle write timeout handler
@@ -410,16 +410,14 @@ public String toString() {
410410
private static class StreamedRequest extends DelegateHttpRequest implements StreamedHttpRequest {
411411

412412
private final Publisher<ByteBuffer> publisher;
413-
private final Channel channel;
414413
private final Optional<Long> requestContentLength;
415414
private long written = 0L;
416415
private boolean done;
417416
private Subscription subscription;
418417

419-
StreamedRequest(HttpRequest request, Publisher<ByteBuffer> publisher, Channel channel) {
418+
StreamedRequest(HttpRequest request, Publisher<ByteBuffer> publisher) {
420419
super(request);
421420
this.publisher = publisher;
422-
this.channel = channel;
423421
this.requestContentLength = contentLength(request);
424422
}
425423

@@ -433,17 +431,16 @@ public void onSubscribe(Subscription subscription) {
433431
}
434432

435433
@Override
436-
public void onNext(ByteBuffer byteBuffer) {
434+
public void onNext(ByteBuffer contentBytes) {
437435
if (done) {
438436
return;
439437
}
440438

441439
try {
442-
int newLimit = clampedBufferLimit(byteBuffer.remaining());
443-
byteBuffer.limit(newLimit);
444-
ByteBuf buffer = channel.alloc().buffer(byteBuffer.remaining());
445-
buffer.writeBytes(byteBuffer);
446-
HttpContent content = new DefaultHttpContent(buffer);
440+
int newLimit = clampedBufferLimit(contentBytes.remaining());
441+
contentBytes.limit(newLimit);
442+
ByteBuf contentByteBuf = Unpooled.wrappedBuffer(contentBytes);
443+
HttpContent content = new DefaultHttpContent(contentByteBuf);
447444

448445
subscriber.onNext(content);
449446
written += newLimit;

0 commit comments

Comments
 (0)