Skip to content

Commit e410dae

Browse files
author
Zhen
committed
Merge branch '1.0' into 1.1
2 parents 85aa122 + 541cd55 commit e410dae

20 files changed

+418
-66
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,14 @@ public void run()
148148
//must check if transaction has been closed
149149
if (currentTransaction != null)
150150
{
151-
currentTransaction.markAsRolledBack();
152-
currentTransaction = null;
153-
connection.onError( null );
151+
if( connection.hasUnrecoverableErrors() )
152+
{
153+
currentTransaction.markToClose();
154+
}
155+
else
156+
{
157+
currentTransaction.failure();
158+
}
154159
}
155160
}
156161
});
@@ -165,12 +170,14 @@ public TypeSystem typeSystem()
165170

166171
private void ensureConnectionIsValidBeforeRunningSession()
167172
{
173+
ensureNoUnrecoverableError();
168174
ensureNoOpenTransactionBeforeRunningSession();
169175
ensureConnectionIsOpen();
170176
}
171177

172178
private void ensureConnectionIsValidBeforeOpeningTransaction()
173179
{
180+
ensureNoUnrecoverableError();
174181
ensureNoOpenTransactionBeforeOpeningTransaction();
175182
ensureConnectionIsOpen();
176183
}
@@ -187,6 +194,16 @@ protected void finalize() throws Throwable
187194
super.finalize();
188195
}
189196

