Skip to content

Fix handling of large chunks #505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,23 @@ private static Throwable protocolNoSupportedByDriverError( int suggestedProtocol

private static Throwable transformError( Throwable error )
{
Throwable cause = error instanceof DecoderException ? error.getCause() : error;
if ( cause instanceof ServiceUnavailableException )
if ( error instanceof DecoderException && error.getCause() != null )
{
return cause;
// unwrap the DecoderException if it has a cause
error = error.getCause();
}
else if ( cause instanceof SSLHandshakeException )

if ( error instanceof ServiceUnavailableException )
{
return error;
}
else if ( error instanceof SSLHandshakeException )
{
return new SecurityException( "Failed to establish secured connection with the server", cause );
return new SecurityException( "Failed to establish secured connection with the server", error );
}
else
{
return new ServiceUnavailableException( "Failed to establish connection with the server", cause );
return new ServiceUnavailableException( "Failed to establish connection with the server", error );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ private void fail( ChannelHandlerContext ctx, Throwable error )

private static Throwable transformError( Throwable error )
{
if ( error instanceof CodecException )
if ( error instanceof CodecException && error.getCause() != null )
{
// unwrap exception from message encoder/decoder
// unwrap the CodecException if it has a cause
error = error.getCause();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@

public class ChunkDecoder extends LengthFieldBasedFrameDecoder
{
private static final int MAX_FRAME_LENGTH = Short.MAX_VALUE;
private static final int MAX_FRAME_BODY_LENGTH = 0xFFFF;
private static final int LENGTH_FIELD_OFFSET = 0;
private static final int LENGTH_FIELD_LENGTH = 2;
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = LENGTH_FIELD_LENGTH;
private static final int MAX_FRAME_LENGTH = LENGTH_FIELD_LENGTH + MAX_FRAME_BODY_LENGTH;

private final Logging logging;
private Logger log;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ public void shouldHandleCodecException()
assertFalse( channel.isOpen() );
}

@Test
public void shouldHandleCodecExceptionWithoutCause()
{
CodecException codecException = new CodecException( "Unable to encode or decode message" );
channel.pipeline().fireExceptionCaught( codecException );

Throwable error = messageDispatcher.currentError();

assertEquals( codecException, error );
assertFalse( channel.isOpen() );
}

@Test
public void shouldHandleIOException()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,31 @@ public void shouldUnwrapDecoderException()
assertNull( await( channel.closeFuture() ) );
}

@Test
public void shouldHandleDecoderExceptionWithoutCause()
{
ChannelPromise handshakeCompletedPromise = channel.newPromise();
HandshakeHandler handler = newHandler( handshakeCompletedPromise );
channel.pipeline().addLast( handler );

DecoderException decoderException = new DecoderException( "Unable to decode a message" );
channel.pipeline().fireExceptionCaught( decoderException );

try
{
// promise should fail
await( handshakeCompletedPromise );
fail( "Exception expected" );
}
catch ( ServiceUnavailableException e )
{
assertEquals( decoderException, e.getCause() );
}

// channel should be closed
assertNull( await( channel.closeFuture() ) );
}

@Test
public void shouldTranslateSSLHandshakeException()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,22 @@ public void shouldLogNonEmptyChunkOnTraceLevel()
assertByteBufEquals( wrappedBuffer( bytes ), channel.readInbound() );
}

@Test
public void shouldDecodeMaxSizeChunk()
{
byte[] message = new byte[0xFFFF];

ByteBuf input = buffer();
input.writeShort( message.length ); // chunk header
input.writeBytes( message ); // chunk body

assertTrue( channel.writeInbound( input ) );
assertTrue( channel.finish() );

assertEquals( 1, channel.inboundMessages().size() );
assertByteBufEquals( wrappedBuffer( message ), channel.readInbound() );
}

private static ChunkDecoder newChunkDecoder()
{
return new ChunkDecoder( DEV_NULL_LOGGING );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

Expand All @@ -35,11 +36,15 @@
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.util.TestNeo4jSession;
import org.neo4j.driver.v1.util.TestUtil;

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;
Expand All @@ -52,6 +57,8 @@

public class ParametersIT
{
private static final int LONG_VALUE_SIZE = 1_000_000;

@Rule
public TestNeo4jSession session = new TestNeo4jSession();

Expand Down Expand Up @@ -451,6 +458,35 @@ public void shouldNotBePossibleToUsePathAsParameterViaMapValue()
expectIOExceptionWithMessage( mapValue, "Unknown type: PATH" );
}

@Test
public void shouldSendAndReceiveLongString()
{
String string = TestUtil.randomString( LONG_VALUE_SIZE );
testSendAndReceiveValue( string );
}

@Test
public void shouldSendAndReceiveLongListOfLongs()
{
List<Long> longs = ThreadLocalRandom.current()
.longs( LONG_VALUE_SIZE )
.boxed()
.collect( toList() );

testSendAndReceiveValue( longs );
}

@Test
public void shouldSendAndReceiveLongArrayOfBytes()
{
assumeTrue( supportsBytes() );

byte[] bytes = new byte[LONG_VALUE_SIZE];
ThreadLocalRandom.current().nextBytes( bytes );

testSendAndReceiveValue( bytes );
}

private void testBytesProperty( byte[] array )
{
assumeTrue( supportsBytes() );
Expand Down Expand Up @@ -509,4 +545,11 @@ private void expectIOExceptionWithMessage( Value value, String message )
fail( "Expecting a ServiceUnavailableException but got " + e );
}
}

private void testSendAndReceiveValue( Object value )
{
StatementResult result = session.run( "RETURN $value", singletonMap( "value", value ) );
Object receivedValue = result.single().get( 0 ).asObject();
assertArrayEquals( new Object[]{value}, new Object[]{receivedValue} );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
Expand All @@ -39,11 +38,12 @@
import org.neo4j.driver.v1.tck.tck.util.runners.CypherStatementRunner;
import org.neo4j.driver.v1.tck.tck.util.runners.StatementRunner;
import org.neo4j.driver.v1.tck.tck.util.runners.StringRunner;
import org.neo4j.driver.v1.util.TestUtil;

import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThat;
import static org.neo4j.driver.v1.tck.Environment.driver;
import static org.neo4j.driver.v1.Values.parameters;
import static org.neo4j.driver.v1.tck.Environment.driver;
import static org.neo4j.driver.v1.tck.Environment.expectedBoltValue;
import static org.neo4j.driver.v1.tck.Environment.expectedJavaValue;
import static org.neo4j.driver.v1.tck.Environment.listOfObjects;
Expand Down Expand Up @@ -77,9 +77,9 @@ public void a_value( String value ) throws Throwable
}

@Given( "^a String of size (\\d+)$" )
public void a_String_of_size( long size ) throws Throwable
public void a_String_of_size( int size ) throws Throwable
{
expectedJavaValue = getRandomString( size );
expectedJavaValue = TestUtil.randomString( size );
expectedBoltValue = Values.value( expectedJavaValue );
}

Expand Down Expand Up @@ -191,18 +191,6 @@ public void result_should_be_equal_to_a_single_Type_of_Input() throws Throwable
}
}

