Skip to content

Commit 5e9ce3f

Browse files
committed
Optimized #listAsync()
Method `InternalStatementResultCursor#listAsync()` returns a future containing list of all records. Records can potentially be transformed using the `#listAsync(Function<Record, T>)` overload. Implementation used to be built on top of `#nextAsync()` and invoked it recursively in `ForkJoinPool.commonPool` workers. This resulted in quite a lot of garbage generation because every record needed to be wrapped in a `CompletableFuture`. Also execution in the separate threads resulted in increased CPU load. `PullAllResponseHandler` is responsible for buffering of incoming records in a queue. This commit makes `#listAsync()` implementation wait for final PULL_ALL message and then transform the queue into list. When FAILURE is received for PULL_ALL resulting future is failed. When SUCCESS is received for PULL_ALL given transformation function is applied to the queue of records and resulting list is returned. This results in less garbage being generated because no intermediate `CompletableFuture`s are created.
1 parent fa0c386 commit 5e9ce3f

File tree

4 files changed

+198
-136
lines changed

4 files changed

+198
-136
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.concurrent.CompletableFuture;
2423
import java.util.concurrent.CompletionStage;
@@ -118,9 +117,7 @@ public CompletionStage<List<Record>> listAsync()
118117
@Override
119118
public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
120119
{
121-
CompletableFuture<List<T>> resultFuture = new CompletableFuture<>();
122-
internalListAsync( new ArrayList<>(), resultFuture, mapFunction );
123-
return resultFuture;
120+
return pullAllHandler.listAsync( mapFunction );
124121
}
125122

126123
public CompletionStage<Throwable> failureAsync()
@@ -160,40 +157,4 @@ else if ( record != null )
160157
}
161158
} );
162159
}
163-
164-
private <T> void internalListAsync( List<T> result, CompletableFuture<List<T>> resultFuture,
165-
Function<Record,T> mapFunction )
166-
{
167-
CompletionStage<Record> recordFuture = nextAsync();
168-
169-
// use async completion listener because of recursion, otherwise it is possible for
170-
// the caller thread to get StackOverflowError when result is large and buffered
171-
recordFuture.whenCompleteAsync( ( record, completionError ) ->
172-
{
173-
Throwable error = Futures.completionExceptionCause( completionError );
174-
if ( error != null )
175-
{
176-
resultFuture.completeExceptionally( error );
177-
}
178-
else if ( record != null )
179-
{
180-
T value;
181-
try
182-
{
183-
value = mapFunction.apply( record );
184-
}
185-
catch ( Throwable mapError )
186-
{
187-
resultFuture.completeExceptionally( mapError );
188-
return;
189-
}
190-
result.add( value );
191-
internalListAsync( result, resultFuture, mapFunction );
192-
}
193-
else
194-
{
195-
resultFuture.complete( result );
196-
}
197-
} );
198-
}
199160
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.neo4j.driver.internal.handlers;
2020

2121
import java.util.ArrayDeque;
22+
import java.util.ArrayList;
23+
import java.util.List;
2224
import java.util.Map;
2325
import java.util.Queue;
2426
import java.util.concurrent.CompletableFuture;
@@ -33,6 +35,7 @@
3335
import org.neo4j.driver.v1.Statement;
3436
import org.neo4j.driver.v1.Value;
3537
import org.neo4j.driver.v1.summary.ResultSummary;
38+
import org.neo4j.driver.v1.util.Function;
3639

3740
import static java.util.Collections.emptyMap;
3841
import static java.util.Objects.requireNonNull;
@@ -158,6 +161,18 @@ public synchronized CompletionStage<ResultSummary> summaryAsync()
158161
} );
159162
}
160163

164+
public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
165+
{
166+
return failureAsync().thenApply( error ->
167+
{
168+
if ( error != null )
169+
{
170+
throw Futures.asCompletionException( error );
171+
}
172+
return recordsAsList( mapFunction );
173+
} );
174+
}
175+
161176
public synchronized CompletionStage<Throwable> failureAsync()
162177
{
163178
if ( failure != null )
@@ -213,6 +228,22 @@ private Record dequeueRecord()
213228
return record;
214229
}
215230

