Skip to content

Commit bb1ad0f

Browse files
authored
Merge pull request #546 from zhenlineo/2.0-react
Reactive implementation on the client side
2 parents 92b6746 + f9cbbbc commit bb1ad0f

File tree

138 files changed

+7308
-729
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

138 files changed

+7308
-729
lines changed

driver/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,18 @@
2525

2626
<dependencies>
2727
<!-- Compile dependencies -->
28+
<dependency>
29+
<groupId>org.reactivestreams</groupId>
30+
<artifactId>reactive-streams</artifactId>
31+
</dependency>
2832
<dependency>
2933
<groupId>io.netty</groupId>
3034
<artifactId>netty-handler</artifactId>
3135
</dependency>
36+
<dependency>
37+
<groupId>io.projectreactor</groupId>
38+
<artifactId>reactor-core</artifactId>
39+
</dependency>
3240

3341
<!-- Optional dependencies -->
3442
<dependency>
@@ -66,6 +74,10 @@
6674
<groupId>com.fasterxml.jackson.core</groupId>
6775
<artifactId>jackson-databind</artifactId>
6876
</dependency>
77+
<dependency>
78+
<groupId>io.projectreactor</groupId>
79+
<artifactId>reactor-test</artifactId>
80+
</dependency>
6981
</dependencies>
7082

7183
<build>
@@ -203,13 +215,18 @@
203215
<artifactSet>
204216
<includes>
205217
<include>io.netty:*</include>
218+
<include>io.projectreactor:*</include>
206219
</includes>
207220
</artifactSet>
208221
<relocations>
209222
<relocation>
210223
<pattern>io.netty</pattern>
211224
<shadedPattern>org.neo4j.driver.internal.shaded.io.netty</shadedPattern>
212225
</relocation>
226+
<relocation>
227+
<pattern>io.projectreactor</pattern>
228+
<shadedPattern>org.neo4j.driver.internal.shaded.io.projectreactor</shadedPattern>
229+
</relocation>
213230
</relocations>
214231
<shadeTestJar>true</shadeTestJar>
215232
<createSourcesJar>true</createSourcesJar>

driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.neo4j.driver.v1.Values;
3434
import org.neo4j.driver.v1.types.TypeSystem;
3535

36-
abstract class AbstractStatementRunner implements StatementRunner
36+
public abstract class AbstractStatementRunner implements StatementRunner
3737
{
3838
@Override
3939
public final StatementResult run( String statementTemplate, Value parameters )
@@ -89,12 +89,12 @@ public final TypeSystem typeSystem()
8989
return InternalTypeSystem.TYPE_SYSTEM;
9090
}
9191

92-
private static Value parameters( Record record )
92+
public static Value parameters( Record record )
9393
{
9494
return record == null ? Values.EmptyMap : parameters( record.asMap() );
9595
}
9696

