Skip to content

Commit 1fb7d00

Browse files
committed
Handle applications not closing sessions better.
- Introduces basic finalizer to the InternalSession object, which closes the underlying connection if the session was not closed by the user, and prints a warning to stderr.
1 parent 678f51f commit 1fb7d00

File tree

16 files changed

+245
-46
lines changed

16 files changed

+245
-46
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222

2323
import org.neo4j.driver.internal.spi.Connection;
24+
import org.neo4j.driver.internal.spi.Logger;
2425
import org.neo4j.driver.internal.summary.ResultBuilder;
2526
import org.neo4j.driver.internal.types.InternalTypeSystem;
2627
import org.neo4j.driver.v1.ResultCursor;
@@ -35,6 +36,8 @@ public class InternalSession implements Session
3536
{
3637
private final Connection connection;
3738

39+
private final Logger logger;
40+
3841
/** Called when a transaction object is closed */
3942
private final Runnable txCleanup = new Runnable()
4043
{
@@ -48,9 +51,10 @@ public void run()
4851
private Transaction currentTransaction;
4952
private boolean isOpen = true;
5053

51-
public InternalSession( Connection connection )
54+
public InternalSession( Connection connection, Logger logger )
5255
{
5356
this.connection = connection;
57+
this.logger = logger;
5458
}
5559

5660
@Override
@@ -59,7 +63,6 @@ public ResultCursor run( String statementText, Map<String,Value> statementParame
5963
ensureConnectionIsValid();
6064
ResultBuilder resultBuilder = new ResultBuilder( statementText, statementParameters );
6165
connection.run( statementText, statementParameters, resultBuilder );
62-
6366
connection.pullAll( resultBuilder );
6467
connection.sync();
6568
return resultBuilder.build();
@@ -127,6 +130,17 @@ private void ensureConnectionIsValid()
127130
ensureConnectionIsOpen();
128131
}
129132

133+
@Override
134+
protected void finalize() throws Throwable
135+
{
136+
if( isOpen )
137+
{
138+
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
139+
"method on Sessions before disposing of the objects.", null );
140+
connection.close();
141+
}
142+
}
143+
130144
private void ensureNoOpenTransaction()
131145
{
132146
if ( currentTransaction != null )

driver/src/main/java/org/neo4j/driver/internal/connector/socket/LoggingResponseHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.neo4j.driver.internal.spi.Logger;
2525
import org.neo4j.driver.v1.Value;
2626

27-
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
27+
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
2828
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
2929
import static org.neo4j.driver.internal.messaging.IgnoredMessage.IGNORED;
3030
import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL;
@@ -68,10 +68,10 @@ public void handleDiscardAllMessage()
6868
}
6969

7070
@Override
71-
public void handleAckFailureMessage()
71+
public void handleResetMessage()
7272
{
73-
super.handleAckFailureMessage();
74-
logger.debug( DEFAULT_DEBUG_LOGGING_FORMAT, ACK_FAILURE );
73+
super.handleResetMessage();
74+
logger.debug( DEFAULT_DEBUG_LOGGING_FORMAT, RESET );
7575
}
7676

7777
@Override

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.LinkedList;
2424
import java.util.Map;
2525

26-
import org.neo4j.driver.internal.messaging.AckFailureMessage;
26+
import org.neo4j.driver.internal.messaging.ResetMessage;
2727
import org.neo4j.driver.internal.messaging.InitMessage;
2828
import org.neo4j.driver.internal.messaging.Message;
2929
import org.neo4j.driver.internal.messaging.PullAllMessage;
@@ -94,6 +94,13 @@ public void pullAll( StreamCollector collector )
9494
responseHandler.registerResultCollector( messageId, collector );
9595
}
9696

97+
@Override
98+
public void reset( StreamCollector collector )
99+
{
100+
int messageId = queueMessage( ResetMessage.RESET );
101+
responseHandler.registerResultCollector( messageId, collector );
102+
}
103+
97104
@Override
98105
public void sync()
99106
{
@@ -111,7 +118,7 @@ public void sync()
111118
{
112119
// Its enough to simply add the ack message to the outbound queue, it'll value sent
113120
// off as the first message the next time we need to sync with the database.
114-
queueMessage( new AckFailureMessage() );
121+
reset( StreamCollector.NO_OP );
115122
throw responseHandler.serverFailure();
116123
}
117124
}

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public void handleDiscardAllMessage()
194194
}
195195

196196
@Override
197-
public void handleAckFailureMessage()
197+
public void handleResetMessage()
198198
{
199199

200200
}

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface MessageHandler
3434

3535
void handleDiscardAllMessage() throws IOException;
3636

37-
void handleAckFailureMessage() throws IOException;
37+
void handleResetMessage() throws IOException;
3838

3939
// Responses
4040
void handleSuccessMessage( Map<String,Value> meta ) throws IOException;

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
public class PackStreamMessageFormatV1 implements MessageFormat
6060
{
6161
public final static byte MSG_INIT = 0x01;
62-
public final static byte MSG_ACK_FAILURE = 0x0F;
62+
public final static byte MSG_RESET = 0x0F;
6363
public final static byte MSG_RUN = 0x10;
6464
public final static byte MSG_DISCARD_ALL = 0x2F;
6565
public final static byte MSG_PULL_ALL = 0x3F;
@@ -152,9 +152,9 @@ public void handleDiscardAllMessage() throws IOException
152152
}
153153