231+
private <T> List<T> recordsAsList( Function<Record,T> mapFunction )
232+
{
233+
if ( !finished )
234+
{
235+
throw new IllegalStateException( "Can't get records as list because SUCCESS or FAILURE did not arrive" );
236+
}
237+
238+
List<T> result = new ArrayList<>( records.size() );
239+
while ( !records.isEmpty() )
240+
{
241+
Record record = records.poll();
242+
result.add( mapFunction.apply( record ) );
243+
}
244+
return result;
245+
}
246+
216247
private Throwable extractFailure()
217248
{
218249
if ( failure == null )

driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultCursorTest.java

Lines changed: 42 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3939
import org.neo4j.driver.v1.summary.ResultSummary;
4040
import org.neo4j.driver.v1.summary.StatementType;
41+
import org.neo4j.driver.v1.util.Function;
42+
import org.neo4j.driver.v1.util.Functions;
4143

4244
import static java.util.Arrays.asList;
4345
import static java.util.Collections.emptyList;
@@ -54,6 +56,7 @@
5456
import static org.mockito.Mockito.verify;
5557
import static org.mockito.Mockito.when;
5658
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
59+
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5760
import static org.neo4j.driver.v1.Values.value;
5861
import static org.neo4j.driver.v1.Values.values;
5962
import static org.neo4j.driver.v1.util.TestUtil.await;
@@ -354,159 +357,102 @@ public void shouldFailForEachWhenGivenActionThrows()
354357
}
355358

356359
@Test
357-
public void shouldListAsyncWhenResultContainsMultipleRecords()
360+
public void shouldReturnFailureWhenExists()
358361
{
359362
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
360363

361-
Record record1 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) );
362-
Record record2 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 2, 2, 2 ) );
363-
Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 3, 3 ) );
364-
Record record4 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 4, 4, 4 ) );
365-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) )
366-
.thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) )
367-
.thenReturn( completedFuture( record4 ) ).thenReturn( completedWithNull() );
364+
ServiceUnavailableException error = new ServiceUnavailableException( "Hi" );
365+
when( pullAllHandler.failureAsync() ).thenReturn( completedFuture( error ) );
368366

369367
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
370368

371-
assertEquals( asList( record1, record2, record3, record4 ), await( cursor.listAsync() ) );
369+
assertEquals( error, await( cursor.failureAsync() ) );
372370
}
373371

374372
@Test
375-
public void shouldListAsyncWhenResultContainsOneRecords()
373+
public void shouldReturnNullFailureWhenDoesNotExist()
376374
{
377375
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
378-
379-
Record record = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 1, 1 ) );
380-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record ) )
381-
.thenReturn( completedWithNull() );
376+
when( pullAllHandler.failureAsync() ).thenReturn( completedWithNull() );
382377

383378
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
384379

385-
assertEquals( singletonList( record ), await( cursor.listAsync() ) );
380+
assertNull( await( cursor.failureAsync() ) );
386381
}
387382

388383
@Test
389-
public void shouldListAsyncWhenResultContainsNoRecords()
384+
public void shouldListAsyncWithoutMapFunction()
390385
{
391386
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
392-
when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() );
393387

394-
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
395-
396-
assertEquals( 0, await( cursor.listAsync() ).size() );
397-
}
398-
399-
@Test
400-
public void shouldListAsyncWithFunctionWhenResultContainsMultipleRecords()
401-
{
402-
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
388+
Record record1 = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) );
389+
Record record2 = new InternalRecord( asList( "key1", "key2" ), values( 2, 2 ) );
390+
List<Record> records = asList( record1, record2 );
403391

404-
Record record1 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 11, 111 ) );
405-
Record record2 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 2, 22, 222 ) );
406-
Record record3 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 3, 33, 333 ) );
407-
Record record4 = new InternalRecord( asList( "key1", "key2", "key3" ), values( 4, 44, 444 ) );
408-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) )
409-
.thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) )
410-
.thenReturn( completedFuture( record4 ) ).thenReturn( completedWithNull() );
392+
when( pullAllHandler.listAsync( Functions.identity() ) ).thenReturn( completedFuture( records ) );
411393

412394
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
413395

414-
List<Integer> values = await( cursor.listAsync( record -> record.get( "key2" ).asInt() ) );
415-
assertEquals( asList( 11, 22, 33, 44 ), values );
396+
assertEquals( records, await( cursor.listAsync() ) );
397+
verify( pullAllHandler ).listAsync( Functions.identity() );
416398
}
417399

418400
@Test
419-
public void shouldListAsyncWithFunctionWhenResultContainsOneRecords()
401+
public void shouldListAsyncWithMapFunction()
420402
{
403+
Function<Record,String> mapFunction = record -> record.get( 0 ).asString();
421404
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
422405

423-
Record singleRecord = new InternalRecord( asList( "key1", "key2", "key3" ), values( 1, 11, 111 ) );
424-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( singleRecord ) )
425-
.thenReturn( completedWithNull() );
406+
List<String> values = asList( "a", "b", "c", "d", "e" );
407+
when( pullAllHandler.listAsync( mapFunction ) ).thenReturn( completedFuture( values ) );
426408