197+
private void ensureNoUnrecoverableError()
198+
{
199+
if( connection.hasUnrecoverableErrors() )
200+
{
201+
throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " +
202+
"has happened. Please close the current session and re-run your statement in a" +
203+
" new session." );
204+
}
205+
}
206+
190207
private void ensureNoOpenTransactionBeforeRunningSession()
191208
{
192209
if ( currentTransaction != null )

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -66,32 +66,14 @@ public InternalStatementResult( Connection connection, Statement statement )
6666

6767
private StreamCollector newRunResponseCollector()
6868
{
69-
return new StreamCollector()
69+
return new StreamCollector.NoOperationStreamCollector()
7070
{
7171
@Override
7272
public void keys( String[] names )
7373
{
7474
keys = Arrays.asList( names );
7575
}
7676

77-
@Override
78-
public void record( Value[] fields ) {}
79-
80-
@Override
81-
public void statementType( StatementType type ) {}
82-
83-
@Override
84-
public void statementStatistics( SummaryCounters statistics ) {}
85-
86-
@Override
87-
public void plan( Plan plan ) {}
88-
89-
@Override
90-
public void profile( ProfiledPlan plan ) {}
91-
92-
@Override
93-
public void notifications( List<Notification> notifications ) {}
94-
9577
@Override
9678
public void done()
9779
{
@@ -106,11 +88,8 @@ public void done()
10688
private StreamCollector newPullAllResponseCollector( Statement statement )
10789
{
10890
final SummaryBuilder summaryBuilder = new SummaryBuilder( statement );
109-
return new StreamCollector()
91+
return new StreamCollector.NoOperationStreamCollector()
11092
{
111-
@Override
112-
public void keys( String[] names ) {}
113-
11493
@Override
11594
public void record( Value[] fields )
11695
{

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ public void close()
112112
}
113113
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
114114
{
115-
// If alwaysValid of the things we've put in the queue have been sent off, there is no need to
116-
// do this, we could just clear the queue. Future optimization.
117115
conn.run( "ROLLBACK", Collections.<String, Value>emptyMap(), StreamCollector.NO_OP );
118116
conn.discardAll();
119117
conn.sync();
@@ -171,6 +169,8 @@ public StatementResult run( Statement statement )
171169
}
172170
catch ( Neo4jException e )
173171
{
172+
// Failed to send messages to the server probably due to IOException in the socket.
173+
// So we should stop sending more messages in this transaction
174174
state = State.FAILED;
175175
throw e;
176176
}
@@ -200,12 +200,8 @@ public TypeSystem typeSystem()
200200
return InternalTypeSystem.TYPE_SYSTEM;
201201
}
202202

203-
// TODO: This is wrong. This is only needed because we changed the SSM
204-
// to move to IDLE on any exception (so the normal `ROLLBACK` statement won't work).
205-
// We should change the SSM to move to some special ROLLBACK_ONLY state instead and
206-
// remove this code path
207-
public void markAsRolledBack()
203+
public void markToClose()
208204
{
209-
state = State.ROLLED_BACK;
205+
state = State.FAILED;
210206
}
211207
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.messaging;
20+
21+
import java.io.IOException;
22+
23+
/**
24+
* ACK_FAILURE request message
25+
*
26+
* This message acts as a barrier after an error, informing the server that we've seen the error
27+
* message, and that messages that follow this one are safe to execute.
28+
*/
29+
public class AckFailureMessage implements Message
30+
{
31+
public static final AckFailureMessage ACK_FAILURE= new AckFailureMessage();
32+
33+
@Override
34+
public void dispatch( MessageHandler handler ) throws IOException
35+
{
36+
handler.handleAckFailureMessage();
37+
}
38+
39+
@Override
40+
public String toString()
41+
{
42+
return "[ACK_FAILURE]";
43+
}
44+
45+
@Override
46+
public boolean equals( Object obj )
47+
{
48+
return obj != null && obj.getClass() == getClass();
49+
}
50+
51+
@Override
52+
public int hashCode()
53+
{
54+
return 1;
55+
}
56+
}

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface MessageHandler
3636

3737
void handleResetMessage() throws IOException;
3838

39+
void handleAckFailureMessage() throws IOException;
40+
3941
// Responses
4042
void handleSuccessMessage( Map<String,Value> meta ) throws IOException;
4143

@@ -44,4 +46,5 @@ public interface MessageHandler
4446
void handleFailureMessage( String code, String message ) throws IOException;
4547

4648
void handleIgnoredMessage() throws IOException;
49+
4750
}

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
public class PackStreamMessageFormatV1 implements MessageFormat
5757
{
5858
public final static byte MSG_INIT = 0x01;
59+
public final static byte MSG_ACK_FAILURE = 0x0E;
5960
public final static byte MSG_RESET = 0x0F;
6061
public final static byte MSG_RUN = 0x10;
6162
public final static byte MSG_DISCARD_ALL = 0x2F;
@@ -151,6 +152,13 @@ public void handleResetMessage() throws IOException
151152
onMessageComplete.run();
152153
}
153154

155+
@Override
156+
public void handleAckFailureMessage() throws IOException
157+
{
158+
packer.packStructHeader( 0, MSG_ACK_FAILURE );
159+
onMessageComplete.run();
160+
}
161+
154162
@Override
155163
public void handleSuccessMessage( Map<String,Value> meta ) throws IOException
156164
{
@@ -319,7 +327,7 @@ private void packValue( Value value ) throws IOException
319327
break;
320328

321329
default:
322-
throw new UnsupportedOperationException( "Unknown type: " + value );
330+
throw new IOException( "Unknown type: " + value );
323331
}
324332
}
325333

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,26 @@ public void pullAll( StreamCollector collector )
100100
}
101101

102102
@Override
103-
public void reset( StreamCollector collector )
103+
public void reset()
104104
{
105105
try
106106
{
107107
markAsInUse();
108-
delegate.reset( collector );
108+
delegate.reset();
109+
}
110+
finally
111+
{
112+
markAsAvailable();
113+
}
114+
}
115+
116+
@Override
117+
public void ackFailure()
118+
{
119+
try
120+
{
121+
markAsInUse();
122+
delegate.ackFailure();
109123
}
110124
finally
111125
{
@@ -181,6 +195,12 @@ public void onError( Runnable runnable )
181195
delegate.onError( runnable );
182196
}
183197

198+
@Override
199+
public boolean hasUnrecoverableErrors()
200+
{
201+
return delegate.hasUnrecoverableErrors();
202+
}
203+
184204
private void markAsAvailable()
185205
{
186206
inUse.set( false );

driver/src/main/java/org/neo4j/driver/internal/net/LoggingResponseHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.neo4j.driver.v1.Logger;
2525
import org.neo4j.driver.v1.Value;
2626

27+
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
2728
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
2829
import static org.neo4j.driver.internal.messaging.IgnoredMessage.IGNORED;
2930
import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL;
@@ -74,6 +75,13 @@ public void handleResetMessage()
7475
logger.debug( DEFAULT_DEBUG_LOGGING_FORMAT, RESET );
7576
}
7677

78+
@Override
79+
public void handleAckFailureMessage()
80+
{
81+
super.handleAckFailureMessage();
82+
logger.debug( DEFAULT_DEBUG_LOGGING_FORMAT, ACK_FAILURE );
83+
}
84+
7785
@Override
7886
public void handleSuccessMessage( Map<String,Value> meta )
7987
{

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,7 @@ public void receiveOne( SocketResponseHandler handler ) throws IOException
161161
{
162162
reader.read( handler );
163163

164-
// TODO: all the errors come from the following trace should result in the termination of this channel
165-
// https://github.com/neo4j/neo4j/blob/3
166-
// .0/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltProtocolV1.java#L86
164+
// Stop immediately if bolt protocol error happened on the server
167165
if ( handler.protocolViolationErrorOccurred() )
168166
{
169167
stop();

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.neo4j.driver.internal.messaging.InitMessage;
2828
import org.neo4j.driver.internal.messaging.Message;
2929
import org.neo4j.driver.internal.messaging.PullAllMessage;
30-
import org.neo4j.driver.internal.messaging.ResetMessage;
3130
import org.neo4j.driver.internal.messaging.RunMessage;
3231
import org.neo4j.driver.internal.security.SecurityPlan;
3332
import org.neo4j.driver.internal.spi.Connection;
@@ -38,7 +37,9 @@
3837
import org.neo4j.driver.v1.exceptions.ClientException;
3938
import org.neo4j.driver.v1.exceptions.Neo4jException;
4039

40+
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
4141
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
42+
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
4243

4344
public class SocketConnection implements Connection
4445
{
@@ -67,7 +68,7 @@ public SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, L
6768
@Override
6869
public void init( String clientName, Map<String,Value> authToken )
6970
{
70-
queueMessage( new InitMessage( clientName, authToken ), StreamCollector.NO_OP );
71+
queueMessage( new InitMessage( clientName, authToken ), StreamCollector.INIT );
7172
sync();
7273
}
7374

@@ -90,9 +91,15 @@ public void pullAll( StreamCollector collector )
9091
}
9192

9293
@Override
93-
public void reset( StreamCollector collector )
94+
public void reset()
9495
{
95-
queueMessage( ResetMessage.RESET, collector );
96+
queueMessage( RESET, StreamCollector.RESET );
97+
}
98+
99+
@Override
100+
public void ackFailure()
101+
{
102+
queueMessage( ACK_FAILURE, StreamCollector.ACK_FAILURE );
96103
}
97104

98105
@Override
@@ -147,7 +154,6 @@ private void assertNoServerFailure()
147154
{
148155
if ( responseHandler.serverFailureOccurred() )
149156
{
150-
reset( StreamCollector.NO_OP );
151157
Neo4jException exception = responseHandler.serverFailure();
152158
responseHandler.clearError();
153159
throw exception;
@@ -194,4 +200,10 @@ public void onError( Runnable runnable )
194200
{
195201
throw new UnsupportedOperationException( "Error subscribers are not supported on SocketConnection." );
196202
}
203+
204+
@Override
205+
public boolean hasUnrecoverableErrors()
206+
{
207+
throw new UnsupportedOperationException( "Unrecoverable error detection is not supported on SocketConnection." );
208+
}
197209
}

0 commit comments

Comments
 (0)