97-
private static Value parameters( Map<String,Object> map )
97+
public static Value parameters( Map<String,Object> map )
9898
{
9999
if ( map == null || map.isEmpty() )
100100
{

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java renamed to driver/src/main/java/org/neo4j/driver/internal/AsyncStatementResultCursor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,20 @@
2525
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
2626
import org.neo4j.driver.internal.handlers.RunResponseHandler;
2727
import org.neo4j.driver.internal.util.Futures;
28+
import org.neo4j.driver.reactive.internal.cursor.InternalStatementResultCursor;
2829
import org.neo4j.driver.v1.Record;
29-
import org.neo4j.driver.v1.StatementResultCursor;
3030
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
3131
import org.neo4j.driver.v1.summary.ResultSummary;
3232
import org.neo4j.driver.v1.util.Consumer;
3333
import org.neo4j.driver.v1.util.Function;
3434
import org.neo4j.driver.v1.util.Functions;
3535

36-
public class InternalStatementResultCursor implements StatementResultCursor
36+
public class AsyncStatementResultCursor implements InternalStatementResultCursor
3737
{
3838
private final RunResponseHandler runResponseHandler;
3939
private final PullAllResponseHandler pullAllHandler;
4040

41-
public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
41+
public AsyncStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
4242
{
4343
this.runResponseHandler = runResponseHandler;
4444
this.pullAllHandler = pullAllHandler;
@@ -118,6 +118,7 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
118118
return pullAllHandler.listAsync( mapFunction );
119119
}
120120

121+
@Override
121122
public CompletionStage<Throwable> failureAsync()
122123
{
123124
return pullAllHandler.failureAsync();

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.neo4j.driver.internal.messaging.BoltProtocol;
2727
import org.neo4j.driver.internal.spi.Connection;
2828
import org.neo4j.driver.internal.util.Futures;
29+
import org.neo4j.driver.reactive.internal.cursor.InternalStatementResultCursor;
30+
import org.neo4j.driver.reactive.internal.cursor.RxStatementResultCursor;
2931
import org.neo4j.driver.v1.Session;
3032
import org.neo4j.driver.v1.Statement;
3133
import org.neo4j.driver.v1.StatementResult;
@@ -149,7 +151,7 @@ else if ( state == State.ROLLED_BACK )
149151
{
150152
return resultCursors.retrieveNotConsumedError()
151153
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
152-
.whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) );
154+
.whenComplete( ( ignore, error ) -> transactionClosed( error == null ) );
153155
}
154156
}
155157

@@ -168,7 +170,7 @@ else if ( state == State.ROLLED_BACK )
168170
{
169171
return resultCursors.retrieveNotConsumedError()
170172
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
171-
.whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) );
173+
.whenComplete( ( ignore, error ) -> transactionClosed( false ) );
172174
}
173175
}
174176

@@ -190,7 +192,17 @@ public CompletionStage<StatementResultCursor> runAsync( Statement statement )
190192
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
191193
{
192194
ensureCanRunQueries();
193-
CompletionStage<InternalStatementResultCursor> cursorStage = protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse );
195+
CompletionStage<InternalStatementResultCursor> cursorStage =
196+
protocol.runInExplicitTransaction( connection, statement, this, waitForRunResponse ).asyncResult();
197+
resultCursors.add( cursorStage );
198+
return cursorStage;
199+
}
200+
201+
public CompletionStage<RxStatementResultCursor> runRx( Statement statement )
202+
{
203+
ensureCanRunQueries();
204+
CompletionStage<RxStatementResultCursor> cursorStage =
205+
protocol.runInExplicitTransaction( connection, statement, this, false ).rxResult();
194206
resultCursors.add( cursorStage );
195207
return cursorStage;
196208
}
@@ -265,9 +277,16 @@ private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable
265277
};
266278
}
267279

268-
private void transactionClosed( State newState )
280+
private void transactionClosed( boolean isCommitted )
269281
{
270-
state = newState;
282+
if ( isCommitted )
283+
{
284+
state = State.COMMITTED;
285+
}
286+
else
287+
{
288+
state = State.ROLLED_BACK;
289+
}
271290
connection.release(); // release in background
272291
}
273292

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
public interface FailableCursor
24+
{
25+
CompletionStage<Throwable> failureAsync();
26+
}

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.neo4j.driver.internal.metrics.MetricsProvider;
2525
import org.neo4j.driver.internal.security.SecurityPlan;
2626
import org.neo4j.driver.internal.util.Futures;
27+
import org.neo4j.driver.reactive.internal.InternalRxSession;
28+
import org.neo4j.driver.reactive.RxSession;
2729
import org.neo4j.driver.v1.AccessMode;
2830
import org.neo4j.driver.v1.Driver;
2931
import org.neo4j.driver.v1.Logger;
@@ -93,10 +95,10 @@ public Session session( AccessMode mode, Iterable<String> bookmarks )
9395
return newSession( mode, Bookmarks.from( bookmarks ) );
9496
}
9597

