From 7572f5c86dd9f191c56c50ff95ff69e44031ca93 Mon Sep 17 00:00:00 2001 From: Zhen Date: Thu, 13 Oct 2016 10:02:09 +0200 Subject: [PATCH 1/2] Fixed expect strust but get xx bug by flipping a buffer back after connection error Close the socket channel directly if server already mark the end of the channel --- .../driver/internal/InternalSession.java | 38 +++++++++++-------- .../socket/BufferingChunkedInput.java | 13 ++++++- .../connector/socket/SocketUtils.java | 16 ++++++++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index 042176e0bf..c76d40ac27 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -108,32 +108,38 @@ public boolean isOpen() public void close() { // Use atomic operation to protect from closing the connection twice (putting back to the pool twice). - if( !isOpen.compareAndSet( true, false ) ) + if ( !isOpen.compareAndSet( true, false ) ) { throw new ClientException( "This session has already been closed." ); } - else + + if( !connection.isOpen() ) + { + // the socket connection is already closed due to some error, cannot send more data + connection.close(); + return; + } + + if ( currentTransaction != null ) { - if ( currentTransaction != null ) - { - try - { - currentTransaction.close(); - } - catch ( Throwable e ) - { - // Best-effort - } - } try { - connection.sync(); + currentTransaction.close(); } - finally + catch ( Throwable e ) { - connection.close(); + // Best-effort } } + try + { + connection.sync(); + } + finally + { + connection.close(); + } + } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java b/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java index becb8c12c3..e103ebd78b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java @@ -408,11 +408,18 @@ private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buff int read = channel.read( buffer ); if ( read == -1 ) { + try + { + channel.close(); + } + catch ( IOException e ) + { + // best effort + } throw new ClientException( "Connection terminated while receiving data. This can happen due to network " + "instabilities, or due to restarts of the database." ); } - buffer.flip(); } catch ( ClosedByInterruptException e ) { @@ -429,6 +436,10 @@ private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buff throw new ClientException( "Unable to process request: " + message + " buffer: \n" + BytePrinter.hex( buffer ), e ); } + finally + { + buffer.flip(); + } } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketUtils.java b/driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketUtils.java index 992c9dc8f2..3e52ced28a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketUtils.java +++ b/driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketUtils.java @@ -41,6 +41,14 @@ public static void blockingRead(ByteChannel channel, ByteBuffer buf) throws IOEx { if (channel.read( buf ) < 0) { + try + { + channel.close(); + } + catch ( IOException e ) + { + // best effort + } String bufStr = BytePrinter.hex( buf ).trim(); throw new ClientException( String.format( "Connection terminated while receiving data. This can happen due to network " + @@ -56,6 +64,14 @@ public static void blockingWrite(ByteChannel channel, ByteBuffer buf) throws IOE { if (channel.write( buf ) < 0) { + try + { + channel.close(); + } + catch ( IOException e ) + { + // best effort + } String bufStr = BytePrinter.hex( buf ).trim(); throw new ClientException( String.format( "Connection terminated while sending data. This can happen due to network " + From accb44b8a785a0e141cc14bbd6d5148af24e5b95 Mon Sep 17 00:00:00 2001 From: Zhen Date: Thu, 13 Oct 2016 12:33:49 +0200 Subject: [PATCH 2/2] Adding test for the buffer change --- .../socket/BufferingChunkedInput.java | 2 +- .../socket/BufferingChunkedInputTest.java | 25 ++++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java b/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java index e103ebd78b..6f31489d12 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java @@ -398,7 +398,7 @@ else if ( buffer.remaining() >= 2 ) * @param buffer The buffer to read into * @throws IOException */ - private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException + static void readNextPacket( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException { assert !buffer.hasRemaining(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInputTest.java b/driver/src/test/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInputTest.java index 704d80e891..7d73623023 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInputTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInputTest.java @@ -36,6 +36,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -491,6 +492,28 @@ public void shouldFailNicelyOnClosedConnections() throws IOException input.readByte(); } + + @Test + public void shouldKeepBufferCorrectWhenError() throws Throwable + { + // Given + ReadableByteChannel channel = mock( ReadableByteChannel.class ); + when( channel.read( any( ByteBuffer.class ) ) ).thenReturn( -1 ); + ByteBuffer buffer = ByteBuffer.allocate( 8 ); + buffer.limit(0); + + //Expect + exception.expect( ClientException.class ); + exception.expectMessage( "Connection terminated while receiving data. This can happen due to network " + + "instabilities, or due to restarts of the database." ); + // When + BufferingChunkedInput.readNextPacket( channel, buffer ); + assertEquals( buffer.position(), 0 ); + assertEquals( buffer.limit(), 0 ); + assertEquals( buffer.capacity(), 8 ); + assertFalse( channel.isOpen() ); + } + private ReadableByteChannel fillPacket( int size, int value ) { int[] ints = new int[size]; @@ -541,4 +564,4 @@ public void close() throws IOException }; } -} \ No newline at end of file +}