Skip to content

Commit 49e3d73

Browse files
committed
Better handling of closed connections
- Get rid of StackOverFlowError when no data on network - If connection is closed give a proper error message
1 parent 4d56eec commit 49e3d73

File tree

2 files changed

+104
-29
lines changed

2 files changed

+104
-29
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInput.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
*
4+
* <p>
55
* This file is part of Neo4j.
6-
*
6+
* <p>
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
10+
* <p>
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
* <p>
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -100,8 +100,11 @@ public State readChunkSize( BufferingChunkedInput ctx ) throws IOException
100100
{
101101
if ( ctx.buffer.remaining() == 0 )
102102
{
103-
//buffer empty, read next packet and try again
104-
readNextPacket( ctx.channel, ctx.buffer );
103+
//buffer empty, block until you get at least at least one byte
104+
while ( ctx.buffer.remaining() == 0 )
105+
{
106+
readNextPacket( ctx.channel, ctx.buffer );
107+
}
105108
return AWAITING_CHUNK.readChunkSize( ctx );
106109
}
107110
else if ( ctx.buffer.remaining() >= 2 )
@@ -132,7 +135,7 @@ public State read( BufferingChunkedInput ctx ) throws IOException
132135
public State peekByte( BufferingChunkedInput ctx ) throws IOException
133136
{
134137
//read chunk size and then proceed to read the rest of the chunk.
135-
return readChunkSize( ctx ).peekByte( ctx );
138+
return readChunkSize( ctx ).peekByte( ctx );
136139
}
137140
},
138141
IN_CHUNK
@@ -175,7 +178,7 @@ else if ( ctx.buffer.remaining() < ctx.scratchBuffer.remaining() )
175178
int bytesToRead = min( ctx.scratchBuffer.remaining(), ctx.remainingChunkSize );
176179
copyBytes( ctx.buffer, ctx.scratchBuffer, bytesToRead );
177180
ctx.remainingChunkSize -= bytesToRead;
178-
if (ctx.scratchBuffer.remaining() == 0)
181+
if ( ctx.scratchBuffer.remaining() == 0 )
179182
{
180183
//we have written all data that was asked for us
181184
return IN_CHUNK;
@@ -229,7 +232,7 @@ public State readChunkSize( BufferingChunkedInput ctx ) throws IOException
229232
}
230233

231234
@Override
232-
public State read( BufferingChunkedInput ctx) throws IOException
235+
public State read( BufferingChunkedInput ctx ) throws IOException
233236
{
234237
throw new IllegalStateException( "Cannot read data while in progress of reading header" );
235238
}
@@ -271,25 +274,34 @@ public State peekByte( BufferingChunkedInput ctx ) throws IOException
271274
* @param buffer The buffer to read into
272275
* @throws IOException
273276
*/
274-
private static void readNextPacket(ReadableByteChannel channel, ByteBuffer buffer ) throws IOException
277+
private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buffer ) throws IOException
275278
{
276279
try
277280
{
278281
buffer.clear();
279-
channel.read( buffer );
282+
int read = channel.read( buffer );
283+
if ( read == -1 )
284+
{
285+
throw new ClientException(
286+
"Connection terminated while receiving data. This can happen due to network " +
287+
"instabilities, or due to restarts of the database." );
288+
}
280289
buffer.flip();
281290
}
282-
catch( ClosedByInterruptException e )
291+
catch ( ClosedByInterruptException e )
283292
{
284293
throw new ClientException(
285-
"Connection to the database was lost because someone called `interrupt()` on the driver thread waiting for a reply. " +
286-
"This normally happens because the JVM is shutting down, but it can also happen because your application code or some " +
294+
"Connection to the database was lost because someone called `interrupt()` on the driver " +
295+
"thread waiting for a reply. " +
296+
"This normally happens because the JVM is shutting down, but it can also happen because your " +
297+
"application code or some " +
287298
"framework you are using is manually interrupting the thread." );
288299
}
289300
catch ( IOException e )
290301
{
291302
String message = e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage();
292-
throw new ClientException( "Unable to process request: " + message + " buffer: \n" + BytePrinter.hex( buffer ), e );
303+
throw new ClientException(
304+
"Unable to process request: " + message + " buffer: \n" + BytePrinter.hex( buffer ), e );
293305
}
294306
}
295307

@@ -368,7 +380,7 @@ public PackInput readBytes( byte[] into, int offset, int toRead ) throws IOExcep
368380
public byte peekByte() throws IOException
369381
{
370382
state = state.peekByte( this );
371-
return buffer.get(buffer.position());
383+
return buffer.get( buffer.position() );
372384
}
373385

