Skip to content

Commit b665228

Browse files
committed
Refactor: Use composition instead of inheritance
`RoutingNetworkSession` now delegates instead of extends `NetworkSession`
1 parent df9a447 commit b665228

File tree

5 files changed

+269
-20
lines changed

5 files changed

+269
-20
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ public Session session()
242242
@Override
243243
public Session session( final AccessMode mode )
244244
{
245-
return new RoutingNetworkSession( mode, acquireConnection( mode ),
245+
Connection connection = acquireConnection( mode );
246+
return new RoutingNetworkSession( new NetworkSession( connection ), mode, connection.address(),
246247
new RoutingErrorHandler()
247248
{
248249
@Override

driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,68 +19,147 @@
1919
package org.neo4j.driver.internal;
2020

2121

22+
import java.util.Map;
23+
2224
import org.neo4j.driver.internal.net.BoltServerAddress;
23-
import org.neo4j.driver.internal.spi.Connection;
2425
import org.neo4j.driver.v1.AccessMode;
26+
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.Session;
2528
import org.neo4j.driver.v1.Statement;
2629
import org.neo4j.driver.v1.StatementResult;
30+
import org.neo4j.driver.v1.Transaction;
31+
import org.neo4j.driver.v1.Value;
32+
import org.neo4j.driver.v1.Values;
2733
import org.neo4j.driver.v1.exceptions.ClientException;
2834
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
2935
import org.neo4j.driver.v1.exceptions.Neo4jException;
3036
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
37+
import org.neo4j.driver.v1.types.TypeSystem;
3138

3239
import static java.lang.String.format;
40+
import static org.neo4j.driver.v1.Values.value;
3341

34-
public class RoutingNetworkSession extends NetworkSession
42+
public class RoutingNetworkSession implements Session
3543
{
44+
protected final Session delegate;
45+
private final BoltServerAddress address;
3646
private final AccessMode mode;
3747
private final RoutingErrorHandler onError;
3848

39-
RoutingNetworkSession( AccessMode mode, Connection connection,
49+
RoutingNetworkSession( Session delegate, AccessMode mode, BoltServerAddress address,
4050
RoutingErrorHandler onError )
4151
{
42-
super( connection );
52+
this.delegate = delegate;
4353
this.mode = mode;
54+
this.address = address;
4455
this.onError = onError;
4556
}
4657

58+
@Override
59+
public StatementResult run( String statementText )
60+
{
61+
return run( statementText, Values.EmptyMap );
62+
}
63+
64+
@Override
65+
public StatementResult run( String statementText, Map<String,Object> statementParameters )
66+
{
67+
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
68+
return run( statementText, params );
69+
}
70+
71+
@Override
72+
public StatementResult run( String statementTemplate, Record statementParameters )
73+
{
74+
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
75+
return run( statementTemplate, params );
76+
}
77+
78+
@Override
79+
public StatementResult run( String statementText, Value statementParameters )
80+
{
81+
return run( new Statement( statementText, statementParameters ) );
82+
}
83+
4784
@Override
4885
public StatementResult run( Statement statement )
4986
{
5087
try
5188
{
52-
return new RoutingStatementResult( super.run( statement ), mode, connection.address(), onError );
89+
return new RoutingStatementResult( delegate.run( statement ), mode, address, onError );
5390
}
5491
catch ( ConnectionFailureException e )
5592
{
56-
throw sessionExpired( e, onError, connection.address() );
93+
throw sessionExpired( e, onError, address );
5794
}
5895
catch ( ClientException e )
5996
{
60-
throw filterFailureToWrite( e, mode, onError, connection.address() );
97+
throw filterFailureToWrite( e, mode, onError, address );
6198
}
6299
}
63100

101+
@Override
102+
public TypeSystem typeSystem()
103+
{
104+
return delegate.typeSystem();
105+
}
106+
107+
@Override
108+
public Transaction beginTransaction()
109+
{
110+
return delegate.beginTransaction();
111+
}
112+
113+
@Override
114+
public Transaction beginTransaction( String bookmark )
115+
{
116+
return delegate.beginTransaction(bookmark);
117+
}
118+
119+
@Override
120+
public String lastBookmark()
121+
{
122+
return delegate.lastBookmark();
123+
}
124+
125+
@Override
126+
public void reset()
127+
{
128+
delegate.reset();
129+
}
130+
131+
@Override
132+
public boolean isOpen()
133+
{
134+
return delegate.isOpen();
135+
}
136+
64137
@Override
65138
public void close()
66139
{
67140
try
68141
{
69-
super.close();
142+
delegate.close();
70143
}
71144
catch ( ConnectionFailureException e )
72145
{
73-
throw sessionExpired(e, onError, connection.address());
146+
throw sessionExpired(e, onError, address);
74147
}
75148
catch ( ClientException e )
76149
{
77-
throw filterFailureToWrite( e, mode, onError, connection.address() );
150+
throw filterFailureToWrite( e, mode, onError, address );
78151
}
79152
}
80153

154+
@Override
155+
public String server()
156+
{
157+
return delegate.server();
158+
}
159+
81160
public BoltServerAddress address()
82161
{
83-
return connection.address();
162+
return address;
84163
}
85164

86165
static Neo4jException filterFailureToWrite( ClientException e, AccessMode mode, RoutingErrorHandler onError,

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.neo4j.driver.v1.GraphDatabase;
4444
import org.neo4j.driver.v1.Record;
4545
import org.neo4j.driver.v1.Session;
46+
import org.neo4j.driver.v1.Transaction;
4647
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
4748
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4849
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
@@ -521,6 +522,103 @@ public void shouldHandleLeaderSwitchWhenWriting()
521522
assertThat( server.exitStatus(), equalTo( 0 ) );
522523
}
523524

525+
@Test
526+
public void shouldHandleLeaderSwitchWhenWritingWithoutConsuming()
527+
throws IOException, InterruptedException, StubServer.ForceKilled
528+
{
529+
// Given
530+
StubServer server = StubServer.start( "acquire_endpoints.script", 9001 );
531+
532+
//START a write server that doesn't accept writes
533+
StubServer.start( "not_able_to_write_server.script", 9007 );
534+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
535+
RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config );
536+
boolean failed = false;
537+
try ( Session session = driver.session( AccessMode.WRITE ) )
538+
{
539+
assertThat( driver.writeServers(), hasItem(address( 9007 ) ) );
540+
assertThat( driver.writeServers(), hasItem( address( 9008 ) ) );
541+
session.run( "CREATE ()" );
542+
}
543+
catch ( SessionExpiredException e )
544+
{
545+
failed = true;
546+
assertThat( e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ) );
547+
}
548+
assertTrue( failed );
549+
assertThat( driver.writeServers(), not( hasItem( address( 9007 ) ) ) );
550+
assertThat( driver.writeServers(), hasItem( address( 9008 ) ) );
551+
assertTrue( driver.connectionPool().hasAddress( address( 9007 ) ) );
552+
553+
driver.close();
554+
// Finally
555+
assertThat( server.exitStatus(), equalTo( 0 ) );
556+
}
557+
558+
@Ignore
559+
public void shouldHandleLeaderSwitchWhenWritingInTransaction()
560+
throws IOException, InterruptedException, StubServer.ForceKilled
561+
{
562+
// Given
563+
StubServer server = StubServer.start( "acquire_endpoints.script", 9001 );
564+
565+
//START a write server that doesn't accept writes
566+
StubServer.start( "not_able_to_write_server.script", 9007 );
567+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
568+
RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config );
569+
boolean failed = false;
570+
try ( Session session = driver.session( AccessMode.WRITE );
571+
Transaction tx = session.beginTransaction() )
572+
{
573+
tx.run( "CREATE ()" ).consume();
574+
}
575+
catch ( SessionExpiredException e )
576+
{
577+
failed = true;
578+
assertThat( e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ) );
579+
}
580+
assertTrue( failed );
581+
assertThat( driver.writeServers(), not( hasItem( address( 9007 ) ) ) );
582+
assertThat( driver.writeServers(), hasItem( address( 9008 ) ) );
583+
assertTrue( driver.connectionPool().hasAddress( address( 9007 ) ) );
584+
585+
driver.close();
586+
// Finally
587+
assertThat( server.exitStatus(), equalTo( 0 ) );
588+
}
589+
590+
@Ignore
591+
public void shouldHandleLeaderSwitchWhenWritingInTransactionWithoutConsuming()
592+
throws IOException, InterruptedException, StubServer.ForceKilled
593+
{
594+
// Given
595+
StubServer server = StubServer.start( "acquire_endpoints.script", 9001 );
596+
597+
//START a write server that doesn't accept writes
598+
StubServer.start( "not_able_to_write_server.script", 9007 );
599+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
600+
RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config );
601+
boolean failed = false;
602+
try ( Session session = driver.session( AccessMode.WRITE );
603+
Transaction tx = session.beginTransaction() )
604+
{
605+
tx.run( "CREATE ()" );
606+
}
607+
catch ( SessionExpiredException e )
608+
{
609+
failed = true;
610+
assertThat( e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ) );
611+
}
612+
assertTrue( failed );
613+
assertThat( driver.writeServers(), not( hasItem( address( 9007 ) ) ) );
614+
assertThat( driver.writeServers(), hasItem( address( 9008 ) ) );
615+
assertTrue( driver.connectionPool().hasAddress( address( 9007 ) ) );
616+
617+
driver.close();
618+
// Finally
619+
assertThat( server.exitStatus(), equalTo( 0 ) );
620+
}
621+
524622
@Test
525623
public void shouldRediscoverOnExpiry() throws IOException, InterruptedException, StubServer.ForceKilled
526624
{
@@ -589,7 +687,7 @@ public void shouldNotPutBackPurgedConnection() throws IOException, InterruptedEx
589687

590688
// now we close the read session and the connection should not be put
591689
// back to the pool
592-
Connection connection = ((RoutingNetworkSession) readSession).connection;
690+
Connection connection = ((NetworkSession) ((RoutingNetworkSession) readSession).delegate).connection;
593691
assertTrue( connection.isOpen() );
594692
readSession.close();
595693
assertFalse( connection.isOpen() );

0 commit comments

Comments
 (0)