Skip to content

Commit 6e21f34

Browse files
author
Zhen Li
committed
Fixed a bug found with ClusterStressIT regarding how we send run and pull messages.
When creating the async result, we write a RUN message, followed by a PULL message. The RUN and PULL messages shall be flushed together. If RUN and PULL are flushed separately, the following scenario may happen: C: RUN "RETURN Wrong" {} {mode="r"} S: FAILURE Neo.ClientError.Statement.SyntaxError "Variable `Wrong` not defined (line 1, column 8 (offset: 7)) C: RESET C: PULL {n=1000} S: SUCCESS {} S: FAILURE Neo.ClientError.Request.Invalid "Message 'PULL Map{n -> Long(1000)}' cannot be handled by a session in the READY state."
1 parent 36eb748 commit 6e21f34

File tree

8 files changed

+18
-62
lines changed

8 files changed

+18
-62
lines changed

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public AsyncStatementResultCursorOnlyFactory( Connection connection, Message run
6060
public CompletionStage<AsyncStatementResultCursor> asyncResult()
6161
{
6262
// only write and flush messages when async result is wanted.
63-
connection.writeAndFlush( runMessage, runHandler );
63+
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
6464
pullAllHandler.prePopulateRecords();
6565

6666
if ( waitForRunResponse )

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,11 @@
4848
public class AutoPullResponseHandler extends BasicPullResponseHandler implements PullAllResponseHandler
4949
{
5050
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
51-
52-
private static final int RECORD_BUFFER_LOW_WATERMARK = Integer.getInteger( "recordBufferLowWatermark", 300 );
53-
private static final int RECORD_BUFFER_HIGH_WATERMARK = Integer.getInteger( "recordBufferHighWatermark", 1000 );
5451
private final long fetchSize;
5552

5653
// initialized lazily when first record arrives
5754
private Queue<Record> records = UNINITIALIZED_RECORDS;
5855

59-
// private boolean autoReadManagementEnabled = true;
6056
private ResultSummary summary;
6157
private Throwable failure;
6258

@@ -113,18 +109,6 @@ private void handleFailure( Throwable error )
113109
failure = error;
114110
}
115111
}
116-
//
117-
// @Override
118-
// public boolean canManageAutoRead()
119-
// {
120-
// return true;
121-
// }
122-
123-
// @Override
124-
// public synchronized void disableAutoReadManagement()
125-
// {
126-
// autoReadManagementEnabled = false;
127-
// }
128112

129113
public synchronized CompletionStage<Record> peekAsync()
130114
{
@@ -198,7 +182,6 @@ private synchronized CompletionStage<ResultSummary> pullAllAsync()
198182
}
199183
else
200184
{
201-
// enableAutoRead();
202185
request( UNLIMITED_FETCH_SIZE );
203186
if ( summaryFuture == null )
204187
{
@@ -217,32 +200,11 @@ private void enqueueRecord( Record record )
217200
}
218201

219202
records.add( record );
220-
221-
// boolean shouldBufferAllRecords = failureFuture != null;
222-
// // when failure is requested we have to buffer all remaining records and then return the error
223-
// // do not disable auto-read in this case, otherwise records will not be consumed and trailing
224-
// // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for the error
225-
// if ( !shouldBufferAllRecords && records.size() > RECORD_BUFFER_HIGH_WATERMARK )
226-
// {
227-
// // more than high watermark records are already queued, tell connection to stop auto-reading from network
228-
// // this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
229-
// // fetched from network faster than consumed
230-
// disableAutoRead();
231-
// }
232203
}
233204

234205
private Record dequeueRecord()
235206
{
236-
Record record = records.poll();
237-
238-
// if ( records.size() < RECORD_BUFFER_LOW_WATERMARK )
239-
// {
240-
// // less than low watermark records are now available in the buffer, tell connection to pre-fetch more
241-
// // and populate queue with new records from network
242-
// enableAutoRead();
243-
// }
244-
245-
return record;
207+
return records.poll();
246208
}
247209

248210
private <T> List<T> recordsAsList( Function<Record,T> mapFunction )
@@ -332,20 +294,4 @@ else if ( value == null )
332294
return completedFuture( value );
333295
}
334296
}
335-
336-
// private void enableAutoRead()
337-
// {
338-
// if ( autoReadManagementEnabled )
339-
// {
340-
// connection.enableAutoRead();
341-
// }
342-
// }
343-
//
344-
// private void disableAutoRead()
345-
// {
346-
// if ( autoReadManagementEnabled )
347-
// {
348-
// connection.disableAutoRead();
349-
// }
350-
// }
351297
}

