Skip to content

Commit 791295a

Browse files
author
Zhen
committed
Drop connection/session peacefully on protocol violation error
Scenario: C->S: RUN (&*()&&*%&^%I, PULL_ALL,... // Client sends to server a RUN message that cannot be understood by the server (unable to dechunk or unpack) and a PULL_ALL S->C: Neo.ClientError.Request, RST // Server sends back an error for RUN message and the wish to close this connection, without a reply to PULL_ALL This PR makes sure that the client should close the connection/session immediately if the connection would be broken afterwards without waiting for any reply for PULL_ALL.
1 parent dfba014 commit 791295a

File tree

12 files changed

+220
-15
lines changed

12 files changed

+220
-15
lines changed

README.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,3 @@ Java 8, because Neo4j-the-database needs it to run.
4848

4949
If you are building on windows, you need to run install as admin so that Neo4j-the-database could be registered as a
5050
windows service and then be started and stopped correctly using its powershell scripts for windows.
51-
To be able to run powershell script on windows, you might need to enable running scripts on the system.
52-
This can for example be achieved by executing the following from an elevated PowerShell prompt:
53-
54-
Set-ExecutionPolicy -ExecutionPolicy RemoteSigned

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public InternalSession( Connection connection )
5656
@Override
5757
public ResultCursor run( String statementText, Map<String,Value> statementParameters )
5858
{
59-
ensureNoOpenTransaction();
59+
ensureConnectionIsValid();
6060
ResultBuilder resultBuilder = new ResultBuilder( statementText, statementParameters );
6161
connection.run( statementText, statementParameters, resultBuilder );
6262

@@ -111,7 +111,7 @@ public void close()
111111
@Override
112112
public Transaction beginTransaction()
113113
{
114-
ensureNoOpenTransaction();
114+
ensureConnectionIsValid();
115115
return currentTransaction = new InternalTransaction( connection, txCleanup );
116116
}
117117

@@ -121,6 +121,12 @@ public TypeSystem typeSystem()
121121
return InternalTypeSystem.TYPE_SYSTEM;
122122
}
123123

124+
private void ensureConnectionIsValid()
125+
{
126+
ensureNoOpenTransaction();
127+
ensureConnectionIsOpen();
128+
}
129+
124130
private void ensureNoOpenTransaction()
125131
{
126132
if ( currentTransaction != null )
@@ -129,4 +135,14 @@ private void ensureNoOpenTransaction()
129135
"more statements/transactions in the current session." );
130136
}
131137
}
138+
139+
private void ensureConnectionIsOpen()
140+
{
141+
if ( !connection.isOpen() )
142+
{
143+
throw new ClientException( "The current session cannot be reused as the underlying connection with the " +
144+
"server has been closed due to unrecoverable errors. " +
145+
"Please close this session and retry your statement in another new session." );
146+
}
147+
}
132148
}

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,27 @@ public void send( List<Message> pendingMessages, SocketResponseHandler handler )
102102
while ( handler.receivedResponses() < pendingMessages.size() )
103103
{
104104
reader.read( handler );
105+
106+
// TODO: all the errors come from the following trace should result in the termination of this channel
107+
// https://github.com/neo4j/neo4j/blob/3.0/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltProtocolV1.java#L86
108+
if ( handler.protocolViolationErrorOccurred() )
109+
{
110+
stop();
111+
throw handler.serverFailure();
112+
}
105113
}
106114
}
107115

108116
public void stop()
109117
{
110118
try
111119
{
112-
channel.close();
113-
channel = null;
114-
logger.debug( "~~ [CLOSE]" );
120+
if( channel != null )
121+
{
122+
logger.debug( "~~ [CLOSE]" );
123+
channel.close();
124+
channel = null;
125+
}
115126
}
116127
catch ( IOException e )
117128
{
@@ -126,11 +137,16 @@ public void stop()
126137
}
127138
}
128139

