diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java index 4165a1c7c7..74da541747 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java @@ -41,21 +41,21 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio } @Override - protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) throws Exception + protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) { if ( readMessageBoundary ) { // now we have a complete message in the input buffer - // increment ref count of the buffer because we will pass it's duplicate through - in.retain(); - ByteBuf res = in.duplicate(); + // increment ref count of the buffer and create it's duplicate that shares the content + // duplicate will be the output of this decoded and input for the next one + ByteBuf messageBuf = in.retainedDuplicate(); // signal that whole message was read by making input buffer seem like it was fully read/consumed in.readerIndex( in.readableBytes() ); // pass the full message to the next handler in the pipeline - out.add( res ); + out.add( messageBuf ); readMessageBoundary = false; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java index 35e2f2b70f..05bc216309 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java @@ -65,14 +65,14 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List out try { writer.write( msg ); + output.stop(); } catch ( Throwable error ) - { - throw new EncoderException( "Failed to write outbound message: " + msg, error ); - } - finally { output.stop(); + // release buffer because it will not get added to the out list and no other handler is going to handle it + messageBuf.release(); + throw new EncoderException( "Failed to write outbound message: " + msg, error ); } if ( log.isTraceEnabled() ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java index beff3714d4..5e84d3c13f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java @@ -19,7 +19,6 @@ package org.neo4j.driver.internal.async; import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.After; import org.junit.Test; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; @@ -47,12 +46,6 @@ public class ChannelAttributesTest { private final EmbeddedChannel channel = new EmbeddedChannel(); - @After - public void tearDown() throws Exception - { - channel.close(); - } - @Test public void shouldSetAndGetAddress() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java index 6401168edc..d3fdbf1fa3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java @@ -45,7 +45,7 @@ public class ChannelConnectedListenerTest @After public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java index 9de415b6ce..97650bfc44 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java @@ -49,9 +49,9 @@ public class HandshakeCompletedListenerTest private final EmbeddedChannel channel = new EmbeddedChannel(); @After - public void tearDown() throws Exception + public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java index 84a5ca694a..b4acd812a3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java @@ -59,7 +59,7 @@ public void setUp() @After public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java index 379dbd68a9..c1232b31b5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java @@ -20,6 +20,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.ssl.SslHandler; +import org.junit.After; import org.junit.Test; import org.neo4j.driver.internal.security.SecurityPlan; @@ -31,20 +32,27 @@ import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT; import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp; import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress; -import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT; public class NettyChannelInitializerTest { + private final EmbeddedChannel channel = new EmbeddedChannel(); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception { SecurityPlan security = SecurityPlan.forAllCertificates(); NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() ); - EmbeddedChannel channel = new EmbeddedChannel(); initializer.initChannel( channel ); assertNotNull( channel.pipeline().get( SslHandler.class ) ); @@ -56,7 +64,6 @@ public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption() SecurityPlan security = SecurityPlan.insecure(); NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() ); - EmbeddedChannel channel = new EmbeddedChannel(); initializer.initChannel( channel ); assertNull( channel.pipeline().get( SslHandler.class ) ); @@ -70,7 +77,6 @@ public void shouldUpdateChannelAttributes() SecurityPlan security = SecurityPlan.insecure(); NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock ); - EmbeddedChannel channel = new EmbeddedChannel(); initializer.initChannel( channel ); assertEquals( LOCAL_DEFAULT, serverAddress( channel ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java index 20bf245ec7..5ab70724c7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; import org.junit.Test; import static io.netty.buffer.Unpooled.buffer; @@ -28,9 +29,18 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals; public class ChunkDecoderTest { + private final EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() ); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldDecodeFullChunk() { @@ -54,7 +64,7 @@ public void shouldDecodeFullChunk() // there should only be a single chunk available for reading assertEquals( 1, channel.inboundMessages().size() ); // it should have no size header and expected body - assertEquals( input.slice( 2, 7 ), channel.readInbound() ); + assertByteBufEquals( input.slice( 2, 7 ), channel.readInbound() ); } @Test @@ -97,7 +107,7 @@ public void shouldDecodeSplitChunk() // there should only be a single chunk available for reading assertEquals( 1, channel.inboundMessages().size() ); // it should have no size header and expected body - assertEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() ); } @Test @@ -113,6 +123,6 @@ public void shouldDecodeEmptyChunk() // there should only be a single chunk available for reading assertEquals( 1, channel.inboundMessages().size() ); // it should have no size header and empty body - assertEquals( wrappedBuffer( new byte[0] ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[0] ), channel.readInbound() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java index 9d060906a0..32f4cc4ed8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java @@ -76,7 +76,7 @@ public void tearDown() { if ( channel != null ) { - channel.close(); + channel.finishAndReleaseAll(); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java index 4ff26a7789..92795ace32 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java @@ -19,33 +19,39 @@ package org.neo4j.driver.internal.async.inbound; import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; import org.junit.Test; import static io.netty.buffer.Unpooled.wrappedBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals; public class MessageDecoderTest { + private final EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() ); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldDecodeMessageWithSingleChunk() { - EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() ); - assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ) ) ); assertTrue( channel.writeInbound( wrappedBuffer( new byte[0] ) ) ); assertTrue( channel.finish() ); assertEquals( 1, channel.inboundMessages().size() ); - assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() ); } @Test public void shouldDecodeMessageWithMultipleChunks() { - EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() ); - assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3} ) ) ); assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{4, 5} ) ) ); assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{6, 7, 8} ) ) ); @@ -53,7 +59,7 @@ public void shouldDecodeMessageWithMultipleChunks() assertTrue( channel.finish() ); assertEquals( 1, channel.inboundMessages().size() ); - assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() ); } @Test @@ -73,8 +79,8 @@ public void shouldDecodeMultipleConsecutiveMessages() channel.writeInbound( wrappedBuffer( new byte[0] ) ); assertEquals( 3, channel.inboundMessages().size() ); - assertEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() ); - assertEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() ); - assertEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() ); + assertByteBufEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java index 77bc8ab26b..060250c97f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandlerTest.java @@ -60,24 +60,18 @@ public class OutboundMessageHandlerTest { - private EmbeddedChannel channel; - private InboundMessageDispatcher messageDispatcher; + private final EmbeddedChannel channel = new EmbeddedChannel(); @Before public void setUp() { - channel = new EmbeddedChannel(); - messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); - ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + ChannelAttributes.setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) ); } @After public void tearDown() { - if ( channel != null ) - { - channel.close(); - } + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java index 381564992b..b1139df7cb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthCheckerTest.java @@ -38,12 +38,12 @@ import static org.neo4j.driver.internal.async.ChannelAttributes.setCreationTimestamp; import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; -import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_MAX_CONNECTION_POOL_SIZE; import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE; import static org.neo4j.driver.internal.async.pool.PoolSettings.NOT_CONFIGURED; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -53,15 +53,15 @@ public class NettyChannelHealthCheckerTest private final InboundMessageDispatcher dispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); @Before - public void setUp() throws Exception + public void setUp() { setMessageDispatcher( channel, dispatcher ); } @After - public void tearDown() throws Exception + public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java index d9a24d1c5a..3a1dd62c90 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/InitResponseHandlerTest.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.EncoderException; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -63,6 +64,12 @@ public void setUp() pipeline.addLast( new ChannelErrorHandler( DEV_NULL_LOGGING ) ); } + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + @Test public void shouldSetServerVersionOnChannel() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index f55306fe2d..d12849e141 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -41,7 +41,7 @@ public class ResetResponseHandlerTest @After public void tearDown() { - channel.close(); + channel.finishAndReleaseAll(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index a78038600d..2abd5e6506 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -72,7 +72,7 @@ public static > T await( U future ) { try { - return future.get( 1, MINUTES ); + return future.get( 5, MINUTES ); } catch ( InterruptedException e ) { @@ -110,7 +110,7 @@ public static void assertByteBufContains( ByteBuf buf, Number... values ) } finally { - buf.release(); + releaseIfPossible( buf ); } } @@ -122,8 +122,8 @@ public static void assertByteBufEquals( ByteBuf expected, ByteBuf actual ) } finally { - expected.release(); - actual.release(); + releaseIfPossible( expected ); + releaseIfPossible( actual ); } } @@ -240,4 +240,12 @@ else if ( value instanceof Double ) "Unexpected number: '" + value + "' or type" + value.getClass() ); } } + + private static void releaseIfPossible( ByteBuf buf ) + { + if ( buf.refCnt() > 0 ) + { + buf.release(); + } + } }