Skip to content

Commit 24e87be

Browse files
authored
Merge pull request #410 from lutovich/1.5-more-async-cursor-methods
More helper methods in StatementResultCursor
2 parents c79fc2d + fe68cf4 commit 24e87be

File tree

5 files changed

+428
-24
lines changed

5 files changed

+428
-24
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,8 @@ public Record single()
106106

107107
if ( hasMoreThanOne )
108108
{
109-
throw new NoSuchRecordException( "Expected a result with a single record, but this result contains at least one more. " +
110-
"Ensure your query returns only one record, or use `first` instead of `single` if " +
111-
"you do not care about the number of records in the result." );
109+
throw new NoSuchRecordException( "Expected a result with a single record, but this result contains " +
110+
"at least one more. Ensure your query returns only one record." );
112111
}
113112

114113
return single;

driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionStage;
2626
import java.util.function.Consumer;
27+
import java.util.function.Function;
2728

2829
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
2930
import org.neo4j.driver.internal.handlers.RunResponseHandler;
3031
import org.neo4j.driver.v1.Record;
3132
import org.neo4j.driver.v1.StatementResultCursor;
33+
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
3234
import org.neo4j.driver.v1.summary.ResultSummary;
3335

3436
import static java.util.Objects.requireNonNull;
@@ -85,18 +87,54 @@ public CompletionStage<Record> peekAsync()
8587
}
8688

8789
@Override
88-
public CompletionStage<Void> forEachAsync( Consumer<Record> action )
90+
public CompletionStage<Record> singleAsync()
91+
{
92+
return nextAsync().thenCompose( firstRecord ->
93+
{
94+
if ( firstRecord == null )
95+
{
96+
throw new NoSuchRecordException( "Cannot retrieve a single record, because this cursor is empty." );
97+
}
98+
return nextAsync().thenApply( secondRecord ->
99+
{
100+
if ( secondRecord != null )
101+
{
102+
throw new NoSuchRecordException( "Expected a cursor with a single record, but this cursor " +
103+
"contains at least one more. Ensure your query returns only " +
104+
"one record." );
105+
}
106+
return firstRecord;
107+
} );
108+
} );
109+
}
110+
111+
@Override
112+
public CompletionStage<ResultSummary> consumeAsync()
113+
{
114+
return forEachAsync( record ->
115+
{
116+
} );
117+
}
118+
119+
@Override
120+
public CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action )
89121
{
90122
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
91123
internalForEachAsync( action, resultFuture );
92-
return resultFuture;
124+
return resultFuture.thenCompose( ignore -> summaryAsync() );
93125
}
94126

95127
@Override
96128
public CompletionStage<List<Record>> listAsync()
97129
{
98-
CompletableFuture<List<Record>> resultFuture = new CompletableFuture<>();
99-
internalListAsync( new ArrayList<>(), resultFuture );
130+
return listAsync( Function.identity() );
131+
}
132+
133+
@Override
134+
public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
135+
{
136+
CompletableFuture<List<T>> resultFuture = new CompletableFuture<>();
137+
internalListAsync( new ArrayList<>(), resultFuture, mapFunction );
100138
return resultFuture;
101139
}
102140

@@ -114,7 +152,15 @@ private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Vo
114152
}
115153
else if ( record != null )
116154
{
117-
action.accept( record );
155+
try
156+
{
157+
action.accept( record );
158+
}
159+
catch ( Throwable actionError )
160+
{
161+
resultFuture.completeExceptionally( actionError );
162+
return;
163+
}
118164
internalForEachAsync( action, resultFuture );
119165
}
120166
else
@@ -124,7 +170,8 @@ else if ( record != null )
124170
} );
125171
}
126172

127-
private void internalListAsync( List<Record> records, CompletableFuture<List<Record>> resultFuture )
173+
private <T> void internalListAsync( List<T> result, CompletableFuture<List<T>> resultFuture,
174+
Function<Record,T> mapFunction )
128175
{
129176
CompletionStage<Record> recordFuture = nextAsync();
130177

@@ -138,12 +185,22 @@ private void internalListAsync( List<Record> records, CompletableFuture<List<Rec
138185
}
139186
else if ( record != null )
140187
{
141-
records.add( record );
142-
internalListAsync( records, resultFuture );
188+
T value;
189+
try
190+
{
191+
value = mapFunction.apply( record );
192+
}
193+
catch ( Throwable mapError )
194+
{
195+
resultFuture.completeExceptionally( mapError );
196+
return;
197+
}
198+
result.add( value );
199+
internalListAsync( result, resultFuture, mapFunction );
143200
}
144201
else
145202
{
146-
resultFuture.complete( records );
203+
resultFuture.complete( result );
147204
}
148205
} );
149206
}

driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.concurrent.CompletionStage;
2323
import java.util.function.Consumer;
24+
import java.util.function.Function;
2425

2526
import org.neo4j.driver.v1.summary.ResultSummary;
2627

@@ -39,7 +40,13 @@ public interface StatementResultCursor
3940

4041
CompletionStage<Record> peekAsync();
4142

42-
CompletionStage<Void> forEachAsync( Consumer<Record> action );
43+
CompletionStage<Record> singleAsync();
44+
45+
CompletionStage<ResultSummary> consumeAsync();
46+
47+
CompletionStage<ResultSummary> forEachAsync( Consumer<Record> action );
4348

4449
CompletionStage<List<Record>> listAsync();
50+
51+
<T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction );
4552
}

0 commit comments

Comments
 (0)