public String getRandomString( long size )
{
StringBuilder stringBuilder = new StringBuilder();
String alphabet = "abcdefghijklmnopqrstuvwxyz";
Random random = new Random();
while ( size-- > 0 )
{
stringBuilder.append( alphabet.charAt( random.nextInt( alphabet.length() ) ) );
}
return stringBuilder.toString();
}

public List<Object> getListOfRandomsOfTypes( Type type, long size )
{
List<Object> list = new ArrayList<>();
Expand Down
13 changes: 13 additions & 0 deletions driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -57,6 +58,7 @@
public final class TestUtil
{
private static final long DEFAULT_WAIT_TIME_MS = MINUTES.toMillis( 1 );
private static final String ALPHANUMERICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789";

private TestUtil()
{
Expand Down Expand Up @@ -257,6 +259,17 @@ public static void awaitCondition( BooleanSupplier condition, long value, TimeUn
}
}

public static String randomString( int size )
{
StringBuilder sb = new StringBuilder( size );
ThreadLocalRandom random = ThreadLocalRandom.current();
for ( int i = 0; i < size; i++ )
{
sb.append( ALPHANUMERICS.charAt( random.nextInt( ALPHANUMERICS.length() ) ) );
}
return sb.toString();
}

private static void setupSuccessfulPullAll( Connection connection, String statement )
{
doAnswer( invocation ->
Expand Down