Skip to content

Commit 0a1af0d

Browse files
committed
Added #forEach() to async result cursor
Method allows processing of every incoming record in an async fashion. It returns a future which signals about processing completion. It can either be failed or completed with `null`.
1 parent cd90b3c commit 0a1af0d

File tree

6 files changed

+163
-30
lines changed

6 files changed

+163
-30
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,36 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import io.netty.util.concurrent.Future;
22+
import io.netty.util.concurrent.FutureListener;
23+
2124
import java.util.Collections;
2225
import java.util.List;
2326

2427
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
2528
import org.neo4j.driver.internal.handlers.RunResponseHandler;
29+
import org.neo4j.driver.internal.util.Consumer;
2630
import org.neo4j.driver.v1.Record;
2731
import org.neo4j.driver.v1.Response;
2832
import org.neo4j.driver.v1.StatementResultCursor;
2933
import org.neo4j.driver.v1.summary.ResultSummary;
3034

35+
import static java.util.Objects.requireNonNull;
36+
3137
public class InternalStatementResultCursor implements StatementResultCursor
3238
{
39+
private final AsyncConnection connection;
3340
private final RunResponseHandler runResponseHandler;
3441
private final PullAllResponseHandler pullAllHandler;
3542

36-
private Response<Record> peekedRecordResponse;
43+
private InternalFuture<Record> peekedRecordResponse;
3744

38-
public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
45+
public InternalStatementResultCursor( AsyncConnection connection, RunResponseHandler runResponseHandler,
46+
PullAllResponseHandler pullAllHandler )
3947
{
40-
this.runResponseHandler = runResponseHandler;
41-
this.pullAllHandler = pullAllHandler;
48+
this.connection = requireNonNull( connection );
49+
this.runResponseHandler = requireNonNull( runResponseHandler );
50+
this.pullAllHandler = requireNonNull( pullAllHandler );
4251
}
4352

4453
@Override
@@ -57,16 +66,7 @@ public Response<ResultSummary> summaryAsync()
5766
@Override
5867
public Response<Record> nextAsync()
5968
{
60-
if ( peekedRecordResponse != null )
61-
{
62-
Response<Record> result = peekedRecordResponse;
63-
peekedRecordResponse = null;
64-
return result;
65-
}
66-
else
67-
{
68-
return pullAllHandler.nextAsync();
69-
}
69+
return internalNextAsync();
7070
}
7171

7272
@Override
@@ -78,4 +78,60 @@ public Response<Record> peekAsync()
7878
}
7979
return peekedRecordResponse;
8080
}
81+
82+
@Override
83+
public Response<Void> forEachAsync( final Consumer<Record> action )
84+
{
85+
InternalPromise<Void> result = connection.newPromise();
86+
internalForEachAsync( action, result );
87+
return result;
88+
}
89+
90+
private void internalForEachAsync( final Consumer<Record> action, final InternalPromise<Void> result )
91+
{
92+
final InternalFuture<Record> recordFuture = internalNextAsync();
93+
94+
recordFuture.addListener( new FutureListener<Record>()
95+
{
96+
@Override
97+
public void operationComplete( Future<Record> future )
98+
{
99+
if ( future.isCancelled() )
100+
{
101+
result.cancel( true );
102+
}
103+
else if ( future.isSuccess() )
104+
{
105+
Record record = future.getNow();
106+
if ( record != null )
107+
{
108+
action.accept( record );
109+
internalForEachAsync( action, result );
110+
}
111+
else
112+
{
113+
result.setSuccess( null );
114+
}
115+
}
116+
else
117+
{
118+
result.setFailure( future.cause() );
119+
}
120+
}
121+
} );
122+
}
123+
124+
private InternalFuture<Record> internalNextAsync()
125+
{
126+
if ( peekedRecordResponse != null )
127+
{
128+
InternalFuture<Record> result = peekedRecordResponse;
129+
peekedRecordResponse = null;
130+
return result;
131+
}
132+
else
133+
{
134+
return pullAllHandler.nextAsync();
135+
}
136+
}
81137
}

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static InternalFuture<StatementResultCursor> runAsync( AsyncConnection co
4343
return runAsync( connection, statement, null );
4444
}
4545

46-
public static InternalFuture<StatementResultCursor> runAsync( AsyncConnection connection, Statement statement,
46+
public static InternalFuture<StatementResultCursor> runAsync( final AsyncConnection connection, Statement statement,
4747
ExplicitTransaction tx )
4848
{
4949
String query = statement.text();
@@ -62,7 +62,7 @@ public static InternalFuture<StatementResultCursor> runAsync( AsyncConnection co
6262
@Override
6363
public StatementResultCursor apply( Void ignore )
6464
{
65-
return new InternalStatementResultCursor( runHandler, pullAllHandler );
65+
return new InternalStatementResultCursor( connection, runHandler, pullAllHandler );
6666
}
6767
} );
6868
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,7 @@ public synchronized void onSuccess( Map<String,Value> metadata )
8282

8383
succeeded = true;
8484
afterSuccess();
85-
86-
if ( recordPromise != null )
87-
{
88-
recordPromise.setSuccess( null );
89-
recordPromise = null;
90-
}
85+
succeedRecordPromise( null );
9186
}
9287

9388
protected abstract void afterSuccess();
@@ -97,12 +92,7 @@ public synchronized void onFailure( Throwable error )
9792
{
9893
failure = error;
9994
afterFailure( error );
100-
101-
if ( recordPromise != null )
102-
{
103-
recordPromise.setFailure( error );
104-
recordPromise = null;
105-
}
95+
failRecordPromise( error );
10696
}
10797