140+
public boolean isOpen()
141+
{
142+
return channel != null && channel.isOpen();
143+
}
144+
129145
private SocketProtocol negotiateProtocol() throws IOException
130146
{
131147
logger.debug( "~~ [HANDSHAKE] [0x6060B017, 1, 0, 0, 0]." );
132148
//Propose protocol versions
133-
ByteBuffer buf = ByteBuffer.allocate( 5 * 4 ).order( BIG_ENDIAN);
149+
ByteBuffer buf = ByteBuffer.allocate( 5 * 4 ).order( BIG_ENDIAN );
134150
buf.putInt( MAGIC_PREAMBLE );
135151
for ( int version : SUPPORTED_VERSIONS )
136152
{

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,12 @@ public void close()
154154
socket.stop();
155155
}
156156

157+
@Override
158+
public boolean isOpen()
159+
{
160+
return socket.isOpen();
161+
}
162+
157163
private int nextRequestId()
158164
{
159165
return (requestCounter++);

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ public void registerResultCollector( int correlationId, StreamCollector collecto
222222
collectors.put( correlationId, collector );
223223
}
224224

225+
public boolean protocolViolationErrorOccurred()
226+
{
227+
return error != null && error.neo4jErrorCode().startsWith( "Neo.ClientError.Request" );
228+
}
229+
225230
public boolean serverFailureOccurred()
226231
{
227232
return error != null;

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void sync()
100100
{
101101
delegate.sync();
102102
}
103-
catch(RuntimeException e)
103+
catch ( RuntimeException e )
104104
{
105105
onDelegateException( e );
106106
}
@@ -112,6 +112,12 @@ public void close()
112112
release.accept( this );
113113
}
114114

115+
@Override
116+
public boolean isOpen()
117+
{
118+
return delegate.isOpen();
119+
}
120+
115121
public boolean hasUnrecoverableErrors()
116122
{
117123
return unrecoverableErrorsOccurred;
@@ -130,13 +136,19 @@ public void dispose()
130136
*/
131137
private void onDelegateException( RuntimeException e )
132138
{
133-
if ( !isClientOrTransientError( e ) )
139+
if ( !isClientOrTransientError( e ) || isProtocolViolationError( e ) )
134140
{
135141
unrecoverableErrorsOccurred = true;
136142
}
137143
throw e;
138144
}
139145

146+
private boolean isProtocolViolationError(RuntimeException e )
147+
{
148+
return e instanceof Neo4jException
149+
&& ((Neo4jException) e).neo4jErrorCode().startsWith( "Neo.ClientError.Request" );
150+
}
151+
140152
private boolean isClientOrTransientError( RuntimeException e )
141153
{
142154
// Eg: DatabaseErrors and unknown (no status code or not neo4j exception) cause session to be discarded

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,14 @@ public interface Connection extends AutoCloseable
5959

6060
@Override
6161
void close();
62+
63+
/**
64+
* Test if the underlying socket connection with the server is still open.
65+
* When the socket connection with the server is closed,
66+
* the connection cannot take on any task, but be {@link #close() closed} to release resources it occupies.
67+
* Note: Invocation of {@link #close()} method would make this method to return false,
68+
* however this method cannot indicate whether {@link #close()} is already be called or not.
69+
* @return true if the socket connection with the server is open, otherwise false.
70+
*/
71+
boolean isOpen();
6272
}

driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static junit.framework.TestCase.assertNotNull;
3030
import static org.mockito.Mockito.mock;
3131
import static org.mockito.Mockito.verify;
32+
import static org.mockito.Mockito.when;
3233

3334
public class InternalSessionTest
3435
{
@@ -40,6 +41,7 @@ public void shouldSyncOnRun() throws Throwable
4041
{
4142
// Given
4243
Connection mock = mock( Connection.class );
44+
when( mock.isOpen() ).thenReturn( true );
4345
InternalSession sess = new InternalSession( mock );
4446

4547
// When
@@ -54,6 +56,7 @@ public void shouldNotAllowNewTxWhileOneIsRunning() throws Throwable
5456
{
5557
// Given
5658
Connection mock = mock( Connection.class );
59+
when( mock.isOpen() ).thenReturn( true );
5760
InternalSession sess = new InternalSession( mock );
5861
sess.beginTransaction();
5962

@@ -69,6 +72,7 @@ public void shouldBeAbleToOpenTxAfterPreviousIsClosed() throws Throwable
6972
{
7073
// Given
7174
Connection mock = mock( Connection.class );
75+
when( mock.isOpen() ).thenReturn( true );
7276
InternalSession sess = new InternalSession( mock );
7377
sess.beginTransaction().close();
7478

@@ -84,6 +88,7 @@ public void shouldNotBeAbleToUseSessionWhileOngoingTransaction() throws Throwabl
8488
{
8589
// Given
8690
Connection mock = mock( Connection.class );
91+
when( mock.isOpen() ).thenReturn( true );
8792
InternalSession sess = new InternalSession( mock );
8893
sess.beginTransaction();
8994

@@ -99,6 +104,7 @@ public void shouldBeAbleToUseSessionAgainWhenTransactionIsClosed() throws Throwa
99104
{
100105
// Given
101106
Connection mock = mock( Connection.class );
107+
when( mock.isOpen() ).thenReturn( true );
102108
InternalSession sess = new InternalSession( mock );
103109
sess.beginTransaction().close();
104110

@@ -108,4 +114,34 @@ public void shouldBeAbleToUseSessionAgainWhenTransactionIsClosed() throws Throwa
108114
// Then
109115
verify( mock ).sync();
110116
}
117+
118+
@Test
119+
public void shouldNotAllowMoreStatementsInSessionWhileConnectionClosed() throws Throwable
120+
{
121+
// Given
122+
Connection mock = mock( Connection.class );
123+
when( mock.isOpen() ).thenReturn( false );
124+
InternalSession sess = new InternalSession( mock );
125+
126+
// Expect
127+
exception.expect( ClientException.class );
128+
129+
// When
130+
sess.run( "whatever" );
131+
}
132+
133+
@Test
134+
public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throws Throwable
135+
{
136+
// Given
137+
Connection mock = mock( Connection.class );
138+
when( mock.isOpen() ).thenReturn( false );
139+
InternalSession sess = new InternalSession( mock );
140+
141+
// Expect
142+
exception.expect( ClientException.class );
143+
144+
// When
145+
sess.beginTransaction();
146+
}
111147
}

driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
*/
1919
package org.neo4j.driver.internal.pool;
2020

21-
import java.io.IOException;
22-
2321
import org.junit.Test;
2422
import org.mockito.Mockito;
2523

24+
import java.io.IOException;
25+
2626
import org.neo4j.driver.internal.spi.Connection;
2727
import org.neo4j.driver.v1.exceptions.ClientException;
2828
import org.neo4j.driver.v1.exceptions.Neo4jException;
@@ -68,6 +68,13 @@ public void shouldNotInvalidateOnKnownRecoverableExceptions() throws Throwable
6868
assertRecoverable( new TransientException( "Neo.TransientError.General.ReadOnly", "Hello, world!" ) );
6969
}
7070

71+
@Test
72+
public void shouldInvalidateOnProtocolViolationExceptions() throws Throwable
73+
{
74+
assertUnrecoverable( new ClientException( "Neo.ClientError.Request.InvalidFormat", "Hello, world!" ) );
75+
assertUnrecoverable( new ClientException( "Neo.ClientError.Request.Invalid", "Hello, world!" ) );
76+
}
77+
7178
private void assertUnrecoverable( Neo4jException exception )
7279
{
7380
doThrow( exception ).when( delegate ).sync();
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Copyright (c) 2002-2015 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.neo4j.driver.v1.integration;
21+
22+
import org.junit.After;
23+
import org.junit.Before;
24+
import org.junit.Rule;
25+
import org.junit.Test;
26+
import org.neo4j.driver.internal.connector.socket.SocketClient;
27+
import org.neo4j.driver.internal.connector.socket.SocketResponseHandler;
28+
import org.neo4j.driver.internal.logging.DevNullLogger;
29+
import org.neo4j.driver.internal.messaging.InitMessage;
30+
import org.neo4j.driver.internal.messaging.Message;
31+
import org.neo4j.driver.v1.Config;
32+
import org.neo4j.driver.v1.exceptions.ClientException;
33+
import org.neo4j.driver.v1.util.TestNeo4j;
34+
35+
import java.net.URI;
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
39+
import static junit.framework.TestCase.assertTrue;
40+
import static org.hamcrest.CoreMatchers.equalTo;
41+
import static org.junit.Assert.assertThat;
42+
import static org.junit.Assert.fail;
43+
import static org.mockito.Mockito.*;
44+
45+
public class SocketClientIT
46+
{
47+
@Rule
48+
public TestNeo4j neo4j = new TestNeo4j();
49+
50+
private SocketClient client = null;
51+
52+
@Before
53+
public void setup()
54+
{
55+
URI url = URI.create( neo4j.address() );
56+
client = new SocketClient( url.getHost(), url.getPort(), Config.defaultConfig(),
57+
new DevNullLogger() );
58+
}
59+
60+
@After
61+
public void tearDown()
62+
{
63+
if( client != null )
64+
{
65+
client.stop();
66+
}
67+
}
68+
69+
@Test
70+
public void shouldCloseConnectionWhenReceivingProtocolViolationError() throws Exception
71+
{
72+
// Given
73+
List<Message> messages = new ArrayList<>( 2 );
74+
messages.add( new InitMessage( "EvilClientV1_Hello" ) );
75+
messages.add( new InitMessage( "EvilClientV1_World" ) );
76+
77+
SocketResponseHandler handler = mock( SocketResponseHandler.class );
78+
when( handler.protocolViolationErrorOccurred() ).thenReturn( true );
79+
when( handler.receivedResponses() ).thenReturn( 0, 1, 2 );
80+
when( handler.serverFailure() ).thenReturn(
81+
new ClientException( "Neo.ClientError.Request.InvalidFormat", "Hello, world!" ) );
82+
83+
// When & Then
84+
client.start();
85+
try
86+
{
87+
client.send( messages, handler );
88+
fail( "The client should receive a protocol violation error" );
89+
}
90+
catch ( Exception e )
91+
{
92+
assertTrue( e instanceof ClientException );
93+
assertThat( e.getMessage(), equalTo( "Hello, world!" ) );
94+
}
95+
96+
assertThat( client.isOpen(), equalTo( false ) );
97+
verify( handler, times(1) ).protocolViolationErrorOccurred();
98+
verify( handler, times(1) ).receivedResponses();
99+
}
100+
}

0 commit comments

Comments
 (0)