Skip to content

Commit 7c2baed

Browse files
author
Zhen Li
committed
Changed session.run not discard previous run result, but buffer all into memory.
This enables nested session runs.
1 parent 91e0bb4 commit 7c2baed

File tree

11 files changed

+147
-127
lines changed

11 files changed

+147
-127
lines changed

driver/src/main/java/org/neo4j/driver/StatementResult.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,7 @@ public interface StatementResult extends Iterator<Record>
143143
*
144144
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
145145
*
146-
* If you want to obtain the summary without discard the records, invoke
147-
* {@link StatementResult#list()} before calling this method to buffer all records into memory.
146+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
148147
*
149148
* @return a summary for the whole query result.
150149
*/

driver/src/main/java/org/neo4j/driver/async/StatementResultCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.neo4j.driver.Record;
2929
import org.neo4j.driver.Records;
30+
import org.neo4j.driver.StatementResult;
3031
import org.neo4j.driver.exceptions.NoSuchRecordException;
3132
import org.neo4j.driver.summary.ResultSummary;
3233

@@ -74,8 +75,7 @@ public interface StatementResultCursor
7475
* <p>
7576
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
7677
* <p>
77-
* If you want to obtain the summary without discarding the records, invoke {@link #listAsync()}
78-
* to buffer records into memory before calling this method.
78+
* If you want to access unconsumed records after summary, you shall use {@link StatementResult#list()} to buffer all records into memory before summary.
7979
*
8080
* @return a {@link CompletionStage} completed with a summary for the whole query result. Stage can also be
8181
* completed exceptionally if query execution fails.

driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@
2222

2323
public interface FailableCursor
2424
{
25+
CompletionStage<Throwable> consumeAsync();
2526
CompletionStage<Throwable> failureAsync();
2627
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public CompletionStage<Void> closeAsync()
194194
if ( cursor != null )
195195
{
196196
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
197-
return cursor.failureAsync();
197+
return cursor.consumeAsync();
198198
}
199199
// no result cursor exists so no error exists
200200
return completedWithNull();

driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ private static CompletionStage<Throwable> retrieveFailure( CompletionStage<? ext
7474
{
7575
return cursorStage
7676
.exceptionally( cursor -> null )
77-
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.failureAsync() );
77+
.thenCompose( cursor -> cursor == null ? completedWithNull() : cursor.consumeAsync() );
7878
}
7979
}

driver/src/main/java/org/neo4j/driver/internal/cursor/AsyncStatementResultCursorImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,18 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
111111
}
112112

113113
@Override
114-
public CompletionStage<Throwable> failureAsync()
114+
public CompletionStage<Throwable> consumeAsync()
115115
{
116116
return pullAllHandler.summaryAsync().handle( ( summary, error ) -> error );
117117
}
118118

119+
@Override
120+
public CompletionStage<Throwable> failureAsync()
121+
{
122+
return pullAllHandler.failureAsync();
123+
}
124+
125+
119126
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
120127
{
121128
CompletionStage<Record> recordFuture = nextAsync();

driver/src/main/java/org/neo4j/driver/internal/cursor/RxStatementResultCursorImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,19 @@ public void cancel()
9191
}
9292

9393
@Override
94-
public CompletionStage<Throwable> failureAsync()
94+
public CompletionStage<Throwable> consumeAsync()
9595
{
9696
// calling this method will enforce discarding record stream and finish running cypher query
9797
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
9898
}
9999

100+
@Override
101+
public CompletionStage<Throwable> failureAsync()
102+
{
103+
// It is safe to discard records as either the streaming has not started at all, or the streaming is fully finished.
104+
return consumeAsync();
105+
}
106+
100107
@Override
101108
public CompletionStage<ResultSummary> summaryAsync()
102109
{

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,7 @@ public interface PullAllResponseHandler extends ResponseHandler
3636

3737
<T> CompletionStage<List<T>> listAsync( Function<Record, T> mapFunction );
3838

39+
CompletionStage<Throwable> failureAsync();
40+
3941
void prePopulateRecords();
4042
}

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ public synchronized <T> CompletionStage<List<T>> listAsync( Function<Record,T> m
176176
return pullAllAsync().thenApply( summary -> recordsAsList( mapFunction ) );
177177
}
178178

179+
@Override
180+
public synchronized CompletionStage<Throwable> failureAsync()
181+
{
182+
return pullAllAsync().handle( ( ignore, error ) -> error );
183+
}
184+
179185
@Override
180186
public void prePopulateRecords()
181187
{

driver/src/test/java/org/neo4j/driver/internal/handlers/LegacyPullAllResponseHandlerTest.java

Lines changed: 0 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,12 @@
2020

2121
import org.junit.jupiter.api.Test;
2222

23-
import java.io.IOException;
2423
import java.util.List;
2524
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.CompletionStage;
2725
import java.util.function.Function;
2826

2927
import org.neo4j.driver.Record;
3028
import org.neo4j.driver.Statement;
31-
import org.neo4j.driver.exceptions.ServiceUnavailableException;
32-
import org.neo4j.driver.exceptions.SessionExpiredException;
3329
import org.neo4j.driver.internal.spi.Connection;
3430
import org.neo4j.driver.summary.ResultSummary;
3531

@@ -52,121 +48,6 @@
5248

5349
class LegacyPullAllResponseHandlerTest extends PullAllResponseHandlerTestBase<LegacyPullAllResponseHandler>
5450
{
55-
@Test
56-
void shouldReturnNoFailureWhenAlreadySucceeded()
57-
{
58-
LegacyPullAllResponseHandler handler = newHandler();
59-
handler.onSuccess( emptyMap() );
60-
61-
Throwable failure = await( handler.failureAsync() );
62-
63-
assertNull( failure );
64-
}
65-
66-
@Test
67-
void shouldReturnNoFailureWhenSucceededAfterFailureRequested()
68-
{
69-
LegacyPullAllResponseHandler handler = newHandler();
70-
71-
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
72-
assertFalse( failureFuture.isDone() );
73-
74-
handler.onSuccess( emptyMap() );
75-
76-
assertTrue( failureFuture.isDone() );
77-
assertNull( await( failureFuture ) );
78-
}
79-
80-
@Test
81-
void shouldReturnFailureWhenAlreadyFailed()
82-
{
83-
LegacyPullAllResponseHandler handler = newHandler();
84-
85-
RuntimeException failure = new RuntimeException( "Ops" );
86-
handler.onFailure( failure );
87-
88-
Throwable receivedFailure = await( handler.failureAsync() );
89-
assertEquals( failure, receivedFailure );
90-
}
91-
92-
@Test
93-
void shouldReturnFailureWhenFailedAfterFailureRequested()
94-
{
95-
LegacyPullAllResponseHandler handler = newHandler();
96-
97-
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
98-
assertFalse( failureFuture.isDone() );
99-
100-
IOException failure = new IOException( "Broken pipe" );
101-
handler.onFailure( failure );
102-
103-
assertTrue( failureFuture.isDone() );
104-
assertEquals( failure, await( failureFuture ) );
105-
}
106-
107-
@Test
108-
void shouldReturnFailureWhenRequestedMultipleTimes()
109-
{
110-
LegacyPullAllResponseHandler handler = newHandler();
111-
112-
CompletableFuture<Throwable> failureFuture1 = handler.failureAsync().toCompletableFuture();
113-
CompletableFuture<Throwable> failureFuture2 = handler.failureAsync().toCompletableFuture();
114-
115-
assertFalse( failureFuture1.isDone() );
116-
assertFalse( failureFuture2.isDone() );
117-
118-
RuntimeException failure = new RuntimeException( "Unable to contact database" );
119-
handler.onFailure( failure );
120-
121-
assertTrue( failureFuture1.isDone() );
122-
assertTrue( failureFuture2.isDone() );
123-
124-
assertEquals( failure, await( failureFuture1 ) );
125-
assertEquals( failure, await( failureFuture2 ) );
126-
}
127-
128-
@Test
129-
void shouldReturnFailureOnlyOnceWhenFailedBeforeFailureRequested()
130-
{
131-
LegacyPullAllResponseHandler handler = newHandler();
132-
133-
ServiceUnavailableException failure = new ServiceUnavailableException( "Connection terminated" );
134-
handler.onFailure( failure );
135-
136-
assertEquals( failure, await( handler.failureAsync() ) );
137-
assertNull( await( handler.failureAsync() ) );
138-
}
139-
140-
@Test
141-
void shouldReturnFailureOnlyOnceWhenFailedAfterFailureRequested()
142-
{
143-
LegacyPullAllResponseHandler handler = newHandler();
144-
145-
CompletionStage<Throwable> failureFuture = handler.failureAsync();
146-
147-
SessionExpiredException failure = new SessionExpiredException( "Network unreachable" );
148-
handler.onFailure( failure );
149-
assertEquals( failure, await( failureFuture ) );
150-
151-
assertNull( await( handler.failureAsync() ) );
152-
}
153-
154-
@Test
155-
void shouldReturnSummaryWhenAlreadyFailedAndFailureConsumed()
156-
{
157-
Statement statement = new Statement( "CREATE ()" );
158-
LegacyPullAllResponseHandler handler = newHandler( statement );
159-
160-
ServiceUnavailableException failure = new ServiceUnavailableException( "Neo4j unreachable" );
161-
handler.onFailure( failure );
162-
163-
assertEquals( failure, await( handler.failureAsync() ) );
164-
165-
ResultSummary summary = await( handler.summaryAsync() );
166-
assertNotNull( summary );
167-
assertEquals( statement, summary.statement() );
168-
}
169-
17051
@Test
17152
void shouldDisableAutoReadWhenTooManyRecordsArrive()
17253
{

driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTestBase.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.neo4j.driver.Record;
3131
import org.neo4j.driver.Statement;
3232
import org.neo4j.driver.Value;
33+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
34+
import org.neo4j.driver.exceptions.SessionExpiredException;
3335
import org.neo4j.driver.internal.BoltServerAddress;
3436
import org.neo4j.driver.internal.InternalRecord;
3537
import org.neo4j.driver.internal.spi.Connection;
@@ -56,6 +58,121 @@
5658

5759
public abstract class PullAllResponseHandlerTestBase<T extends PullAllResponseHandler>
5860
{
61+
@Test
62+
void shouldReturnNoFailureWhenAlreadySucceeded()
63+
{
64+
PullAllResponseHandler handler = newHandler();
65+
handler.onSuccess( emptyMap() );
66+
67+
Throwable failure = await( handler.failureAsync() );
68+
69+
assertNull( failure );
70+
}
71+
72+
@Test
73+
void shouldReturnNoFailureWhenSucceededAfterFailureRequested()
74+
{
75+
PullAllResponseHandler handler = newHandler();
76+
77+
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
78+
assertFalse( failureFuture.isDone() );
79+
80+
handler.onSuccess( emptyMap() );
81+
82+
assertTrue( failureFuture.isDone() );
83+
assertNull( await( failureFuture ) );
84+
}
85+
86+
@Test
87+
void shouldReturnFailureWhenAlreadyFailed()
88+
{
89+
PullAllResponseHandler handler = newHandler();
90+
91+
RuntimeException failure = new RuntimeException( "Ops" );
92+
handler.onFailure( failure );
93+
94+
Throwable receivedFailure = await( handler.failureAsync() );
95+
assertEquals( failure, receivedFailure );
96+
}
97+
98+
@Test
99+
void shouldReturnFailureWhenFailedAfterFailureRequested()
100+
{
101+
PullAllResponseHandler handler = newHandler();
102+
103+
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
104+
assertFalse( failureFuture.isDone() );
105+
106+
IOException failure = new IOException( "Broken pipe" );
107+
handler.onFailure( failure );
108+
109+
assertTrue( failureFuture.isDone() );
110+
assertEquals( failure, await( failureFuture ) );
111+
}
112+
113+
@Test
114+
void shouldReturnFailureWhenRequestedMultipleTimes()
115+
{
116+
PullAllResponseHandler handler = newHandler();
117+
118+
CompletableFuture<Throwable> failureFuture1 = handler.failureAsync().toCompletableFuture();
119+
CompletableFuture<Throwable> failureFuture2 = handler.failureAsync().toCompletableFuture();
120+
121+
assertFalse( failureFuture1.isDone() );
122+
assertFalse( failureFuture2.isDone() );
123+
124+
RuntimeException failure = new RuntimeException( "Unable to contact database" );
125+
handler.onFailure( failure );
126+
127+
assertTrue( failureFuture1.isDone() );
128+
assertTrue( failureFuture2.isDone() );
129+
130+
assertEquals( failure, await( failureFuture1 ) );
131+
assertEquals( failure, await( failureFuture2 ) );
132+
}
133+
134+
@Test
135+
void shouldReturnFailureOnlyOnceWhenFailedBeforeFailureRequested()
136+
{
137+
PullAllResponseHandler handler = newHandler();
138+
139+
ServiceUnavailableException failure = new ServiceUnavailableException( "Connection terminated" );
140+
handler.onFailure( failure );
141+
142+
assertEquals( failure, await( handler.failureAsync() ) );
143+
assertNull( await( handler.failureAsync() ) );
144+
}
145+
146+
@Test
147+
void shouldReturnFailureOnlyOnceWhenFailedAfterFailureRequested()
148+
{
149+
PullAllResponseHandler handler = newHandler();
150+
151+
CompletionStage<Throwable> failureFuture = handler.failureAsync();
152+
153+
SessionExpiredException failure = new SessionExpiredException( "Network unreachable" );
154+
handler.onFailure( failure );
155+
assertEquals( failure, await( failureFuture ) );
156+
157+
assertNull( await( handler.failureAsync() ) );
158+
}
159+
160+
@Test
161+
void shouldReturnSummaryWhenAlreadyFailedAndFailureConsumed()
162+
{
163+
Statement statement = new Statement( "CREATE ()" );
164+
PullAllResponseHandler handler = newHandler( statement );
165+
166+
ServiceUnavailableException failure = new ServiceUnavailableException( "Neo4j unreachable" );
167+
handler.onFailure( failure );
168+
169+
assertEquals( failure, await( handler.failureAsync() ) );
170+
171+
ResultSummary summary = await( handler.summaryAsync() );
172+
assertNotNull( summary );
173+
assertEquals( statement, summary.statement() );
174+
}
175+
59176
@Test
60177
void shouldReturnSummaryWhenAlreadySucceeded()
61178
{

0 commit comments

Comments
 (0)