Skip to content

Commit 742d280

Browse files
authored
Merge pull request #567 from ali-ince/1.7-pass-access-mode
Pass AccessMode in BEGIN and RUN messages
2 parents 49ef4db + 227c0fc commit 742d280

26 files changed

+769
-84
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.concurrent.CompletionStage;
2222

23+
import org.neo4j.driver.internal.async.AccessModeConnection;
2324
import org.neo4j.driver.internal.spi.Connection;
2425
import org.neo4j.driver.internal.spi.ConnectionPool;
2526
import org.neo4j.driver.internal.spi.ConnectionProvider;
@@ -45,7 +46,7 @@ public class DirectConnectionProvider implements ConnectionProvider
4546
@Override
4647
public CompletionStage<Connection> acquireConnection( AccessMode mode )
4748
{
48-
return connectionPool.acquire( address );
49+
return connectionPool.acquire( address ).thenApply( connection -> new AccessModeConnection( connection, mode ) );
4950
}
5051

5152
@Override
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.internal.BoltServerAddress;
24+
import org.neo4j.driver.internal.messaging.BoltProtocol;
25+
import org.neo4j.driver.internal.messaging.Message;
26+
import org.neo4j.driver.internal.spi.Connection;
27+
import org.neo4j.driver.internal.spi.ResponseHandler;
28+
import org.neo4j.driver.internal.util.ServerVersion;
29+
import org.neo4j.driver.v1.AccessMode;
30+
31+
public class AccessModeConnection implements Connection
32+
{
33+
private final Connection delegate;
34+
private final AccessMode mode;
35+
36+
public AccessModeConnection( Connection delegate, AccessMode mode )
37+
{
38+
this.delegate = delegate;
39+
this.mode = mode;
40+
}
41+
42+
public Connection connection()
43+
{
44+
return delegate;
45+
}
46+
47+
@Override
48+
public boolean isOpen()
49+
{
50+
return delegate.isOpen();
51+
}
52+
53+
@Override
54+
public void enableAutoRead()
55+
{
56+
delegate.enableAutoRead();
57+
}
58+
59+
@Override
60+
public void disableAutoRead()
61+
{
62+
delegate.disableAutoRead();
63+
}
64+
65+
@Override
66+
public void write( Message message, ResponseHandler handler )
67+
{
68+
delegate.write( message, handler );
69+
}
70+
71+
@Override
72+
public void write( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2 )
73+
{
74+
delegate.write( message1, handler1, message2, handler2 );
75+
}
76+
77+
@Override
78+
public void writeAndFlush( Message message, ResponseHandler handler )
79+
{
80+
delegate.writeAndFlush( message, handler );
81+
}
82+
83+
@Override
84+
public void writeAndFlush( Message message1, ResponseHandler handler1, Message message2, ResponseHandler handler2 )
85+
{
86+
delegate.writeAndFlush( message1, handler1, message2, handler2 );
87+
}
88+
89+
@Override
90+
public CompletionStage<Void> reset()
91+
{
92+
return delegate.reset();
93+
}
94+
95+
@Override
96+
public CompletionStage<Void> release()
97+
{
98+
return delegate.release();
99+
}
100+
101+
@Override
102+
public void terminateAndRelease( String reason )
103+
{
104+
delegate.terminateAndRelease( reason );
105+
}
106+
107+
@Override
108+
public BoltServerAddress serverAddress()
109+
{
110+
return delegate.serverAddress();
111+
}
112+
113+
@Override
114+
public ServerVersion serverVersion()
115+
{
116+
return delegate.serverVersion();
117+
}
118+
119+
@Override
120+
public BoltProtocol protocol()
121+
{
122+
return delegate.protocol();
123+
}
124+
125+
@Override
126+
public AccessMode mode()
127+
{
128+
return mode;
129+
}
130+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.neo4j.driver.internal.BoltServerAddress;
2727
import org.neo4j.driver.internal.RoutingErrorHandler;
28+
import org.neo4j.driver.internal.async.AccessModeConnection;
2829
import org.neo4j.driver.internal.async.RoutingConnection;
2930
import org.neo4j.driver.internal.cluster.AddressSet;
3031
import org.neo4j.driver.internal.cluster.ClusterComposition;
@@ -95,7 +96,8 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
9596
{
9697
return freshRoutingTable( mode )
9798
.thenCompose( routingTable -> acquire( mode, routingTable ) )
98-
.thenApply( connection -> new RoutingConnection( connection, mode, this ) );
99+
.thenApply( connection -> new RoutingConnection( connection, mode, this ) )
100+
.thenApply( connection -> new AccessModeConnection( connection, mode ) );
99101
}
100102

101103
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,22 @@
2323
import java.util.Objects;
2424

2525
import org.neo4j.driver.internal.Bookmarks;
26+
import org.neo4j.driver.v1.AccessMode;
2627
import org.neo4j.driver.v1.TransactionConfig;
2728
import org.neo4j.driver.v1.Value;
2829

2930
public class BeginMessage extends TransactionStartingMessage
3031
{
3132
public static final byte SIGNATURE = 0x11;
3233

33-
public BeginMessage( Bookmarks bookmarks, TransactionConfig config )
34+
public BeginMessage( Bookmarks bookmarks, TransactionConfig config, AccessMode mode )
3435
{
35-
this( bookmarks, config.timeout(), config.metadata() );
36+
this( bookmarks, config.timeout(), config.metadata(), mode );
3637
}
3738

38-
public BeginMessage( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata )
39+
public BeginMessage( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata, AccessMode mode )
3940
{
40-
super( bookmarks, txTimeout, txMetadata );
41+
super( bookmarks, txTimeout, txMetadata, mode );
4142
}
4243

4344
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Objects;
2424

2525
import org.neo4j.driver.internal.Bookmarks;
26+
import org.neo4j.driver.v1.AccessMode;
2627
import org.neo4j.driver.v1.TransactionConfig;
2728
import org.neo4j.driver.v1.Value;
2829

@@ -33,14 +34,15 @@ public class RunWithMetadataMessage extends TransactionStartingMessage
3334
private final String statement;
3435
private final Map<String,Value> parameters;
3536

36-
public RunWithMetadataMessage( String statement, Map<String,Value> parameters, Bookmarks bookmarks, TransactionConfig config )
37+
public RunWithMetadataMessage( String statement, Map<String,Value> parameters, Bookmarks bookmarks, TransactionConfig config, AccessMode mode )
3738
{
38-
this( statement, parameters, bookmarks, config.timeout(), config.metadata() );
39+
this( statement, parameters, bookmarks, config.timeout(), config.metadata(), mode );
3940
}
4041

41-
public RunWithMetadataMessage( String statement, Map<String,Value> parameters, Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata )
42+
public RunWithMetadataMessage( String statement, Map<String,Value> parameters, Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata,
43+
AccessMode mode )
4244
{
43-
super( bookmarks, txTimeout, txMetadata );
45+
super( bookmarks, txTimeout, txMetadata, mode );
4446
this.statement = statement;
4547
this.parameters = parameters;
4648
}

driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionStartingMessage.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.neo4j.driver.internal.Bookmarks;
2525
import org.neo4j.driver.internal.messaging.Message;
2626
import org.neo4j.driver.internal.util.Iterables;
27+
import org.neo4j.driver.v1.AccessMode;
2728
import org.neo4j.driver.v1.Value;
2829

2930
import static java.util.Collections.emptyMap;
@@ -34,26 +35,29 @@ abstract class TransactionStartingMessage implements Message
3435
private static final String BOOKMARKS_METADATA_KEY = "bookmarks";
3536
private static final String TX_TIMEOUT_METADATA_KEY = "tx_timeout";
3637
private static final String TX_METADATA_METADATA_KEY = "tx_metadata";
38+
private static final String MODE_KEY = "mode";
39+
private static final String MODE_READ_VALUE = "r";
3740

3841
final Map<String,Value> metadata;
3942

40-
TransactionStartingMessage( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata )
43+
TransactionStartingMessage( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata, AccessMode mode )
4144
{
42-
this.metadata = buildMetadata( bookmarks, txTimeout, txMetadata );
45+
this.metadata = buildMetadata( bookmarks, txTimeout, txMetadata, mode );
4346
}
4447

4548
public final Map<String,Value> metadata()
4649
{
4750
return metadata;
4851
}
4952

50-
private static Map<String,Value> buildMetadata( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata )
53+
private static Map<String,Value> buildMetadata( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata, AccessMode mode )
5154
{
5255
boolean bookmarksPresent = bookmarks != null && !bookmarks.isEmpty();
5356
boolean txTimeoutPresent = txTimeout != null;
5457
boolean txMetadataPresent = txMetadata != null && !txMetadata.isEmpty();
58+
boolean accessModePresent = mode == AccessMode.READ;
5559

56-
if ( !bookmarksPresent && !txTimeoutPresent && !txMetadataPresent )
60+
if ( !bookmarksPresent && !txTimeoutPresent && !txMetadataPresent && !accessModePresent )
5761
{
5862
return emptyMap();
5963
}
@@ -73,6 +77,13 @@ private static Map<String,Value> buildMetadata( Bookmarks bookmarks, Duration tx
7377
result.put( TX_METADATA_METADATA_KEY, value( txMetadata ) );
7478
}
7579

80+
switch ( mode )
81+
{
82+
case READ:
83+
result.put( MODE_KEY, value( MODE_READ_VALUE ) );
84+
break;
85+
}
86+
7687
return result;
7788
}
7889
}

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void prepareToCloseChannel( Channel channel )
9696
@Override
9797
public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config )
9898
{
99-
BeginMessage beginMessage = new BeginMessage( bookmarks, config );
99+
BeginMessage beginMessage = new BeginMessage( bookmarks, config, connection.mode() );
100100

101101
if ( bookmarks.isEmpty() )
102102
{
@@ -148,7 +148,7 @@ private static CompletionStage<InternalStatementResultCursor> runStatement( Conn
148148
Map<String,Value> params = statement.parameters().asMap( ofValue() );
149149

150150
CompletableFuture<Void> runCompletedFuture = new CompletableFuture<>();
151-
Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config );
151+
Message runMessage = new RunWithMetadataMessage( query, params, bookmarksHolder.getBookmarks(), config, connection.mode() );
152152
RunResponseHandler runHandler = new RunResponseHandler( runCompletedFuture, METADATA_EXTRACTOR );
153153
PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, bookmarksHolder, tx );
154154

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.neo4j.driver.internal.messaging.BoltProtocol;
2525
import org.neo4j.driver.internal.messaging.Message;
2626
import org.neo4j.driver.internal.util.ServerVersion;
27+
import org.neo4j.driver.v1.AccessMode;
2728

2829
public interface Connection
2930
{
@@ -52,4 +53,9 @@ public interface Connection
5253
ServerVersion serverVersion();
5354

5455
BoltProtocol protocol();
56+
57+
default AccessMode mode()
58+
{
59+
return AccessMode.WRITE;
60+
}
5561
}

driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,20 @@
1919
package org.neo4j.driver.internal;
2020

2121
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.EnumSource;
2224

2325
import java.util.concurrent.CompletableFuture;
2426
import java.util.stream.Stream;
2527

28+
import org.neo4j.driver.internal.async.AccessModeConnection;
2629
import org.neo4j.driver.internal.spi.Connection;
2730
import org.neo4j.driver.internal.spi.ConnectionPool;
31+
import org.neo4j.driver.v1.AccessMode;
2832

2933
import static java.util.concurrent.CompletableFuture.completedFuture;
34+
import static org.hamcrest.Matchers.instanceOf;
35+
import static org.hamcrest.junit.MatcherAssert.assertThat;
3036
import static org.junit.jupiter.api.Assertions.assertEquals;
3137
import static org.junit.jupiter.api.Assertions.assertSame;
3238
import static org.mockito.Mockito.mock;
@@ -48,8 +54,26 @@ void acquiresConnectionsFromThePool()
4854
ConnectionPool pool = poolMock( address, connection1, connection2 );
4955
DirectConnectionProvider provider = new DirectConnectionProvider( address, pool );
5056

51-
assertSame( connection1, await( provider.acquireConnection( READ ) ) );
52-
assertSame( connection2, await( provider.acquireConnection( WRITE ) ) );
57+
Connection acquired1 = await( provider.acquireConnection( READ ) );
58+
assertThat( acquired1, instanceOf( AccessModeConnection.class ) );
59+
assertSame( connection1, ((AccessModeConnection) acquired1).connection() );
60+
61+
Connection acquired2 = await( provider.acquireConnection( WRITE ) );
62+
assertThat( acquired2, instanceOf( AccessModeConnection.class ) );
63+
assertSame( connection2, ((AccessModeConnection) acquired2).connection() );
64+
}
65+
66+
@ParameterizedTest
67+
@EnumSource( AccessMode.class )
68+
void returnsCorrectAccessMode( AccessMode mode )
69+
{
70+
BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT;
71+
ConnectionPool pool = poolMock( address, mock( Connection.class ) );
72+
DirectConnectionProvider provider = new DirectConnectionProvider( address, pool );
73+
74+
Connection acquired = await( provider.acquireConnection( mode ) );
75+
76+
assertEquals( mode, acquired.mode() );
5377
}
5478

5579
@Test

0 commit comments

Comments
 (0)