96-
private Session newSession( AccessMode mode, Bookmarks bookmarks )
98+
private NetworkSession newSession( AccessMode mode, Bookmarks bookmarks )
9799
{
98100
assertOpen();
99-
Session session = sessionFactory.newInstance( mode, bookmarks );
101+
NetworkSession session = sessionFactory.newInstance( mode, bookmarks );
100102
if ( closed.get() )
101103
{
102104
// session does not immediately acquire connection, it is fine to just throw
@@ -122,6 +124,18 @@ public CompletionStage<Void> closeAsync()
122124
return completedWithNull();
123125
}
124126

127+
@Override
128+
public RxSession rxSession()
129+
{
130+
return new InternalRxSession( newSession( AccessMode.WRITE, Bookmarks.empty() ) );
131+
}
132+
133+
@Override
134+
public RxSession rxSession( String bookmark )
135+
{
136+
return new InternalRxSession( newSession( AccessMode.WRITE, Bookmarks.from( bookmark ) ) );
137+
}
138+
125139
public CompletionStage<Void> verifyConnectivity()
126140
{
127141
return sessionFactory.verifyConnectivity();

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import org.neo4j.driver.internal.spi.Connection;
3030
import org.neo4j.driver.internal.spi.ConnectionProvider;
3131
import org.neo4j.driver.internal.util.Futures;
32+
import org.neo4j.driver.reactive.internal.cursor.InternalStatementResultCursor;
33+
import org.neo4j.driver.reactive.internal.cursor.RxStatementResultCursor;
34+
import org.neo4j.driver.reactive.internal.cursor.StatementResultCursorFactory;
3235
import org.neo4j.driver.v1.AccessMode;
3336
import org.neo4j.driver.v1.Logger;
3437
import org.neo4j.driver.v1.Logging;
@@ -58,7 +61,7 @@ public class NetworkSession extends AbstractStatementRunner implements Session,
5861
private volatile Bookmarks bookmarks = Bookmarks.empty();
5962
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedWithNull();
6063
private volatile CompletionStage<Connection> connectionStage = completedWithNull();
61-
private volatile CompletionStage<InternalStatementResultCursor> resultCursorStage = completedWithNull();
64+
private volatile CompletionStage<? extends FailableCursor> resultCursorStage = completedWithNull();
6265

6366
private final AtomicBoolean open = new AtomicBoolean( true );
6467

@@ -276,7 +279,7 @@ public void reset()
276279
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
277280
}
278281

279-
private CompletionStage<Void> resetAsync()
282+
public CompletionStage<Void> resetAsync()
280283
{
281284
return existingTransactionOrNull()
282285
.thenAccept( tx ->
@@ -439,20 +442,44 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
439442
}
440443
}
441444

442-
private CompletionStage<InternalStatementResultCursor> run( Statement statement, TransactionConfig config, boolean waitForRunResponse )
445+
public CompletionStage<RxStatementResultCursor> runRx( Statement statement, TransactionConfig config )
443446
{
444-
ensureSessionIsOpen();
445-
446-
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
447-
.thenCompose( ignore -> acquireConnection( mode ) )
448-
.thenCompose( connection ->
449-
connection.protocol().runInAutoCommitTransaction( connection, statement, this, config, waitForRunResponse ) );
447+
CompletionStage<RxStatementResultCursor> newResultCursorStage =
448+
buildResultCursorFactory( statement, config, true ).thenCompose( StatementResultCursorFactory::rxResult );
450449

451450
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
451+
return newResultCursorStage;
452+
}
452453

454+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, TransactionConfig config, boolean waitForRunResponse )
455+
{
456+
CompletionStage<InternalStatementResultCursor> newResultCursorStage =
457+
buildResultCursorFactory( statement, config, waitForRunResponse ).thenCompose( StatementResultCursorFactory::asyncResult );
458+
459+
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
453460
return newResultCursorStage;
454461
}
455462

