|
20 | 20 | import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKeys.REQUEST_CONTEXT_KEY;
|
21 | 21 | import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKeys.RESPONSE_COMPLETE_KEY;
|
22 | 22 |
|
| 23 | +import com.typesafe.netty.http.HttpStreamsClientHandler; |
23 | 24 | import com.typesafe.netty.http.StreamedHttpResponse;
|
24 | 25 | import io.netty.buffer.ByteBuf;
|
25 | 26 | import io.netty.channel.ChannelHandler.Sharable;
|
|
31 | 32 | import io.netty.handler.codec.http.HttpObject;
|
32 | 33 | import io.netty.handler.codec.http.HttpResponse;
|
33 | 34 | import io.netty.handler.codec.http.HttpUtil;
|
| 35 | +import io.netty.handler.codec.http.LastHttpContent; |
34 | 36 | import io.netty.util.AttributeKey;
|
35 | 37 | import java.io.IOException;
|
36 | 38 | import java.nio.ByteBuffer;
|
@@ -75,12 +77,10 @@ protected void channelRead0(ChannelHandlerContext channelContext, HttpObject msg
|
75 | 77 | if (msg instanceof StreamedHttpResponse) {
|
76 | 78 | requestContext.handler().onStream(new PublisherAdapter((StreamedHttpResponse) msg, channelContext, requestContext));
|
77 | 79 | } else if (msg instanceof FullHttpResponse) {
|
78 |
| - // TODO: HttpStreamsClientHandler leaves a dangling LastHttpContent |
79 |
| - // in the pipeline. Consume it ourselves to make sure the channel |
80 |
| - // is empty at the end of stream and before releasing it back to he |
81 |
| - // pool. The HttpStreamsClientHandler should really be doing this |
82 |
| - // for us. |
83 |
| - channelContext.read(); |
| 80 | + // Be prepared to take care of (ignore) a trailing LastHttpResponse |
| 81 | + // from the HttpClientCodec if there is one. |
| 82 | + channelContext.pipeline().replace(HttpStreamsClientHandler.class, |
| 83 | + channelContext.name() + "-LastHttpContentSwallower", new LastHttpContentSwallower()); |
84 | 84 |
|
85 | 85 | ByteBuf fullContent = ((FullHttpResponse) msg).content();
|
86 | 86 | final ByteBuffer bb = copyToByteBuffer(fullContent);
|
@@ -239,4 +239,20 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
|
239 | 239 | .set(subscriber);
|
240 | 240 | }
|
241 | 241 | }
|
| 242 | + |
| 243 | + |
| 244 | + private static class LastHttpContentSwallower extends SimpleChannelInboundHandler<HttpObject> { |
| 245 | + @Override |
| 246 | + protected void channelRead0(ChannelHandlerContext ctx, HttpObject obj) throws Exception { |
| 247 | + if (obj instanceof LastHttpContent) { |
| 248 | + // Queue another read to make up for the one we just ignored |
| 249 | + ctx.read(); |
| 250 | + } else { |
| 251 | + ctx.fireChannelRead(obj); |
| 252 | + } |
| 253 | + // Remove self from pipeline since we only care about potentially |
| 254 | + // ignoring the very first message |
| 255 | + ctx.pipeline().remove(this); |
| 256 | + } |
| 257 | + } |
242 | 258 | }
|
0 commit comments