Skip to content

Commit 9dcd709

Browse files
committed
Optimize StatementResultCursor#consumeAsync()
Previously it was implemented using `#forEachAsync()` and a no-op consumer. This resulted in every record being added to the queue of incoming records, then wrapped in a `CompletableFuture` and only then thrown away. So it did a lot more work than needed. This commit optimizes `#consumeAsync()` and `StatementResult#consume()` that builds on top of it. Now all incoming values will not be transformed into records and will be dropped right away. So no queueing and `CompletableFuture`s will be involved.
1 parent e10f2b1 commit 9dcd709

File tree

6 files changed

+384
-62
lines changed

6 files changed

+384
-62
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ public CompletionStage<Record> singleAsync()
9595
@Override
9696
public CompletionStage<ResultSummary> consumeAsync()
9797
{
98-
return forEachAsync( record ->
99-
{
100-
} );
98+
return pullAllHandler.consumeAsync();
10199
}
102100

103101
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
6262
private Throwable failure;
6363
private ResultSummary summary;
6464

65+
private boolean ignoreRecords;
6566
private CompletableFuture<Record> recordFuture;
6667
private CompletableFuture<Throwable> failureFuture;
6768

@@ -116,9 +117,16 @@ public synchronized void onFailure( Throwable error )
116117
@Override
117118
public synchronized void onRecord( Value[] fields )
118119
{
119-
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
120-
enqueueRecord( record );
121-
completeRecordFuture( record );
120+
if ( ignoreRecords )
121+
{
122+
completeRecordFuture( null );
123+
}
124+
else
125+
{
126+
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
127+
enqueueRecord( record );
128+
completeRecordFuture( record );
129+
}
122130
}
123131

124132
public synchronized CompletionStage<Record> peekAsync()
@@ -131,7 +139,7 @@ public synchronized CompletionStage<Record> peekAsync()
131139
return failedFuture( extractFailure() );
132140
}
133141

134-
if ( finished )
142+
if ( ignoreRecords || finished )
135143
{
136144
return completedWithNull();
137145
}
@@ -165,6 +173,13 @@ public synchronized CompletionStage<ResultSummary> summaryAsync()
165173
} );
166174
}
167175

176+
public synchronized CompletionStage<ResultSummary> consumeAsync()
177+
{
178+
ignoreRecords = true;
179+
records.clear();
180+
return summaryAsync();
181+
}
182+
168183
public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
169184
{
170185
return failureAsync().thenApply( error ->

driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java

Lines changed: 32 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -197,60 +197,6 @@ public void shouldFailWhenAskedForSingleRecordButResultContainsMore()
197197
}
198198
}
199199

200-
@Test
201-
public void shouldConsumeAsyncWhenResultContainsMultipleRecords()
202-
{
203-
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
204-
205-
Record record1 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) );
206-
Record record2 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 2, 2, 2 ) );
207-
Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) );
208-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) )
209-
.thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) )
210-
.thenReturn( completedWithNull() );
211-
212-
ResultSummary summary = mock( ResultSummary.class );
213-
when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) );
214-
215-
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
216-
217-
assertEquals( summary, await( cursor.consumeAsync() ) );
218-
verify( pullAllHandler, times( 4 ) ).nextAsync();
219-
}
220-
221-
@Test
222-
public void shouldConsumeAsyncWhenResultContainsOneRecords()
223-
{
224-
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
225-
226-
Record record = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) );
227-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) )
228-
.thenReturn( completedWithNull() );
229-
230-
ResultSummary summary = mock( ResultSummary.class );
231-
when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) );
232-
233-
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
234-
235-
assertEquals( summary, await( cursor.consumeAsync() ) );
236-
verify( pullAllHandler, times( 2 ) ).nextAsync();
237-
}
238-
239-
@Test
240-
public void shouldConsumeAsyncWhenResultContainsNoRecords()
241-
{
242-
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
243-
when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() );
244-
245-
ResultSummary summary = mock( ResultSummary.class );
246-
when( pullAllHandler.summaryAsync() ).thenReturn( completedFuture( summary ) );
247-
248-
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
249-
250-
assertEquals( summary, await( cursor.consumeAsync() ) );
251-
verify( pullAllHandler ).nextAsync();
252-
}
253-
254200
@Test
255201
public void shouldForEachAsyncWhenResultContainsMultipleRecords()
256202
{
@@ -455,6 +401,38 @@ public void shouldPropagateFailureFromListAsyncWithMapFunction()
455401
verify( pullAllHandler ).listAsync( mapFunction );
456402
}
457403

404+
@Test
405+
public void shouldConsumeAsync()
406+
{
407+
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
408+
ResultSummary summary = mock( ResultSummary.class );
409+
when( pullAllHandler.consumeAsync() ).thenReturn( completedFuture( summary ) );
410+
411+
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
412+
413+
assertEquals( summary, await( cursor.consumeAsync() ) );
414+
}
415+
416+
@Test
417+
public void shouldPropagateFailureInConsumeAsync()
418+
{
419+
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
420+
RuntimeException error = new RuntimeException( "Hi" );
421+
when( pullAllHandler.consumeAsync() ).thenReturn( failedFuture( error ) );
422+
423+
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
424+
425+
try
426+
{
427+
await( cursor.consumeAsync() );
428+
fail( "Exception expected" );
429+
}
430+
catch ( RuntimeException e )
431+
{
432+
assertEquals( error, e );
433+
}
434+
}
435+
458436
private static InternalStatementResultCursor newCursor( PullAllResponseHandler pullAllHandler )
459437
{
460438
return new InternalStatementResultCursor( new RunResponseHandler( new CompletableFuture<>() ), pullAllHandler );

0 commit comments

Comments
 (0)