driver/src/test/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorOnlyFactoryTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import static org.mockito.ArgumentMatchers.any;
4343
import static org.mockito.Mockito.mock;
4444
import static org.mockito.Mockito.verify;
45-
import static org.mockito.Mockito.verifyNoMoreInteractions;
4645
import static org.mockito.Mockito.when;
4746
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4847
import static org.neo4j.driver.internal.util.Futures.failedFuture;
@@ -174,7 +173,7 @@ private AsyncStatementResultCursorOnlyFactory newResultCursorFactory( Completabl
174173

175174
private void verifyRunCompleted( Connection connection, CompletionStage<AsyncStatementResultCursor> cursorFuture )
176175
{
177-
verify( connection ).writeAndFlush( any( Message.class ), any( RunResponseHandler.class ) );
176+
verify( connection ).write( any( Message.class ), any( RunResponseHandler.class ) );
178177
assertThat( getNow( cursorFuture ), instanceOf( AsyncStatementResultCursorImpl.class ) );
179178
}
180179
}

driver/src/test/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1Test.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ private static ResponseHandler verifyRunInvoked( Connection connection )
369369
ArgumentCaptor<ResponseHandler> runHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class );
370370
ArgumentCaptor<ResponseHandler> pullAllHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class );
371371

372-
verify( connection ).writeAndFlush( eq( new RunMessage( QUERY, PARAMS ) ), runHandlerCaptor.capture() );
372+
verify( connection ).write( eq( new RunMessage( QUERY, PARAMS ) ), runHandlerCaptor.capture() );
373373
verify( connection ).writeAndFlush( eq( PullAllMessage.PULL_ALL ), pullAllHandlerCaptor.capture() );
374374

375375
assertThat( runHandlerCaptor.getValue(), instanceOf( RunResponseHandler.class ) );

driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ private static ResponseHandlers verifyRunInvoked( Connection connection, boolean
473473
expectedMessage = RunWithMetadataMessage.explicitTxRunMessage( STATEMENT );
474474
}
475475

476-
verify( connection ).writeAndFlush( eq( expectedMessage ), runHandlerCaptor.capture() );
476+
verify( connection ).write( eq( expectedMessage ), runHandlerCaptor.capture() );
477477
verify( connection ).writeAndFlush( eq( PullAllMessage.PULL_ALL ), pullAllHandlerCaptor.capture() );
478478

479479
assertThat( runHandlerCaptor.getValue(), instanceOf( RunResponseHandler.class ) );

driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.Set;
2424
import java.util.concurrent.CompletionStage;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526
import java.util.concurrent.atomic.AtomicReference;
2627

2728
import org.neo4j.driver.internal.BoltServerAddress;
@@ -106,6 +107,7 @@ private static class FailingConnection implements Connection
106107
{
107108
final Connection delegate;
108109
final AtomicReference<Throwable> nextRunFailure;
110+
final AtomicInteger count = new AtomicInteger( 2 ); // one failure for run, one failure for pull
109111

110112
FailingConnection( Connection delegate, AtomicReference<Throwable> nextRunFailure )
111113
{
@@ -222,13 +224,21 @@ private boolean tryFail( ResponseHandler handler1, ResponseHandler handler2 )
222224
Throwable failure = nextRunFailure.getAndSet( null );
223225
if ( failure != null )
224226
{
227+
int reportCount = count.get();
225228
if ( handler1 != null )
226229
{
227230
handler1.onFailure( failure );
231+
reportCount = count.decrementAndGet();
228232
}
229233
if ( handler2 != null )
230234
{
231235
handler2.onFailure( failure );
236+
reportCount = count.decrementAndGet();
237+
}
238+
239+
if ( reportCount > 0 )
240+
{
241+
nextRunFailure.compareAndSet( null, failure );
232242
}
233243
return true;
234244
}

driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ void tearDown()
130130
if ( driver != null )
131131
{
132132
driver.close();
133-
System.out.println( driver.metrics() );
134133
}
135134
}
136135

driver/src/test/java/org/neo4j/driver/stress/CausalClusteringIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.TimeoutException;
4040
import java.util.concurrent.atomic.AtomicBoolean;
4141
import java.util.function.Function;
42+
import java.util.logging.Level;
4243

4344
import org.neo4j.driver.AccessMode;
4445
import org.neo4j.driver.AuthToken;
@@ -89,6 +90,7 @@
8990
import static org.junit.jupiter.api.Assertions.assertNotNull;
9091
import static org.junit.jupiter.api.Assertions.assertThrows;
9192
import static org.junit.jupiter.api.Assertions.assertTrue;
93+
import static org.neo4j.driver.Logging.console;
9294
import static org.neo4j.driver.Values.parameters;
9395
import static org.neo4j.driver.SessionConfig.builder;
9496
import static org.neo4j.driver.internal.InternalBookmark.parse;
@@ -1018,7 +1020,7 @@ private static int runCountNodes( StatementRunner statementRunner, String label,
10181020

10191021
private static Config configWithoutLogging()
10201022
{
1021-
return Config.builder().withLogging( DEV_NULL_LOGGING ).build();
1023+
return Config.builder().withLogging( console( Level.FINE ) ).build();
10221024
}
10231025

10241026
private static ExecutorService newExecutor()

0 commit comments

Comments
 (0)