427409
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
428410

429-
List<Long> values = await( cursor.listAsync( record -> record.get( "key3" ).asLong() ) );
430-
assertEquals( singletonList( 111L ), values );
411+
assertEquals( values, await( cursor.listAsync( mapFunction ) ) );
412+
verify( pullAllHandler ).listAsync( mapFunction );
431413
}
432414

433415
@Test
434-
public void shouldListAsyncWithFunctionWhenResultContainsNoRecords()
416+
public void shouldPropagateFailureFromListAsyncWithoutMapFunction()
435417
{
436418
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
437-
when( pullAllHandler.nextAsync() ).thenReturn( completedWithNull() );
419+
RuntimeException error = new RuntimeException( "Hi" );
420+
when( pullAllHandler.listAsync( Functions.identity() ) ).thenReturn( failedFuture( error ) );
438421

439422
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
440423

441-
List<String> values = await( cursor.listAsync( record -> record.get( "key42" ).asString() ) );
442-
assertEquals( 0, values.size() );
443-
}
444-
445-
@Test
446-
public void shouldFailListAsyncWhenGivenFunctionThrows()
447-
{
448-
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
449-
450-
Record record1 = new InternalRecord( asList( "key1", "key2" ), values( 1, 1 ) );
451-
Record record2 = new InternalRecord( asList( "key1", "key2" ), values( 2, 2 ) );
452-
Record record3 = new InternalRecord( asList( "key1", "key2" ), values( 3, 3 ) );
453-
when( pullAllHandler.nextAsync() ).thenReturn( completedFuture( record1 ) )
454-
.thenReturn( completedFuture( record2 ) ).thenReturn( completedFuture( record3 ) )
455-
.thenReturn( completedWithNull() );
456-
457-
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
458-
459-
AtomicInteger recordsProcessed = new AtomicInteger();
460-
RuntimeException error = new RuntimeException( "Hello" );
461-
462-
CompletionStage<List<Integer>> stage = cursor.listAsync( record ->
463-
{
464-
if ( record.get( "key1" ).asInt() == 2 )
465-
{
466-
throw error;
467-
}
468-
else
469-
{
470-
recordsProcessed.incrementAndGet();
471-
return record.get( "key1" ).asInt();
472-
}
473-
} );
474-
475424
try
476425
{
477-
await( stage );
426+
await( cursor.listAsync() );
478427
fail( "Exception expected" );
479428
}
480429
catch ( RuntimeException e )
481430
{
482431
assertEquals( error, e );
483432
}
484-
assertEquals( 1, recordsProcessed.get() );
485-
verify( pullAllHandler, times( 2 ) ).nextAsync();
486-
}
487-
488-
@Test
489-
public void shouldReturnFailureWhenExists()
490-
{
491-
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
492-
493-
ServiceUnavailableException error = new ServiceUnavailableException( "Hi" );
494-
when( pullAllHandler.failureAsync() ).thenReturn( completedFuture( error ) );
495-
496-
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
497-
498-
assertEquals( error, await( cursor.failureAsync() ) );
433+
verify( pullAllHandler ).listAsync( Functions.identity() );
499434
}
500435

501436
@Test
502-
public void shouldReturnNullFailureWhenDoesNotExist()
437+
public void shouldPropagateFailureFromListAsyncWithMapFunction()
503438
{
439+
Function<Record,String> mapFunction = record -> record.get( 0 ).asString();
504440
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
505-
when( pullAllHandler.failureAsync() ).thenReturn( completedWithNull() );
441+
RuntimeException error = new RuntimeException( "Hi" );
442+
when( pullAllHandler.listAsync( mapFunction ) ).thenReturn( failedFuture( error ) );
506443

507444
InternalStatementResultCursor cursor = newCursor( pullAllHandler );
508445

509-
assertNull( await( cursor.failureAsync() ) );
446+
try
447+
{
448+
await( cursor.listAsync( mapFunction ) );
449+
fail( "Exception expected" );
450+
}
451+
catch ( RuntimeException e )
452+
{
453+
assertEquals( error, e );
454+
}
455+
verify( pullAllHandler ).listAsync( mapFunction );
510456
}
511457

512458
private static InternalStatementResultCursor newCursor( PullAllResponseHandler pullAllHandler )

0 commit comments

Comments
 (0)