463+
private CompletionStage<StatementResultCursorFactory> buildResultCursorFactory( Statement statement, TransactionConfig config, boolean waitForRunResponse )
464+
{
465+
ensureSessionIsOpen();
466+
467+
return ensureNoOpenTxBeforeRunningQuery()
468+
.thenCompose( ignore -> acquireConnection( mode ) )
469+
.thenCompose( connection -> {
470+
try
471+
{
472+
StatementResultCursorFactory factory = connection.protocol()
473+
.runInAutoCommitTransaction( connection, statement, this, config, waitForRunResponse );
474+
return completedFuture( factory );
475+
}
476+
catch ( Throwable e )
477+
{
478+
return Futures.failedFuture( e );
479+
}
480+
} );
481+
}
482+
456483
private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
457484
{
458485
return Futures.blockingGet( beginTransactionAsync( mode, config ),
@@ -553,7 +580,7 @@ private CompletionStage<Throwable> closeTransactionAndReleaseConnection()
553580
releaseConnection().thenApply( ignore -> txCloseError ) );
554581
}
555582

556-
private CompletionStage<Void> releaseConnection()
583+
public CompletionStage<Void> releaseConnection()
557584
{
558585
return connectionStage.thenCompose( connection ->
559586
{

driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
import java.util.concurrent.CompletionStage;
2222

2323
import org.neo4j.driver.v1.AccessMode;
24-
import org.neo4j.driver.v1.Session;
2524

2625
public interface SessionFactory
2726
{
28-
Session newInstance( AccessMode mode, Bookmarks bookmarks );
27+
NetworkSession newInstance( AccessMode mode, Bookmarks bookmarks );
2928

3029
CompletionStage<Void> verifyConnectivity();
3130

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.neo4j.driver.v1.AccessMode;
2626
import org.neo4j.driver.v1.Config;
2727
import org.neo4j.driver.v1.Logging;
28-
import org.neo4j.driver.v1.Session;
2928

3029
public class SessionFactoryImpl implements SessionFactory
3130
{
@@ -43,7 +42,7 @@ public class SessionFactoryImpl implements SessionFactory
4342
}
4443

4544
@Override
46-
public Session newInstance( AccessMode mode, Bookmarks bookmarks )
45+
public NetworkSession newInstance( AccessMode mode, Bookmarks bookmarks )
4746
{
4847
NetworkSession session = createSession( connectionProvider, retryLogic, mode, logging );
4948
session.setBookmarks( bookmarks );

driver/src/main/java/org/neo4j/driver/internal/async/AccessModeConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,10 @@ public AccessMode mode()
127127
{
128128
return mode;
129129
}
130+
131+
@Override
132+
public void flush()
133+
{
134+
delegate.flush();
135+
}
130136
}

driver/src/main/java/org/neo4j/driver/internal/async/BoltProtocolUtil.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.driver.internal.messaging.v1.BoltProtocolV1;
2424
import org.neo4j.driver.internal.messaging.v2.BoltProtocolV2;
2525
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
26+
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
2627

2728
import static io.netty.buffer.Unpooled.copyInt;
2829
import static io.netty.buffer.Unpooled.unreleasableBuffer;
@@ -41,10 +42,10 @@ public final class BoltProtocolUtil
4142

4243
private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer( copyInt(
4344
BOLT_MAGIC_PREAMBLE,
45+
BoltProtocolV4.VERSION,
4446
BoltProtocolV3.VERSION,
4547
BoltProtocolV2.VERSION,
46-
BoltProtocolV1.VERSION,
47-
NO_PROTOCOL_VERSION ) ).asReadOnly();
48+
BoltProtocolV1.VERSION ) ).asReadOnly();
4849

4950
private static final String HANDSHAKE_STRING = createHandshakeString();
5051

0 commit comments

Comments
 (0)