21
21
import static org .neo4j .driver .internal .util .Futures .completionExceptionCause ;
22
22
23
23
import java .time .Duration ;
24
- import java .util .ArrayList ;
25
24
import java .util .Collections ;
26
25
import java .util .HashSet ;
27
- import java .util .List ;
28
26
import java .util .Map ;
29
27
import java .util .Objects ;
30
28
import java .util .Set ;
46
44
import org .neo4j .driver .Logger ;
47
45
import org .neo4j .driver .Logging ;
48
46
import org .neo4j .driver .Query ;
49
- import org .neo4j .driver .Record ;
50
47
import org .neo4j .driver .TransactionConfig ;
51
48
import org .neo4j .driver .Value ;
52
49
import org .neo4j .driver .Values ;
58
55
import org .neo4j .driver .exceptions .UnsupportedFeatureException ;
59
56
import org .neo4j .driver .internal .DatabaseBookmark ;
60
57
import org .neo4j .driver .internal .FailableCursor ;
61
- import org .neo4j .driver .internal .InternalRecord ;
62
58
import org .neo4j .driver .internal .NotificationConfigMapper ;
63
59
import org .neo4j .driver .internal .bolt .api .AuthData ;
64
60
import org .neo4j .driver .internal .bolt .api .BoltConnection ;
73
69
import org .neo4j .driver .internal .bolt .api .TelemetryApi ;
74
70
import org .neo4j .driver .internal .bolt .api .TransactionType ;
75
71
import org .neo4j .driver .internal .bolt .api .exception .MinVersionAcquisitionException ;
76
- import org .neo4j .driver .internal .bolt .api .summary .PullSummary ;
77
72
import org .neo4j .driver .internal .bolt .api .summary .RunSummary ;
78
73
import org .neo4j .driver .internal .cursor .DisposableResultCursorImpl ;
79
74
import org .neo4j .driver .internal .cursor .ResultCursorImpl ;
@@ -150,13 +145,14 @@ public NetworkSession(
150
145
151
146
public CompletionStage <ResultCursor > runAsync (Query query , TransactionConfig config ) {
152
147
ensureSessionIsOpen ();
153
- return ensureNoOpenTxBeforeRunningQuery ()
148
+ var disposable = ensureNoOpenTxBeforeRunningQuery ()
154
149
.thenCompose (ignore -> acquireConnection (mode ))
155
150
.thenCompose (connection -> {
156
151
var parameters = query .parameters ().asMap (Values ::value );
157
152
var apiTelemetryWork = new ApiTelemetryWork (TelemetryApi .AUTO_COMMIT_TRANSACTION );
158
153
apiTelemetryWork .setEnabled (!telemetryDisabled );
159
- var responseHandler = new RunResponseHandler (connection , query , fetchSize , this ::handleNewBookmark );
154
+ var resultCursor = new ResultCursorImpl (
155
+ connection , query , fetchSize , null , this ::handleNewBookmark , true , () -> null , null , null );
160
156
var cursorStage = apiTelemetryWork
161
157
.pipelineTelemetryIfEnabled (connection )
162
158
.thenCompose (conn -> conn .runInAutoCommitTransaction (
@@ -172,31 +168,31 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
172
168
config .metadata (),
173
169
notificationConfig ))
174
170
.thenCompose (conn -> conn .pull (-1 , fetchSize ))
175
- .thenCompose (conn -> conn .flush (responseHandler ))
176
- .thenCompose (ignored -> responseHandler . cursorFuture )
177
- .handle ((resultCursor , throwable ) -> {
171
+ .thenCompose (conn -> conn .flush (resultCursor ))
172
+ .thenCompose (ignored -> resultCursor . resultCursor () )
173
+ .handle ((resultCursorImpl , throwable ) -> {
178
174
var error = completionExceptionCause (throwable );
179
175
if (error != null ) {
180
- return connection
181
- .close ()
182
- .<DisposableResultCursorImpl >handle ((ignored , closeError ) -> {
183
- if (closeError != null ) {
184
- error .addSuppressed (closeError );
185
- }
186
- if (error instanceof RuntimeException runtimeException ) {
187
- throw runtimeException ;
188
- } else {
189
- throw new CompletionException (error );
190
- }
191
- });
176
+ return connection .close ().<ResultCursorImpl >handle ((ignored , closeError ) -> {
177
+ if (closeError != null ) {
178
+ error .addSuppressed (closeError );
179
+ }
180
+ if (error instanceof RuntimeException runtimeException ) {
181
+ throw runtimeException ;
182
+ } else {
183
+ throw new CompletionException (error );
184
+ }
185
+ });
192
186
} else {
193
- return CompletableFuture .completedStage (resultCursor );
187
+ return CompletableFuture .completedStage (resultCursorImpl );
194
188
}
195
189
})
196
- .thenCompose (Function .identity ());
197
- resultCursorStage = cursorStage . exceptionally ( error -> null );
190
+ .thenCompose (Function .identity ())
191
+ . thenApply ( DisposableResultCursorImpl :: new );
198
192
return cursorStage .thenApply (Function .identity ());
199
193
});
194
+ resultCursorStage = disposable .exceptionally (error -> null );
195
+ return disposable .thenApply (Function .identity ());
200
196
}
201
197
202
198
public CompletionStage <RxResultCursor > runRx (
@@ -789,97 +785,6 @@ public AuthToken overrideAuthToken() {
789
785
}
790
786
}
791
787
792
- public static class RunResponseHandler implements ResponseHandler {
793
- final CompletableFuture <DisposableResultCursorImpl > cursorFuture = new CompletableFuture <>();
794
- private final BoltConnection connection ;
795
- private final Query query ;
796
- private final long fetchSize ;
797
- private final Consumer <DatabaseBookmark > bookmarkConsumer ;
798
- private RunSummary runSummary ;
799
- private final List <Record > records = new ArrayList <>();
800
- private PullSummary pullSummary ;
801
- private Throwable error ;
802
- private int ignoredCount ;
803
-
804
- public RunResponseHandler (
805
- BoltConnection connection , Query query , long fetchSize , Consumer <DatabaseBookmark > bookmarkConsumer ) {
806
- this .connection = connection ;
807
- this .query = query ;
808
- this .fetchSize = fetchSize ;
809
- this .bookmarkConsumer = bookmarkConsumer ;
810
- }
811
-
812
- @ SuppressWarnings ("DuplicatedCode" )
813
- @ Override
814
- public void onError (Throwable throwable ) {
815
- if (throwable instanceof CompletionException ) {
816
- throwable = throwable .getCause ();
817
- }
818
- if (error == null ) {
819
- error = throwable ;
820
- } else {
821
- if (error instanceof Neo4jException && !(throwable instanceof Neo4jException )) {
822
- // higher order error has occurred
823
- throwable .addSuppressed (error );
824
- error = throwable ;
825
- } else {
826
- error .addSuppressed (throwable );
827
- }
828
- }
829
- }
830
-
831
- @ Override
832
- public void onRunSummary (RunSummary summary ) {
833
- runSummary = summary ;
834
- }
835
-
836
- @ Override
837
- public void onRecord (Value [] fields ) {
838
- records .add (new InternalRecord (runSummary .keys (), fields ));
839
- }
840
-
841
- @ Override
842
- public void onPullSummary (PullSummary summary ) {
843
- pullSummary = summary ;
844
- }
845
-
846
- @ Override
847
- public void onIgnored () {
848
- ignoredCount ++;
849
- }
850
-
851
- @ Override
852
- public void onComplete () {
853
- if (runSummary != null ) {
854
- if (error == null && ignoredCount > 0 ) {
855
- cursorFuture .completeExceptionally (new ClientException ("Run exchange contains ignored messages." ));
856
- } else {
857
- cursorFuture .complete (new DisposableResultCursorImpl (new ResultCursorImpl (
858
- connection ,
859
- query ,
860
- fetchSize ,
861
- null ,
862
- bookmarkConsumer ,
863
- true ,
864
- runSummary ,
865
- () -> null ,
866
- records ,
867
- pullSummary ,
868
- null ,
869
- error )));
870
- }
871
- } else {
872
- if (error != null ) {
873
- cursorFuture .completeExceptionally (error );
874
- } else if (ignoredCount > 0 ) {
875
- cursorFuture .completeExceptionally (new ClientException ("Run exchange contains ignored messages." ));
876
- } else {
877
- cursorFuture .completeExceptionally (new ClientException ("Unexpected state during session run." ));
878
- }
879
- }
880
- }
881
- }
882
-
883
788
public static class RunRxResponseHandler implements ResponseHandler {
884
789
final CompletableFuture <RxResultCursor > cursorFuture = new CompletableFuture <>();
885
790
private final BoltConnection connection ;
0 commit comments