Skip to content

Commit 6eef420

Browse files
committed
Added #listAsync() to async result cursor
Method allows async retrieval of all records as list.
1 parent 0a1af0d commit 6eef420

File tree

5 files changed

+99
-1
lines changed

5 files changed

+99
-1
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.util.concurrent.Future;
2222
import io.netty.util.concurrent.FutureListener;
2323

24+
import java.util.ArrayList;
2425
import java.util.Collections;
2526
import java.util.List;
2627

@@ -87,6 +88,14 @@ public Response<Void> forEachAsync( final Consumer<Record> action )
8788
return result;
8889
}
8990

91+
@Override
92+
public Response<List<Record>> listAsync()
93+
{
94+
InternalPromise<List<Record>> result = connection.newPromise();
95+
internalListAsync( new ArrayList<Record>(), result );
96+
return result;
97+
}
98+
9099
private void internalForEachAsync( final Consumer<Record> action, final InternalPromise<Void> result )
91100
{
92101
final InternalFuture<Record> recordFuture = internalNextAsync();
@@ -121,6 +130,40 @@ else if ( future.isSuccess() )
121130
} );
122131
}
123132

133+
private void internalListAsync( final List<Record> records, final InternalPromise<List<Record>> result )
134+
{
135+
final InternalFuture<Record> recordFuture = internalNextAsync();
136+
137+
recordFuture.addListener( new FutureListener<Record>()
138+
{
139+
@Override
140+
public void operationComplete( Future<Record> future )
141+
{
142+
if ( future.isCancelled() )
143+
{
144+
result.cancel( true );
145+
}
146+
else if ( future.isSuccess() )
147+
{
148+
Record record = future.getNow();
149+
if ( record != null )
150+
{
151+
records.add( record );
152+
internalListAsync( records, result );
153+
}
154+
else
155+
{
156+
result.setSuccess( records );
157+
}
158+
}
159+
else
160+
{
161+
result.setFailure( future.cause() );
162+
}
163+
}
164+
} );
165+
}
166+
124167
private InternalFuture<Record> internalNextAsync()
125168
{
126169
if ( peekedRecordResponse != null )

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ public synchronized InternalFuture<Record> nextAsync()
130130
if ( recordPromise == null )
131131
{
132132
recordPromise = connection.newPromise();
133-
System.out.println( "setting promise " + recordPromise.hashCode() );
134133
}
135134
return recordPromise;
136135
}

driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,6 @@ public interface StatementResultCursor
3939
Response<Record> peekAsync();
4040

4141
Response<Void> forEachAsync( Consumer<Record> action );
42+
43+
Response<List<Record>> listAsync();
4244
}

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.Arrays;
31+
import java.util.Collections;
3132
import java.util.Iterator;
3233
import java.util.List;
3334
import java.util.concurrent.Future;
@@ -490,6 +491,19 @@ public void shouldForEachWithNonEmptyCursor()
490491
testForEach( "UNWIND range(1, 10000) AS x RETURN x", 10000 );
491492
}
492493

494+
@Test
495+
public void shouldConvertToListWithEmptyCursor()
496+
{
497+
testList( "MATCH (n:NoSuchLabel) RETURN n", Collections.emptyList() );
498+
}
499+
500+
@Test
501+
public void shouldConvertToListWithNonEmptyCursor()
502+
{
503+
testList( "UNWIND range(1, 100, 10) AS x RETURN x",
504+
Arrays.asList( 1L, 11L, 21L, 31L, 41L, 51L, 61L, 71L, 81L, 91L ) );
505+
}
506+
493507
private Future<List<Future<Record>>> runNestedQueries( StatementResultCursor inputCursor )
494508
{
495509
Promise<List<Future<Record>>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise();
@@ -576,6 +590,18 @@ public void accept( Record record )
576590
assertEquals( expectedSeenRecords, recordsSeen.get() );
577591
}
578592

593+
private <T> void testList( String query, List<T> expectedList )
594+
{
595+
StatementResultCursor cursor = await( session.runAsync( query ) );
596+
List<Record> records = await( cursor.listAsync() );
597+
List<Object> actualList = new ArrayList<>();
598+
for ( Record record : records )
599+
{
600+
actualList.add( record.get( 0 ).asObject() );
601+
}
602+
assertEquals( expectedList, actualList );
603+
}
604+
579605
private static void assertSyntaxError( Exception e )
580606
{
581607
assertThat( e, instanceOf( ClientException.class ) );

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import org.junit.Rule;
2424
import org.junit.Test;
2525

26+
import java.util.ArrayList;
2627
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.List;
2730
import java.util.concurrent.TimeUnit;
2831
import java.util.concurrent.atomic.AtomicInteger;
2932

@@ -558,6 +561,18 @@ public void shouldForEachWithNonEmptyCursor()
558561
testForEach( "UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n", 12555 );
559562
}
560563

564+
@Test
565+
public void shouldConvertToListWithEmptyCursor()
566+
{
567+
testList( "CREATE (:Person)-[:KNOWS]->(:Person)", Collections.emptyList() );
568+
}
569+
570+
@Test
571+
public void shouldConvertToListWithNonEmptyCursor()
572+
{
573+
testList( "UNWIND [1, '1', 2, '2', 3, '3'] AS x RETURN x", Arrays.asList( 1L, "1", 2L, "2", 3L, "3" ) );
574+
}
575+
561576
private int countNodes( Object id )
562577
{
563578
StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );
@@ -583,6 +598,19 @@ public void accept( Record record )
583598
assertEquals( expectedSeenRecords, recordsSeen.get() );
584599
}
585600

601+
private <T> void testList( String query, List<T> expectedList )
602+
{
603+
Transaction tx = await( session.beginTransactionAsync() );
604+
StatementResultCursor cursor = await( tx.runAsync( query ) );
605+
List<Record> records = await( cursor.listAsync() );
606+
List<Object> actualList = new ArrayList<>();
607+
for ( Record record : records )
608+
{
609+
actualList.add( record.get( 0 ).asObject() );
610+
}
611+
assertEquals( expectedList, actualList );
612+
}
613+
586614
private static void assertSyntaxError( Exception e )
587615
{
588616
assertThat( e, instanceOf( ClientException.class ) );

0 commit comments

Comments
 (0)