Skip to content

Fixed couple netty buffer leaks #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
}

@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
{
if ( readMessageBoundary )
{
// now we have a complete message in the input buffer

// increment ref count of the buffer because we will pass it's duplicate through
in.retain();
ByteBuf res = in.duplicate();
// increment ref count of the buffer and create it's duplicate that shares the content
// duplicate will be the output of this decoded and input for the next one
ByteBuf messageBuf = in.retainedDuplicate();

// signal that whole message was read by making input buffer seem like it was fully read/consumed
in.readerIndex( in.readableBytes() );

// pass the full message to the next handler in the pipeline
out.add( res );
out.add( messageBuf );

readMessageBoundary = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
try
{
writer.write( msg );
output.stop();
}
catch ( Throwable error )
{
throw new EncoderException( "Failed to write outbound message: " + msg, error );
}
finally
{
output.stop();
// release buffer because it will not get added to the out list and no other handler is going to handle it
messageBuf.release();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

throw new EncoderException( "Failed to write outbound message: " + msg, error );
}

if ( log.isTraceEnabled() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.neo4j.driver.internal.async;

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
import org.junit.Test;

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
Expand Down Expand Up @@ -47,12 +46,6 @@ public class ChannelAttributesTest
{
private final EmbeddedChannel channel = new EmbeddedChannel();

@After
public void tearDown() throws Exception
{
channel.close();
}

@Test
public void shouldSetAndGetAddress()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ChannelConnectedListenerTest
@After
public void tearDown()
{
channel.close();
channel.finishAndReleaseAll();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public class HandshakeCompletedListenerTest
private final EmbeddedChannel channel = new EmbeddedChannel();

@After
public void tearDown() throws Exception
public void tearDown()
{
channel.close();
channel.finishAndReleaseAll();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void setUp()
@After
public void tearDown()
{
channel.close();
channel.finishAndReleaseAll();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.ssl.SslHandler;
import org.junit.After;
import org.junit.Test;

import org.neo4j.driver.internal.security.SecurityPlan;
Expand All @@ -31,20 +32,27 @@
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp;
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;

public class NettyChannelInitializerTest
{
private final EmbeddedChannel channel = new EmbeddedChannel();

@After
public void tearDown()
{
channel.finishAndReleaseAll();
}

@Test
public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception
{
SecurityPlan security = SecurityPlan.forAllCertificates();
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );

EmbeddedChannel channel = new EmbeddedChannel();
initializer.initChannel( channel );

assertNotNull( channel.pipeline().get( SslHandler.class ) );
Expand All @@ -56,7 +64,6 @@ public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption()
SecurityPlan security = SecurityPlan.insecure();
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );

EmbeddedChannel channel = new EmbeddedChannel();
initializer.initChannel( channel );

assertNull( channel.pipeline().get( SslHandler.class ) );
Expand All @@ -70,7 +77,6 @@ public void shouldUpdateChannelAttributes()
SecurityPlan security = SecurityPlan.insecure();
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock );

EmbeddedChannel channel = new EmbeddedChannel();
initializer.initChannel( channel );

assertEquals( LOCAL_DEFAULT, serverAddress( channel ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
import org.junit.Test;

import static io.netty.buffer.Unpooled.buffer;
Expand All @@ -28,9 +29,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;

public class ChunkDecoderTest
{
private final EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() );

@After
public void tearDown()
{
channel.finishAndReleaseAll();
}

@Test
public void shouldDecodeFullChunk()
{
Expand All @@ -54,7 +64,7 @@ public void shouldDecodeFullChunk()
// there should only be a single chunk available for reading
assertEquals( 1, channel.inboundMessages().size() );
// it should have no size header and expected body
assertEquals( input.slice( 2, 7 ), channel.readInbound() );
assertByteBufEquals( input.slice( 2, 7 ), channel.readInbound() );
}

@Test
Expand Down Expand Up @@ -97,7 +107,7 @@ public void shouldDecodeSplitChunk()
// there should only be a single chunk available for reading
assertEquals( 1, channel.inboundMessages().size() );
// it should have no size header and expected body
assertEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() );
}

@Test
Expand All @@ -113,6 +123,6 @@ public void shouldDecodeEmptyChunk()
// there should only be a single chunk available for reading
assertEquals( 1, channel.inboundMessages().size() );
// it should have no size header and empty body
assertEquals( wrappedBuffer( new byte[0] ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[0] ), channel.readInbound() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void tearDown()
{
if ( channel != null )
{
channel.close();
channel.finishAndReleaseAll();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,47 @@
package org.neo4j.driver.internal.async.inbound;

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
import org.junit.Test;

import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;

public class MessageDecoderTest
{
private final EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );

@After
public void tearDown()
{
channel.finishAndReleaseAll();
}

@Test
public void shouldDecodeMessageWithSingleChunk()
{
EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );

assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ) ) );
assertTrue( channel.writeInbound( wrappedBuffer( new byte[0] ) ) );
assertTrue( channel.finish() );

assertEquals( 1, channel.inboundMessages().size() );
assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() );
}

@Test
public void shouldDecodeMessageWithMultipleChunks()
{
EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );

assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3} ) ) );
assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{4, 5} ) ) );
assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{6, 7, 8} ) ) );
assertTrue( channel.writeInbound( wrappedBuffer( new byte[0] ) ) );
assertTrue( channel.finish() );

assertEquals( 1, channel.inboundMessages().size() );
assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() );
}

@Test
Expand All @@ -73,8 +79,8 @@ public void shouldDecodeMultipleConsecutiveMessages()
channel.writeInbound( wrappedBuffer( new byte[0] ) );

assertEquals( 3, channel.inboundMessages().size() );
assertEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() );
assertEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() );
assertEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() );
assertByteBufEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,18 @@

public class OutboundMessageHandlerTest
{
private EmbeddedChannel channel;
private InboundMessageDispatcher messageDispatcher;
private final EmbeddedChannel channel = new EmbeddedChannel();

@Before
public void setUp()
{
channel = new EmbeddedChannel();
messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING );
ChannelAttributes.setMessageDispatcher( channel, messageDispatcher );
ChannelAttributes.setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
}

@After
public void tearDown()
{
if ( channel != null )
{
channel.close();
}
channel.finishAndReleaseAll();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import static org.neo4j.driver.internal.async.ChannelAttributes.setCreationTimestamp;
import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp;
import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT;
import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST;
import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_MAX_CONNECTION_POOL_SIZE;
import static org.neo4j.driver.internal.async.pool.PoolSettings.DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE;
import static org.neo4j.driver.internal.async.pool.PoolSettings.NOT_CONFIGURED;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.util.Iterables.single;
import static org.neo4j.driver.v1.util.TestUtil.await;

Expand All @@ -53,15 +53,15 @@ public class NettyChannelHealthCheckerTest
private final InboundMessageDispatcher dispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING );

@Before
public void setUp() throws Exception
public void setUp()
{
setMessageDispatcher( channel, dispatcher );
}

@After
public void tearDown() throws Exception
public void tearDown()
{
channel.close();
channel.finishAndReleaseAll();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.EncoderException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -63,6 +64,12 @@ public void setUp()
pipeline.addLast( new ChannelErrorHandler( DEV_NULL_LOGGING ) );
}

@After
public void tearDown()
{
channel.finishAndReleaseAll();
}

@Test
public void shouldSetServerVersionOnChannel()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ResetResponseHandlerTest
@After
public void tearDown()
{
channel.close();
channel.finishAndReleaseAll();
}

@Test
Expand Down
Loading