10898
protected abstract void afterFailure( Throwable error );
@@ -114,8 +104,7 @@ public synchronized void onRecord( Value[] fields )
114104

115105
if ( recordPromise != null )
116106
{
117-
recordPromise.setSuccess( record );
118-
recordPromise = null;
107+
succeedRecordPromise( record );
119108
}
120109
else
121110
{
@@ -141,6 +130,7 @@ public synchronized InternalFuture<Record> nextAsync()
141130
if ( recordPromise == null )
142131
{
143132
recordPromise = connection.newPromise();
133+
System.out.println( "setting promise " + recordPromise.hashCode() );
144134
}
145135
return recordPromise;
146136
}
@@ -191,6 +181,26 @@ private Record dequeueRecord()
191181
return record;
192182
}
193183

184+
private void succeedRecordPromise( Record record )
185+
{
186+
if ( recordPromise != null )
187+
{
188+
InternalPromise<Record> promise = recordPromise;
189+
recordPromise = null;
190+
promise.setSuccess( record );
191+
}
192+
}
193+
194+
private void failRecordPromise( Throwable error )
195+
{
196+
if ( recordPromise != null )
197+
{
198+
InternalPromise<Record> promise = recordPromise;
199+
recordPromise = null;
200+
promise.setFailure( error );
201+
}
202+
}
203+
194204
private ResultSummary extractResultSummary( Map<String,Value> metadata )
195205
{
196206
return new InternalResultSummary( statement, connection.serverInfo(), extractStatementType( metadata ),

driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.List;
2222

23+
import org.neo4j.driver.internal.util.Consumer;
2324
import org.neo4j.driver.v1.summary.ResultSummary;
2425

2526
public interface StatementResultCursor
@@ -36,4 +37,6 @@ public interface StatementResultCursor
3637
Response<Record> nextAsync();
3738

3839
Response<Record> peekAsync();
40+
41+
Response<Void> forEachAsync( Consumer<Record> action );
3942
}

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.atomic.AtomicInteger;
3636

3737
import org.neo4j.driver.internal.async.InternalPromise;
38+
import org.neo4j.driver.internal.util.Consumer;
3839
import org.neo4j.driver.v1.Record;
3940
import org.neo4j.driver.v1.Response;
4041
import org.neo4j.driver.v1.ResponseListener;
@@ -477,6 +478,18 @@ public void shouldPeekRecordFromCursor()
477478
assertNull( await( cursor.nextAsync() ) );
478479
}
479480

481+
@Test
482+
public void shouldForEachWithEmptyCursor()
483+
{
484+
testForEach( "CREATE ()", 0 );
485+
}
486+
487+
@Test
488+
public void shouldForEachWithNonEmptyCursor()
489+
{
490+
testForEach( "UNWIND range(1, 10000) AS x RETURN x", 10000 );
491+
}
492+
480493
private Future<List<Future<Record>>> runNestedQueries( StatementResultCursor inputCursor )
481494
{
482495
Promise<List<Future<Record>>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise();
@@ -545,6 +558,24 @@ private long countNodesByLabel( String label )
545558
return session.run( "MATCH (n:" + label + ") RETURN count(n)" ).single().get( 0 ).asLong();
546559
}
547560

561+
private void testForEach( String query, int expectedSeenRecords )
562+
{
563+
StatementResultCursor cursor = await( session.runAsync( query ) );
564+
565+
final AtomicInteger recordsSeen = new AtomicInteger();
566+
Response<Void> forEachDone = cursor.forEachAsync( new Consumer<Record>()
567+
{
568+
@Override
569+
public void accept( Record record )
570+
{
571+
recordsSeen.incrementAndGet();
572+
}
573+
} );
574+
575+
assertNull( await( forEachDone ) );
576+
assertEquals( expectedSeenRecords, recordsSeen.get() );
577+
}
578+
548579
private static void assertSyntaxError( Exception e )
549580
{
550581
assertThat( e, instanceOf( ClientException.class ) );

driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525

2626
import java.util.Arrays;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicInteger;
2829

30+
import org.neo4j.driver.internal.util.Consumer;
2931
import org.neo4j.driver.v1.Record;
3032
import org.neo4j.driver.v1.Response;
3133
import org.neo4j.driver.v1.Session;
@@ -544,12 +546,43 @@ public void shouldPeekRecordFromCursor()
544546
await( tx.rollbackAsync() );
545547
}
546548

549+
@Test
550+
public void shouldForEachWithEmptyCursor()
551+
{
552+
testForEach( "MATCH (n:SomeReallyStrangeLabel) RETURN n", 0 );
553+
}
554+
555+
@Test
556+
public void shouldForEachWithNonEmptyCursor()
557+
{
558+
testForEach( "UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n", 12555 );
559+
}
560+
547561
private int countNodes( Object id )
548562
{
549563
StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );
550564
return result.single().get( 0 ).asInt();
551565
}
552566

567+
private void testForEach( String query, int expectedSeenRecords )
568+
{
569+
Transaction tx = await( session.beginTransactionAsync() );
570+
StatementResultCursor cursor = await( tx.runAsync( query ) );
571+
572+
final AtomicInteger recordsSeen = new AtomicInteger();
573+
Response<Void> forEachDone = cursor.forEachAsync( new Consumer<Record>()
574+
{
575+
@Override
576+
public void accept( Record record )
577+
{
578+
recordsSeen.incrementAndGet();
579+
}
580+
} );
581+
582+
assertNull( await( forEachDone ) );
583+
assertEquals( expectedSeenRecords, recordsSeen.get() );
584+
}
585+
553586
private static void assertSyntaxError( Exception e )
554587
{
555588
assertThat( e, instanceOf( ClientException.class ) );

0 commit comments

Comments
 (0)