Skip to content

Commit da8e744

Browse files
author
Zhen Li
committed
Changed session.run not discard previous run result, but buffer all into memory.
This enables nested session runs.
1 parent 91e0bb4 commit da8e744

15 files changed

+153
-133
lines changed

driver/src/main/java/org/neo4j/driver/StatementResult.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,7 @@ public interface StatementResult extends Iterator<Record>
143143
*
144144
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
145145
*
146-
* If you want to obtain the summary without discard the records, invoke
147-
* {@link StatementResult#list()} before calling this method to buffer all records into memory.
146+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
148147
*
149148
* @return a summary for the whole query result.
150149
*/

driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.neo4j.driver.Record;
2929
import org.neo4j.driver.Records;
30+
import org.neo4j.driver.StatementResult;
3031
import org.neo4j.driver.exceptions.NoSuchRecordException;
3132
import org.neo4j.driver.summary.ResultSummary;
3233

@@ -74,8 +75,7 @@ public interface StatementResultCursor
7475
* <p>
7576
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
7677
* <p>
77-
* If you want to obtain the summary without discarding the records, invoke {@link #listAsync()}
78-
* to buffer records into memory before calling this method.
78+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
7979
*
8080
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
8181
* completed exceptionally if query execution fails.

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@
2222

2323
public interface FailableCursor
2424
{
25+
CompletionStage<Throwable> consumeAsync();
2526
CompletionStage<Throwable> failureAsync();
2627
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public CompletionStage<Void> closeAsync()
194194
if ( cursor != null )
195195
{
196196
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
197-
return cursor.failureAsync();
197+
return cursor.consumeAsync();
198198
}
199199
// no result cursor exists so no error exists
200200
return completedWithNull();

driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ private static CompletionStage<Throwable> retrieveFailure( CompletionStage<? ext
7474
{
7575
return cursorStage
7676
.exceptionally( cursor -> null )
77-
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() );
77+
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
7878
}
7979
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,18 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
111111
}
112112

113113
@Override
114-
public CompletionStage<Throwable> failureAsync()
114+
public CompletionStage<Throwable> consumeAsync()
115115
{
116116
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
117117
}
118118

119+
@Override
120+
public CompletionStage<Throwable> failureAsync()
121+
{
122+
return pullAllHandler.failureAsync();
123+
}
124+
125+
119126
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
120127
{
121128
CompletionStage<Record> recordFuture = nextAsync();

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,19 @@ public void cancel()
9191
}
9292

9393
@Override
94-
public CompletionStage<Throwable> failureAsync()
94+
public CompletionStage<Throwable> consumeAsync()
9595
{
9696
// calling this method will enforce discarding record stream and finish running cypher query
9797
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
9898
}
9999

100+
@Override
101+
public CompletionStage<Throwable> failureAsync()
102+
{
103+
// It is safe to discard records as either the streaming has not started at all, or the streaming is fully finished.
104+
return consumeAsync();
105+
}
106+
100107
@Override
101108
public CompletionStage<ResultSummary> summaryAsync()
102109
{

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,7 @@ public interface PullAllResponseHandler extends ResponseHandler
3636

3737
<T> CompletionStage<List<T>> listAsync( Function<Record, T> mapFunction );
3838

39+
CompletionStage<Throwable> failureAsync();
40+
3941
void prePopulateRecords();
4042
}

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> m
176176
return pullAllAsync().thenApply( summary -> recordsAsList( mapFunction ) );
177177
}
178178

179+
@Override
180+
public synchronized CompletionStage<Throwable> failureAsync()
181+
{
182+
return pullAllAsync().handle( ( ignore, error ) -> error );
183+
}
184+
179185
@Override
180186
public void prePopulateRecords()
181187
{

driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@
7373
import static org.junit.jupiter.api.Assertions.assertNull;
7474
import static org.junit.jupiter.api.Assertions.assertThrows;
7575
import static org.junit.jupiter.api.Assertions.assertTrue;
76-
import static org.neo4j.driver.Values.parameters;
7776
import static org.neo4j.driver.SessionConfig.builder;
77+
import static org.neo4j.driver.Values.parameters;
7878
import static org.neo4j.driver.internal.util.Futures.failedFuture;
7979
import static org.neo4j.driver.internal.util.Iterables.single;
8080
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;

driver/src/test/java/org/neo4j/driver/internal/async/AsyncStatementResultCursorImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ void shouldReturnFailureWhenExists()
291291
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
292292

293293
ServiceUnavailableException error = new ServiceUnavailableException( "Hi" );
294-
when( pullAllHandler.summaryAsync() ).thenReturn( failedFuture( error ) );
294+
when( pullAllHandler.failureAsync() ).thenReturn( completedFuture( error ) );
295295

296296
AsyncStatementResultCursorImpl cursor = newCursor( pullAllHandler );
297297

@@ -302,7 +302,7 @@ void shouldReturnFailureWhenExists()
302302
void shouldReturnNullFailureWhenDoesNotExist()
303303
{
304304
PullAllResponseHandler pullAllHandler = mock( PullAllResponseHandler.class );
305-
when( pullAllHandler.summaryAsync() ).thenReturn( completedWithNull() );
305+
when( pullAllHandler.failureAsync() ).thenReturn( completedWithNull() );
306306

307307
AsyncStatementResultCursorImpl cursor = newCursor( pullAllHandler );
308308

driver/src/test/java/org/neo4j/driver/internal/async/ResultCursorsHolderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private static CompletionStage<AsyncStatementResultCursorImpl> cursorWithError(
149149
private static CompletionStage<AsyncStatementResultCursorImpl> cursorWithFailureFuture( CompletableFuture<Throwable> future )
150150
{
151151
AsyncStatementResultCursorImpl cursor = mock( AsyncStatementResultCursorImpl.class );
152-
when( cursor.failureAsync() ).thenReturn( future );
152+
when( cursor.consumeAsync() ).thenReturn( future );
153153
return completedFuture( cursor );
154154
}
155155
}

driver/src/test/java/org/neo4j/driver/internal/handlers/LegacyPullAllResponseHandlerTest.java

Lines changed: 0 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,12 @@
2020

2121
import org.junit.jupiter.api.Test;
2222

23-
import java.io.IOException;
2423
import java.util.List;
2524
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.CompletionStage;
2725
import java.util.function.Function;
2826

2927
import org.neo4j.driver.Record;
3028
import org.neo4j.driver.Statement;
31-
import org.neo4j.driver.exceptions.ServiceUnavailableException;
32-
import org.neo4j.driver.exceptions.SessionExpiredException;
3329
import org.neo4j.driver.internal.spi.Connection;
3430
import org.neo4j.driver.summary.ResultSummary;
3531

@@ -52,121 +48,6 @@
5248

5349
class LegacyPullAllResponseHandlerTest extends PullAllResponseHandlerTestBase<LegacyPullAllResponseHandler>
5450
{
55-
@Test
56-
void shouldReturnNoFailureWhenAlreadySucceeded()
57-
{
58-
LegacyPullAllResponseHandler handler = newHandler();
59-
handler.onSuccess( emptyMap() );
60-
61-
Throwable failure = await( handler.failureAsync() );
62-
63-
assertNull( failure );
64-
}
65-
66-
@Test
67-
void shouldReturnNoFailureWhenSucceededAfterFailureRequested()
68-
{
69-
LegacyPullAllResponseHandler handler = newHandler();
70-
71-
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
72-
assertFalse( failureFuture.isDone() );
73-
74-
handler.onSuccess( emptyMap() );
75-
76-
assertTrue( failureFuture.isDone() );
77-
assertNull( await( failureFuture ) );
78-
}
79-
80-
@Test
81-
void shouldReturnFailureWhenAlreadyFailed()
82-
{
83-
LegacyPullAllResponseHandler handler = newHandler();
84-
85-
RuntimeException failure = new RuntimeException( "Ops" );
86-
handler.onFailure( failure );
87-
88-
Throwable receivedFailure = await( handler.failureAsync() );
89-
assertEquals( failure, receivedFailure );
90-
}
91-
92-
@Test
93-
void shouldReturnFailureWhenFailedAfterFailureRequested()
94-
{
95-
LegacyPullAllResponseHandler handler = newHandler();
96-
97-
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
98-
assertFalse( failureFuture.isDone() );
99-
100-
IOException failure = new IOException( "Broken pipe" );
101-
handler.onFailure( failure );
102-
103-
assertTrue( failureFuture.isDone() );
104-
assertEquals( failure, await( failureFuture ) );
105-
}
106-
107-
@Test
108-
void shouldReturnFailureWhenRequestedMultipleTimes()
109-
{
110-
LegacyPullAllResponseHandler handler = newHandler();
111-
112-
CompletableFuture<Throwable> failureFuture1 = handler.failureAsync().toCompletableFuture();
113-
CompletableFuture<Throwable> failureFuture2 = handler.failureAsync().toCompletableFuture();
114-
115-
assertFalse( failureFuture1.isDone() );
116-
assertFalse( failureFuture2.isDone() );
117-
118-
RuntimeException failure = new RuntimeException( "Unable to contact database" );
119-
handler.onFailure( failure );
120-
121-
assertTrue( failureFuture1.isDone() );
122-
assertTrue( failureFuture2.isDone() );
123-
124-
assertEquals( failure, await( failureFuture1 ) );
125-
assertEquals( failure, await( failureFuture2 ) );
126-
}
127-
128-
@Test
129-
void shouldReturnFailureOnlyOnceWhenFailedBeforeFailureRequested()
130-
{
131-
LegacyPullAllResponseHandler handler = newHandler();
132-
133-
ServiceUnavailableException failure = new ServiceUnavailableException( "Connection terminated" );
134-
handler.onFailure( failure );
135-
136-
assertEquals( failure, await( handler.failureAsync() ) );
137-
assertNull( await( handler.failureAsync() ) );
138-
}
139-
140-
@Test
141-
void shouldReturnFailureOnlyOnceWhenFailedAfterFailureRequested()
142-
{
143-
LegacyPullAllResponseHandler handler = newHandler();
144-
145-
CompletionStage<Throwable> failureFuture = handler.failureAsync();
146-
147-
SessionExpiredException failure = new SessionExpiredException( "Network unreachable" );
148-
handler.onFailure( failure );
149-
assertEquals( failure, await( failureFuture ) );
150-
151-
assertNull( await( handler.failureAsync() ) );
152-
}
153-
154-
@Test
155-
void shouldReturnSummaryWhenAlreadyFailedAndFailureConsumed()
156-
{
157-
Statement statement = new Statement( "CREATE ()" );
158-
LegacyPullAllResponseHandler handler = newHandler( statement );
159-
160-
ServiceUnavailableException failure = new ServiceUnavailableException( "Neo4j unreachable" );
161-
handler.onFailure( failure );
162-
163-
assertEquals( failure, await( handler.failureAsync() ) );
164-
165-
ResultSummary summary = await( handler.summaryAsync() );
166-
assertNotNull( summary );
167-
assertEquals( statement, summary.statement() );
168-
}
169-
17051
@Test
17152
void shouldDisableAutoReadWhenTooManyRecordsArrive()
17253
{

0 commit comments

Comments
 (0)