Skip to content

Commit f582364

Browse files
committed
Refactoring
1 parent 4d670f8 commit f582364

File tree

6 files changed

+31
-11
lines changed

6 files changed

+31
-11
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalPath.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public InternalPath(List<Entity> alternatingNodeAndRel) {
8989
// even index - this should be a node
9090
try {
9191
lastNode = (Node) entity;
92-
if (nodes.isEmpty() || isEndpoint(lastNode, lastRelationship)) {
92+
if (nodes.isEmpty() || (lastRelationship != null && isEndpoint(lastNode, lastRelationship))) {
9393
nodes.add(lastNode);
9494
} else {
9595
throw new IllegalArgumentException("Node argument " + index

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ private Retry exponentialBackoffRetryRx() {
186186
if (canRetryOn(error)) {
187187
var currentTime = clock.millis();
188188

189+
@SuppressWarnings("DataFlowIssue")
189190
long startTime = contextView.getOrDefault("startTime", currentTime);
191+
@SuppressWarnings("DataFlowIssue")
190192
long nextDelayMs = contextView.getOrDefault("nextDelayMs", initialRetryDelayMs);
191193

192194
var elapsedTime = currentTime - startTime;
@@ -213,6 +215,7 @@ private Retry exponentialBackoffRetryRx() {
213215
}
214216
addSuppressed(throwable, errors);
215217

218+
//noinspection DataFlowIssue
216219
return Mono.error(throwable);
217220
})));
218221
}

driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveTransactionIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.integration.reactive;
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2223
import static org.junit.jupiter.api.Assertions.assertThrows;
2324

2425
import java.util.List;
@@ -46,6 +47,7 @@ void shouldPreventPullAfterTransactionTermination() {
4647
// Given
4748
var session = neo4j.driver().session(ReactiveSession.class);
4849
var tx = Mono.fromDirect(session.beginTransaction()).block();
50+
assertNotNull(tx);
4951
var streamSize = Config.defaultConfig().fetchSize() + 1;
5052
var result0 = Mono.fromDirect(tx.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", streamSize)))
5153
.block();
@@ -58,6 +60,8 @@ void shouldPreventPullAfterTransactionTermination() {
5860
assertEquals(terminationException.code(), "Neo.ClientError.Statement.SyntaxError");
5961

6062
// Then
63+
assertNotNull(result0);
64+
assertNotNull(result1);
6165
for (var result : List.of(result0, result1)) {
6266
var exception = assertThrows(
6367
ClientException.class, () -> Flux.from(result.records()).blockFirst());
@@ -72,6 +76,7 @@ void shouldPreventDiscardAfterTransactionTermination() {
7276
// Given
7377
var session = neo4j.driver().session(ReactiveSession.class);
7478
var tx = Mono.fromDirect(session.beginTransaction()).block();
79+
assertNotNull(tx);
7580
var streamSize = Config.defaultConfig().fetchSize() + 1;
7681
var result0 = Mono.fromDirect(tx.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", streamSize)))
7782
.block();
@@ -84,6 +89,8 @@ void shouldPreventDiscardAfterTransactionTermination() {
8489
assertEquals(terminationException.code(), "Neo.ClientError.Statement.SyntaxError");
8590

8691
// Then
92+
assertNotNull(result0);
93+
assertNotNull(result1);
8794
for (var result : List.of(result0, result1)) {
8895
var exception = assertThrows(ClientException.class, () -> Mono.fromDirect(result.consume())
8996
.block());
@@ -98,6 +105,7 @@ void shouldPreventRunAfterTransactionTermination() {
98105
// Given
99106
var session = neo4j.driver().session(ReactiveSession.class);
100107
var tx = Mono.fromDirect(session.beginTransaction()).block();
108+
assertNotNull(tx);
101109
var terminationException = assertThrows(
102110
ClientException.class, () -> Mono.fromDirect(tx.run("invalid")).block());
103111
assertEquals(terminationException.code(), "Neo.ClientError.Statement.SyntaxError");
@@ -118,6 +126,7 @@ void shouldPreventPullAfterDriverTransactionTermination() {
118126
var session = neo4j.driver().session(ReactiveSession.class);
119127
var tx = (InternalReactiveTransaction)
120128
Mono.fromDirect(session.beginTransaction()).block();
129+
assertNotNull(tx);
121130
var streamSize = Config.defaultConfig().fetchSize() + 1;
122131
var result0 = Mono.fromDirect(tx.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", streamSize)))
123132
.block();
@@ -128,6 +137,8 @@ void shouldPreventPullAfterDriverTransactionTermination() {
128137
Mono.fromDirect(tx.terminate()).block();
129138

130139
// Then
140+
assertNotNull(result0);
141+
assertNotNull(result1);
131142
for (var result : List.of(result0, result1)) {
132143
assertThrows(TransactionTerminatedException.class, () -> Flux.from(result.records())
133144
.blockFirst());
@@ -142,6 +153,7 @@ void shouldPreventDiscardAfterDriverTransactionTermination() {
142153
var session = neo4j.driver().session(ReactiveSession.class);
143154
var tx = (InternalReactiveTransaction)
144155
Mono.fromDirect(session.beginTransaction()).block();
156+
assertNotNull(tx);
145157
var streamSize = Config.defaultConfig().fetchSize() + 1;
146158
var result0 = Mono.fromDirect(tx.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", streamSize)))
147159
.block();
@@ -152,6 +164,8 @@ void shouldPreventDiscardAfterDriverTransactionTermination() {
152164
Mono.fromDirect(tx.terminate()).block();
153165

154166
// Then
167+
assertNotNull(result0);
168+
assertNotNull(result1);
155169
for (var result : List.of(result0, result1)) {
156170
assertThrows(TransactionTerminatedException.class, () -> Mono.fromDirect(result.consume())
157171
.block());
@@ -166,6 +180,7 @@ void shouldPreventRunAfterDriverTransactionTermination() {
166180
var session = neo4j.driver().session(ReactiveSession.class);
167181
var tx = (InternalReactiveTransaction)
168182
Mono.fromDirect(session.beginTransaction()).block();
183+
assertNotNull(tx);
169184
var streamSize = Config.defaultConfig().fetchSize() + 1;
170185
Mono.fromDirect(tx.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", streamSize)))
171186
.block();
@@ -187,6 +202,7 @@ void shouldTerminateTransactionAndHandleFailureResponseOrPreventFurtherPulls() {
187202
var session = neo4j.driver().session(ReactiveSession.class);
188203
var tx = (InternalReactiveTransaction)
189204
Mono.fromDirect(session.beginTransaction()).block();
205+
assertNotNull(tx);
190206
var streamSize = Config.defaultConfig().fetchSize() + 1;
191207
var result = Mono.fromDirect(tx.run("UNWIND range(1, $limit) AS x RETURN x", Map.of("limit", streamSize)))
192208
.block();
@@ -195,6 +211,7 @@ void shouldTerminateTransactionAndHandleFailureResponseOrPreventFurtherPulls() {
195211
Mono.fromDirect(tx.terminate()).block();
196212

197213
// Then
214+
assertNotNull(result);
198215
assertThrows(TransactionTerminatedException.class, () -> Flux.from(result.records())
199216
.blockLast());
200217
Mono.fromDirect(tx.close()).block();

driver/src/test/java/org/neo4j/driver/integration/reactive/RxNestedQueriesIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ void shouldHandleNestedQueriesInTheSameTransaction() {
158158
},
159159
RxTransaction::commit,
160160
(tx, error) -> tx.rollback(),
161-
null);
161+
RxTransaction::close);
162162

163163
StepVerifier.create(nodeIds).expectNextCount(size).verifyComplete();
164164
}

driver/src/test/java/org/neo4j/driver/integration/reactive/RxTransactionIT.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ void shouldBePossibleToRunSingleQueryAndCommit() {
115115
.map(record -> record.get(0).asNode().get("id").asInt()),
116116
RxTransaction::commit,
117117
(tx, error) -> tx.rollback(),
118-
null);
118+
RxTransaction::close);
119119

120120
StepVerifier.create(ids).expectNext(42).verifyComplete();
121121
assertEquals(1, countNodes(42));
@@ -334,7 +334,7 @@ void shouldAllowRollbackAfterFailedCommit() {
334334
tx -> Flux.from(tx.run("WRONG").records()),
335335
RxTransaction::commit,
336336
(tx, error) -> tx.rollback(),
337-
null);
337+
RxTransaction::close);
338338

339339
StepVerifier.create(records)
340340
.verifyErrorSatisfies(error -> assertThat(error.getMessage(), containsString("Invalid input")));
@@ -481,7 +481,7 @@ void shouldFailForEachWhenActionFails() {
481481
}),
482482
RxTransaction::commit,
483483
(tx, error) -> tx.rollback(),
484-
null);
484+
RxTransaction::close);
485485

486486
StepVerifier.create(records)
487487
.expectErrorSatisfies(error -> assertEquals(e, error))
@@ -527,7 +527,7 @@ void shouldFailWhenListTransformationFunctionFails() {
527527
tx -> Flux.from(tx.run("RETURN 'Hi!'").records()).handle((record, sink) -> sink.error(e)),
528528
RxTransaction::commit,
529529
(tx, error) -> tx.rollback(),
530-
null);
530+
RxTransaction::close);
531531

532532
StepVerifier.create(records)
533533
.expectErrorSatisfies(error -> assertEquals(e, error))
@@ -659,7 +659,7 @@ void shouldUpdateSessionBookmarkAfterCommit() {
659659
tx -> tx.run("CREATE (:MyNode)").records(),
660660
RxTransaction::commit,
661661
(tx, error) -> tx.rollback(),
662-
null));
662+
RxTransaction::close));
663663

664664
var bookmarkAfter = session.lastBookmark();
665665

@@ -787,7 +787,7 @@ private void testForEach(String query, int expectedSeenRecords) {
787787
},
788788
RxTransaction::commit,
789789
(tx, error) -> tx.rollback(),
790-
null);
790+
RxTransaction::close);
791791

792792
StepVerifier.create(summary).expectNextCount(1).verifyComplete(); // we indeed get a summary.
793793
}
@@ -800,7 +800,7 @@ private <T> void testList(String query, List<T> expectedList) {
800800
tx -> Flux.from(tx.run(query).records()).collectList(),
801801
RxTransaction::commit,
802802
(tx, error) -> tx.rollback(),
803-
null);
803+
RxTransaction::close);
804804

805805
StepVerifier.create(records.single())
806806
.consumeNextWith(allRecords -> {
@@ -819,7 +819,7 @@ private void testConsume(String query) {
819819
tx -> tx.run(query).consume(),
820820
RxTransaction::commit,
821821
(tx, error) -> tx.rollback(),
822-
null);
822+
RxTransaction::close);
823823

824824
StepVerifier.create(summary.single())
825825
.consumeNextWith(Assertions::assertNotNull)

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public CompletionStage<Void> execute(C context) {
4747
tx -> tx.run("UNWIND [10, 5, 0] AS x RETURN 10 / x").records(),
4848
RxTransaction::commit,
4949
(tx, error) -> tx.rollback(),
50-
null)
50+
RxTransaction::close)
5151
.subscribe(
5252
record -> assertThat(
5353
record.get(0).asInt(), either(equalTo(1)).or(equalTo(2))),

0 commit comments

Comments
 (0)