From 10437e3a7b8dfb83501f64a817ea1f5fa31911a0 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 30 Oct 2017 16:42:05 -0500 Subject: [PATCH 1/3] Release IO buffer on outbound error `OutboundMessageHandler` writes message to `ByteBuf`. It allocates a dedicated IO buffer for this. Writing can fail for various reasons (unsupported byte arrays, etc). Previously this IO buffer has not been released in case of an error. When writing is successful IO buffer is released by then channel pipeline. This commit makes `OutboundMessageHandler` release buffer before rethrowing the exception. --- .../internal/async/outbound/OutboundMessageHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java index 35e2f2b70f..05bc216309 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java @@ -65,14 +65,14 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List out try { writer.write( msg ); + output.stop(); } catch ( Throwable error ) - { - throw new EncoderException( "Failed to write outbound message: " + msg, error ); - } - finally { output.stop(); + // release buffer because it will not get added to the out list and no other handler is going to handle it + messageBuf.release(); + throw new EncoderException( "Failed to write outbound message: " + msg, error ); } if ( log.isTraceEnabled() ) From 3f4cb71c0d45ba9a34ccae1d190d8f2bdd2d54ea Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 14 Nov 2017 17:38:39 +0100 Subject: [PATCH 2/3] Fixed couple buffer leaks in tests --- .../async/inbound/MessageDecoder.java | 10 ++++---- .../internal/async/ChannelAttributesTest.java | 7 ------ .../async/ChannelConnectedListenerTest.java | 2 +- .../async/HandshakeCompletedListenerTest.java | 4 ++-- .../async/HandshakeResponseHandlerTest.java | 2 +- .../async/NettyChannelInitializerTest.java | 14 +++++++---- .../async/inbound/ChunkDecoderTest.java | 16 ++++++++++--- .../inbound/InboundMessageHandlerTest.java | 2 +- .../async/inbound/MessageDecoderTest.java | 24 ++++++++++++------- .../outbound/OutboundMessageHandlerTest.java | 12 +++------- .../pool/NettyChannelHealthCheckerTest.java | 8 +++---- .../handlers/InitResponseHandlerTest.java | 7 ++++++ .../handlers/ResetResponseHandlerTest.java | 2 +- .../org/neo4j/driver/v1/util/TestUtil.java | 14 ++++++++--- 14 files changed, 74 insertions(+), 50 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java index 4165a1c7c7..74da541747 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java @@ -41,21 +41,21 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio } @Override - protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) throws Exception + protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) { if ( readMessageBoundary ) { // now we have a complete message in the input buffer - // increment ref count of the buffer because we will pass it's duplicate through - in.retain(); - ByteBuf res = in.duplicate(); + // increment ref count of the buffer and create it's duplicate that shares the content + // duplicate will be the output of this decoded and input for the next one + ByteBuf messageBuf = in.retainedDuplicate(); // signal that whole message was read by making input buffer seem like it was fully read/consumed in.readerIndex( in.readableBytes() ); // pass the full message to the next handler in the pipeline - out.add( res ); + out.add( messageBuf ); readMessageBoundary = false; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java index beff3714d4..5e84d3c13f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java @@ -19,7 +19,6 @@ package org.neo4j.driver.internal.async; import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.After; import org.junit.Test; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; @@ -47,12 +46,6 @@ public class ChannelAttributesTest { private final EmbeddedChannel channel = new EmbeddedChannel(); - @After - public void tearDown() throws Exception - { - channel.close(); - } - @Test public void shouldSetAndGetAddress() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java index 6401168edc..d3fdbf1fa3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java @@ -45,7 +45,7 @@ public class ChannelConnectedListenerTest @After public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java index 9de415b6ce..97650bfc44 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java @@ -49,9 +49,9 @@ public class HandshakeCompletedListenerTest private final EmbeddedChannel channel = new EmbeddedChannel(); @After - public void tearDown() throws Exception + public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java index 84a5ca694a..b4acd812a3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java @@ -59,7 +59,7 @@ public void setUp() @After public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java index 379dbd68a9..c1232b31b5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java @@ -20,6 +20,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.ssl.SslHandler; +import org.junit.After; import org.junit.Test; import org.neo4j.driver.internal.security.SecurityPlan; @@ -31,20 +32,27 @@ import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT; import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp; import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress; -import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT; public class NettyChannelInitializerTest { + private final EmbeddedChannel channel = new EmbeddedChannel(); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception { SecurityPlan security = SecurityPlan.forAllCertificates(); NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() ); - EmbeddedChannel channel = new EmbeddedChannel(); initializer.initChannel( channel ); assertNotNull( channel.pipeline().get( SslHandler.class ) ); @@ -56,7 +64,6 @@ public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption() SecurityPlan security = SecurityPlan.insecure(); NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() ); - EmbeddedChannel channel = new EmbeddedChannel(); initializer.initChannel( channel ); assertNull( channel.pipeline().get( SslHandler.class ) ); @@ -70,7 +77,6 @@ public void shouldUpdateChannelAttributes() SecurityPlan security = SecurityPlan.insecure(); NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock ); - EmbeddedChannel channel = new EmbeddedChannel(); initializer.initChannel( channel ); assertEquals( LOCAL_DEFAULT, serverAddress( channel ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java index 20bf245ec7..5ab70724c7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; import org.junit.Test; import static io.netty.buffer.Unpooled.buffer; @@ -28,9 +29,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals; public class ChunkDecoderTest { + private final EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() ); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldDecodeFullChunk() { @@ -54,7 +64,7 @@ public void shouldDecodeFullChunk() // there should only be a single chunk available for reading assertEquals( 1, channel.inboundMessages().size() ); // it should have no size header and expected body - assertEquals( input.slice( 2, 7 ), channel.readInbound() ); + assertByteBufEquals( input.slice( 2, 7 ), channel.readInbound() ); } @Test @@ -97,7 +107,7 @@ public void shouldDecodeSplitChunk() // there should only be a single chunk available for reading assertEquals( 1, channel.inboundMessages().size() ); // it should have no size header and expected body - assertEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() ); } @Test @@ -113,6 +123,6 @@ public void shouldDecodeEmptyChunk() // there should only be a single chunk available for reading assertEquals( 1, channel.inboundMessages().size() ); // it should have no size header and empty body - assertEquals( wrappedBuffer( new byte[0] ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[0] ), channel.readInbound() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java index 9d060906a0..32f4cc4ed8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java @@ -76,7 +76,7 @@ public void tearDown() { if ( channel != null ) { - channel.close(); + channel.finishAndReleaseAll(); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java index 4ff26a7789..92795ace32 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java @@ -19,33 +19,39 @@ package org.neo4j.driver.internal.async.inbound; import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; import org.junit.Test; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals; public class MessageDecoderTest { + private final EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() ); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldDecodeMessageWithSingleChunk() { - EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() ); - assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ) ) ); assertTrue( channel.writeInbound( wrappedBuffer( new byte[0] ) ) ); assertTrue( channel.finish() ); assertEquals( 1, channel.inboundMessages().size() ); - assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() ); } @Test public void shouldDecodeMessageWithMultipleChunks() { - EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() ); - assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3} ) ) ); assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{4, 5} ) ) ); assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{6, 7, 8} ) ) ); @@ -53,7 +59,7 @@ public void shouldDecodeMessageWithMultipleChunks() assertTrue( channel.finish() ); assertEquals( 1, channel.inboundMessages().size() ); - assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() ); } @Test @@ -73,8 +79,8 @@ public void shouldDecodeMultipleConsecutiveMessages() channel.writeInbound( wrappedBuffer( new byte[0] ) ); assertEquals( 3, channel.inboundMessages().size() ); - assertEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() ); - assertEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() ); - assertEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java index 77bc8ab26b..060250c97f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java @@ -60,24 +60,18 @@ public class OutboundMessageHandlerTest { - private EmbeddedChannel channel; - private InboundMessageDispatcher messageDispatcher; + private final EmbeddedChannel channel = new EmbeddedChannel(); @Before public void setUp() { - channel = new EmbeddedChannel(); - messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); - ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + ChannelAttributes.setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) ); } @After public void tearDown() { - if ( channel != null ) - { - channel.close(); - } + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java index 381564992b..b1139df7cb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java @@ -38,12 +38,12 @@ import static org.neo4j.driver.internal.async.ChannelAttributes.setCreationTimestamp; import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; -import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_MAX_CONNECTION_POOL_SIZE; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE; import static org.neo4j.driver.internal.async.pool.PoolSettings.NOT_CONFIGURED; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -53,15 +53,15 @@ public class NettyChannelHealthCheckerTest private final InboundMessageDispatcher dispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); @Before - public void setUp() throws Exception + public void setUp() { setMessageDispatcher( channel, dispatcher ); } @After - public void tearDown() throws Exception + public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java index d9a24d1c5a..3a1dd62c90 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.EncoderException; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -63,6 +64,12 @@ public void setUp() pipeline.addLast( new ChannelErrorHandler( DEV_NULL_LOGGING ) ); } + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldSetServerVersionOnChannel() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index f55306fe2d..d12849e141 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -41,7 +41,7 @@ public class ResetResponseHandlerTest @After public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index a78038600d..fa771b0c5f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -110,7 +110,7 @@ public static void assertByteBufContains( ByteBuf buf, Number... values ) } finally { - buf.release(); + releaseIfPossible( buf ); } } @@ -122,8 +122,8 @@ public static void assertByteBufEquals( ByteBuf expected, ByteBuf actual ) } finally { - expected.release(); - actual.release(); + releaseIfPossible( expected ); + releaseIfPossible( actual ); } } @@ -240,4 +240,12 @@ else if ( value instanceof Double ) "Unexpected number: '" + value + "' or type" + value.getClass() ); } } + + private static void releaseIfPossible( ByteBuf buf ) + { + if ( buf.refCnt() > 0 ) + { + buf.release(); + } + } } From 8ad66262d0768127ece767f70f39bc8f022462ae Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 14 Nov 2017 18:19:11 +0100 Subject: [PATCH 3/3] Increase future wait timeout in tests Because couple tests take much more time with Netty buffer leak detector enabled with paranoid leak detection level. --- driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index fa771b0c5f..2abd5e6506 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -72,7 +72,7 @@ public static > T await( U future ) { try { - return future.get( 1, MINUTES ); + return future.get( 5, MINUTES ); } catch ( InterruptedException e ) {