154154
@Override
155-
public void handleAckFailureMessage() throws IOException
155+
public void handleResetMessage() throws IOException
156156
{
157-
packer.packStructHeader( 0, MSG_ACK_FAILURE );
157+
packer.packStructHeader( 0, MSG_RESET );
158158
onMessageComplete.run();
159159
}
160160

@@ -429,11 +429,20 @@ public void read( MessageHandler handler ) throws IOException
429429
case MSG_INIT:
430430
unpackInitMessage( handler );
431431
break;
432+
case MSG_RESET:
433+
unpackResetMessage( handler );
434+
break;
432435
default:
433436
throw new IOException( "Unknown message type: " + type );
434437
}
435438
}
436439

440+
private void unpackResetMessage( MessageHandler handler ) throws IOException
441+
{
442+
handler.handleResetMessage();
443+
onMessageComplete.run();
444+
}
445+
437446
private void unpackInitMessage( MessageHandler handler ) throws IOException
438447
{
439448
handler.handleInitMessage( unpacker.unpackString() );

driver/src/main/java/org/neo4j/driver/internal/messaging/AckFailureMessage.java renamed to driver/src/main/java/org/neo4j/driver/internal/messaging/ResetMessage.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import java.io.IOException;
2222

2323
/**
24-
* ACK_FAILURE request message
24+
* RESET request message
2525
* <p>
26-
* Sent by clients to acknowledge receipt of failures sent by the server. This is required to
26+
* Sent by clients to reset a session to a clean state - closing any open transaction or result streams.
27+
* This also acknowledges receipt of failures sent by the server. This is required to
2728
* allow optimistic sending of multiple messages before responses have been received - pipelining.
2829
* <p>
2930
* When something goes wrong, we want the server to stop processing our already sent messages,
@@ -33,20 +34,20 @@
3334
* This message acts as a barrier after an error, informing the server that we've seen the error
3435
* message, and that messages that follow this one are safe to execute.
3536
*/
36-
public class AckFailureMessage implements Message
37+
public class ResetMessage implements Message
3738
{
38-
public static final AckFailureMessage ACK_FAILURE = new AckFailureMessage();
39+
public static final ResetMessage RESET = new ResetMessage();
3940

4041
@Override
4142
public void dispatch( MessageHandler handler ) throws IOException
4243
{
43-
handler.handleAckFailureMessage();
44+
handler.handleResetMessage();
4445
}
4546

4647
@Override
4748
public String toString()
4849
{
49-
return "[ACK_FAILURE]";
50+
return "[RESET]";
5051
}
5152

5253
@Override

driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public Connection acquire( URI sessionURI )
9292
{
9393
try
9494
{
95-
PooledConnection conn = pool( sessionURI ).acquire( 30, TimeUnit.SECONDS );
95+
Connection conn = pool( sessionURI ).acquire( 30, TimeUnit.SECONDS );
9696
if( conn == null )
9797
{
9898
throw new ClientException(

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,19 @@ public void pullAll( StreamCollector collector )
9393
}
9494
}
9595

96+
@Override
97+
public void reset( StreamCollector collector )
98+
{
99+
try
100+
{
101+
delegate.reset( collector );
102+
}
103+
catch ( RuntimeException e )
104+
{
105+
onDelegateException( e );
106+
}
107+
}
108+
96109
@Override
97110
public void sync()
98111
{
@@ -109,6 +122,9 @@ public void sync()
109122
@Override
110123
public void close()
111124
{
125+
// In case this session has an open result or transaction or something,
126+
// make sure it's reset to a nice state before we reuse it.
127+
reset( StreamCollector.NO_OP );
112128
release.accept( this );
113129
}
114130

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public interface Connection extends AutoCloseable
5252
*/
5353
void pullAll( StreamCollector collector );
5454

55+
/**
56+
* Queue a reset action, output will be handed to the collector once the pull starts. This will
57+
* close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run}
58+
*/
59+
void reset( StreamCollector collector );
60+
5561
/**
5662
* Ensure all outstanding actions are carried out on the server.
5763
*/

driver/src/main/java/org/neo4j/driver/v1/Driver.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,13 @@ public class Driver implements AutoCloseable
7474
{
7575
private final ConnectionPool connections;
7676
private final URI url;
77+
private final Config config;
7778

7879
public Driver( URI url, Config config )
7980
{
8081
this.url = url;
8182
this.connections = new InternalConnectionPool( config );
83+
this.config = config;
8284
}
8385

8486
/**
@@ -88,11 +90,7 @@ public Driver( URI url, Config config )
8890
*/
8991
public Session session()
9092
{
91-
return new InternalSession( connections.acquire( url ) );
92-
// TODO a ConnectionPool per URL
93-
// ConnectionPool connections = new InternalConnectionPool( logging, url );
94-
// And to value a connection from the pool could be
95-
// connections.acquire();
93+
return new InternalSession( connections.acquire( url ), config.logging().getLog( "session" ) );
9694
}
9795

9896
/**

0 commit comments

Comments
 (0)