27
27
import org .neo4j .driver .internal .InternalRecord ;
28
28
import org .neo4j .driver .internal .spi .Connection ;
29
29
import org .neo4j .driver .internal .spi .ResponseHandler ;
30
+ import org .neo4j .driver .internal .util .Futures ;
30
31
import org .neo4j .driver .internal .util .MetadataUtil ;
31
32
import org .neo4j .driver .v1 .Record ;
32
33
import org .neo4j .driver .v1 .Statement ;
@@ -55,7 +56,6 @@ public abstract class PullAllResponseHandler implements ResponseHandler
55
56
private ResultSummary summary ;
56
57
57
58
private CompletableFuture <Record > recordFuture ;
58
- private CompletableFuture <ResultSummary > summaryFuture ;
59
59
private CompletableFuture <Throwable > failureFuture ;
60
60
61
61
public PullAllResponseHandler ( Statement statement , RunResponseHandler runResponseHandler , Connection connection )
@@ -74,7 +74,6 @@ public synchronized void onSuccess( Map<String,Value> metadata )
74
74
afterSuccess ();
75
75
76
76
completeRecordFuture ( null );
77
- completeSummaryFuture ( summary );
78
77
completeFailureFuture ( null );
79
78
}
80
79
@@ -91,26 +90,16 @@ public synchronized void onFailure( Throwable error )
91
90
boolean failedRecordFuture = failRecordFuture ( error );
92
91
if ( failedRecordFuture )
93
92
{
94
- // error propagated through record future, complete other two
95
- completeSummaryFuture ( summary );
93
+ // error propagated through the record future
96
94
completeFailureFuture ( null );
97
95
}
98
96
else
99
97
{
100
- boolean failedSummaryFuture = failSummaryFuture ( error );
101
- if ( failedSummaryFuture )
98
+ boolean completedFailureFuture = completeFailureFuture ( error );
99
+ if ( ! completedFailureFuture )
102
100
{
103
- // error propagated through summary future, complete other one
104
- completeFailureFuture ( null );
105
- }
106
- else
107
- {
108
- boolean completedFailureFuture = completeFailureFuture ( error );
109
- if ( !completedFailureFuture )
110
- {
111
- // error has not been propagated to the user, remember it
112
- failure = error ;
113
- }
101
+ // error has not been propagated to the user, remember it
102
+ failure = error ;
114
103
}
115
104
}
116
105
}
@@ -121,7 +110,7 @@ public synchronized void onFailure( Throwable error )
121
110
public synchronized void onRecord ( Value [] fields )
122
111
{
123
112
Record record = new InternalRecord ( runResponseHandler .statementKeys (), fields );
124
- queueRecord ( record );
113
+ enqueueRecord ( record );
125
114
completeRecordFuture ( record );
126
115
}
127
116
@@ -159,26 +148,14 @@ public synchronized CompletionStage<Record> nextAsync()
159
148
160
149
public synchronized CompletionStage <ResultSummary > summaryAsync ()
161
150
{
162
- if ( failure != null )
151
+ return failureAsync (). thenApply ( error ->
163
152
{
164
- return failedFuture ( extractFailure () );
165
- }
166
- else if ( summary != null )
167
- {
168
- return completedFuture ( summary );
169
- }
170
- else
171
- {
172
- if ( summaryFuture == null )
153
+ if ( error != null )
173
154
{
174
- // neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
175
- // future will be completed with summary on SUCCESS and completed exceptionally on FAILURE
176
- // enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
177
- connection .enableAutoRead ();
178
- summaryFuture = new CompletableFuture <>();
155
+ throw Futures .asCompletionException ( error );
179
156
}
180
- return summaryFuture ;
181
- }
157
+ return summary ;
158
+ } );
182
159
}
183
160
184
161
public synchronized CompletionStage <Throwable > failureAsync ()
@@ -205,14 +182,14 @@ else if ( finished )
205
182
}
206
183
}
207
184
208
- private void queueRecord ( Record record )
185
+ private void enqueueRecord ( Record record )
209
186
{
210
187
records .add ( record );
211
188
212
- boolean shouldBufferAllRecords = summaryFuture != null || failureFuture != null ;
213
- // when summary or failure is requested we have to buffer all remaining records and then return summary/failure
189
+ boolean shouldBufferAllRecords = failureFuture != null ;
190
+ // when failure is requested we have to buffer all remaining records and then return the error
214
191
// do not disable auto-read in this case, otherwise records will not be consumed and trailing
215
- // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for summary/failure
192
+ // SUCCESS or FAILURE message will not arrive as well, so callers will get stuck waiting for the error
216
193
if ( !shouldBufferAllRecords && records .size () > RECORD_BUFFER_HIGH_WATERMARK )
217
194
{
218
195
// more than high watermark records are already queued, tell connection to stop auto-reading from network
@@ -270,28 +247,6 @@ private boolean failRecordFuture( Throwable error )
270
247
return false ;
271
248
}
272
249
273
- private void completeSummaryFuture ( ResultSummary summary )
274
- {
275
- if ( summaryFuture != null )
276
- {
277
- CompletableFuture <ResultSummary > future = summaryFuture ;
278
- summaryFuture = null ;
279
- future .complete ( summary );
280
- }
281
- }
282
-
283
- private boolean failSummaryFuture ( Throwable error )
284
- {
285
- if ( summaryFuture != null )
286
- {
287
- CompletableFuture <ResultSummary > future = summaryFuture ;
288
- summaryFuture = null ;
289
- future .completeExceptionally ( error );
290
- return true ;
291
- }
292
- return false ;
293
- }
294
-
295
250
private boolean completeFailureFuture ( Throwable error )
296
251
{
297
252
if ( failureFuture != null )
0 commit comments