374386
private boolean hasMoreDataUnreadInCurrentChunk()
@@ -382,7 +394,7 @@ private boolean hasMoreDataUnreadInCurrentChunk()
382394
public void run()
383395
{
384396
// the on message complete should only be called when no data unread from the message buffer
385-
if( hasMoreDataUnreadInCurrentChunk() )
397+
if ( hasMoreDataUnreadInCurrentChunk() )
386398
{
387399
throw new ClientException( "Trying to read message complete ending '00 00' while there are more data " +
388400
"left in the message content unread: buffer [" +
@@ -392,11 +404,12 @@ public void run()
392404
try
393405
{
394406
// read message boundary
395-
state.readChunkSize( BufferingChunkedInput.this );
407+
state.readChunkSize( BufferingChunkedInput.this );
396408
if ( remainingChunkSize != 0 )
397409
{
398410
throw new ClientException( "Expecting message complete ending '00 00', but got " +
399-
BytePrinter.hex( ByteBuffer.allocate( 2 ).putShort( (short) remainingChunkSize ) ) );
411+
BytePrinter.hex( ByteBuffer.allocate( 2 )
412+
.putShort( (short) remainingChunkSize ) ) );
400413
}
401414
}
402415
catch ( IOException e )

driver/src/test/java/org/neo4j/driver/internal/connector/socket/BufferingChunkedInputTest.java

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/**
22
* Copyright (c) 2002-2016 "Neo Technology,"
33
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4-
*
4+
* <p>
55
* This file is part of Neo4j.
6-
*
6+
* <p>
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
11-
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
10+
* <p>
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
* <p>
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,7 +21,6 @@
2121
import org.junit.Rule;
2222
import org.junit.Test;
2323
import org.junit.rules.ExpectedException;
24-
import org.mockito.Matchers;
2524

2625
import java.io.ByteArrayInputStream;
2726
import java.io.IOException;
@@ -38,6 +37,7 @@
3837
import static org.hamcrest.MatcherAssert.assertThat;
3938
import static org.junit.Assert.assertEquals;
4039
import static org.junit.Assert.fail;
40+
import static org.mockito.Matchers.any;
4141
import static org.mockito.Mockito.mock;
4242
import static org.mockito.Mockito.when;
4343

@@ -80,7 +80,8 @@ public void shouldReadOneByteInTwoChunks() throws IOException
8080
public void shouldReadOneByteWhenSplitHeader() throws IOException
8181
{
8282
// Given
83-
BufferingChunkedInput input = new BufferingChunkedInput( packets( packet( 0 ), packet( 1, 13, 0, 1, 37, 0, 0 ) ) );
83+
BufferingChunkedInput input =
84+
new BufferingChunkedInput( packets( packet( 0 ), packet( 1, 13, 0, 1, 37, 0, 0 ) ) );
8485

8586
// When
8687
byte b1 = input.readByte();
@@ -281,7 +282,7 @@ public void shouldGiveHelpfulMessageOnInterrupt() throws IOException
281282
{
282283
// Given
283284
ReadableByteChannel channel = mock( ReadableByteChannel.class );
284-
when( channel.read( Matchers.any( ByteBuffer.class ) ) ).thenThrow( new ClosedByInterruptException() );
285+
when( channel.read( any( ByteBuffer.class ) ) ).thenThrow( new ClosedByInterruptException() );
285286

286287
BufferingChunkedInput ch = new BufferingChunkedInput( channel, 2 );
287288

@@ -339,7 +340,8 @@ public void shouldPeekOneByteInTwoChunks() throws IOException
339340
public void shouldPeekOneByteWhenSplitHeader() throws IOException
340341
{
341342
// Given
342-
BufferingChunkedInput input = new BufferingChunkedInput( packets( packet( 0 ), packet( 1, 13, 0, 1, 37, 0, 0 ) ) );
343+
BufferingChunkedInput input =
344+
new BufferingChunkedInput( packets( packet( 0 ), packet( 1, 13, 0, 1, 37, 0, 0 ) ) );
343345

344346
// When
345347
byte peeked1 = input.peekByte();
@@ -373,6 +375,66 @@ public void shouldPeekOneByteInOneChunkWhenBustingBuffer() throws IOException
373375
assertThat( read2, equalTo( (byte) 37 ) );
374376
}
375377

378+
@Test
379+
public void shouldNotStackOverflowWhenDataIsNotAvailable() throws IOException
380+
{
381+
// Given a channel that does not get data from the channel
382+
ReadableByteChannel channel = new ReadableByteChannel()
383+
{
384+
private int counter = 0;
385+
private int numberOfTries = 10000;
386+
387+
@Override
388+
public int read( ByteBuffer dst ) throws IOException
389+
{
390+
if ( counter++ < numberOfTries )
391+
{
392+
return 0;
393+
}
394+
else
395+
{
396+
dst.put( (byte) 11 );
397+
return 1;
398+
}
399+
}
400+
401+
@Override
402+
public boolean isOpen()
403+
{
404+
return true;
405+
}
406+
407+
@Override
408+
public void close() throws IOException
409+
{
410+
411+
}
412+
};
413+
414+
// When
415+
BufferingChunkedInput input = new BufferingChunkedInput( channel );
416+
417+
// Then
418+
assertThat(input.readByte(), equalTo( (byte)11 ));
419+
420+
}
421+
422+
@Test
423+
public void shouldFailNicelyOnClosedConnections() throws IOException
424+
{
425+
// Given
426+
ReadableByteChannel channel = mock( ReadableByteChannel.class );
427+
when( channel.read( any( ByteBuffer.class ) ) ).thenReturn( -1 );
428+
BufferingChunkedInput input = new BufferingChunkedInput( channel );
429+
430+
//Expect
431+
exception.expect( ClientException.class );
432+
exception.expectMessage( "Connection terminated while receiving data. This can happen due to network " +
433+
"instabilities, or due to restarts of the database." );
434+
// When
435+
input.readByte();
436+
}
437+
376438
private ReadableByteChannel packet( int... bytes )
377439
{
378440
byte[] byteArray = new byte[bytes.length];

0 commit comments

Comments
 (0)