Skip to content

Commit 9c0f8f5

Browse files
authored
Merge pull request #418 from lutovich/1.5-session-reset
Fixed Session#reset()
2 parents 7c4406d + b18d7ca commit 9c0f8f5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+729
-573
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
5252
@Override
5353
public CompletionStage<Void> verifyConnectivity()
5454
{
55-
return acquireConnection( READ ).thenCompose( Connection::forceRelease );
55+
return acquireConnection( READ ).thenCompose( Connection::releaseNow );
5656
}
5757

5858
@Override

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.internal.spi.Connection;
3232
import org.neo4j.driver.internal.types.InternalTypeSystem;
3333
import org.neo4j.driver.v1.Record;
34+
import org.neo4j.driver.v1.Session;
3435
import org.neo4j.driver.v1.Statement;
3536
import org.neo4j.driver.v1.StatementResult;
3637
import org.neo4j.driver.v1.StatementResultCursor;
@@ -42,7 +43,6 @@
4243

4344
import static java.util.Collections.emptyMap;
4445
import static java.util.concurrent.CompletableFuture.completedFuture;
45-
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
4646
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4747
import static org.neo4j.driver.internal.util.Futures.getBlocking;
4848
import static org.neo4j.driver.v1.Values.value;
@@ -65,10 +65,9 @@ private enum State
6565
MARKED_FAILED,
6666

6767
/**
68-
* An error has occurred, transaction can no longer be used and no more messages will be sent for this
69-
* transaction.
68+
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
7069
*/
71-
FAILED,
70+
TERMINATED,
7271

7372
/** This transaction has successfully committed */
7473
COMMITTED,
@@ -135,17 +134,10 @@ CompletionStage<Void> closeAsync()
135134
{
136135
return commitAsync();
137136
}
138-
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
137+
else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED )
139138
{
140139
return rollbackAsync();
141140
}
142-
else if ( state == State.FAILED )
143-
{
144-
// unrecoverable error happened, transaction should've been rolled back on the server
145-
// update state so that this transaction does not remain open
146-
state = State.ROLLED_BACK;
147-
return completedFuture( null );
148-
}
149141
else
150142
{
151143
return completedFuture( null );
@@ -161,7 +153,12 @@ public CompletionStage<Void> commitAsync()
161153
}
162154
else if ( state == State.ROLLED_BACK )
163155
{
164-
return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
156+
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
157+
}
158+
else if ( state == State.TERMINATED )
159+
{
160+
return failedFuture(
161+
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
165162
}
166163
else
167164
{
@@ -174,12 +171,18 @@ public CompletionStage<Void> rollbackAsync()
174171
{
175172
if ( state == State.COMMITTED )
176173
{
177-
return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
174+
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
178175
}
179176
else if ( state == State.ROLLED_BACK )
180177
{
181178
return completedFuture( null );
182179
}
180+
else if ( state == State.TERMINATED )
181+
{
182+
// transaction has been terminated by RESET and should be rolled back by the database
183+
state = State.ROLLED_BACK;
184+
return completedFuture( null );
185+
}
183186
else
184187
{
185188
return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) );
@@ -190,9 +193,8 @@ private BiConsumer<Void,Throwable> transactionClosed( State newState )
190193
{
191194
return ( ignore, error ) ->
192195
{
193-
// todo: test that this state transition always happens when commit or rollback
194196
state = newState;
195-
connection.release();
197+
connection.releaseInBackground();
196198
session.setBookmark( bookmark );
197199
};
198200
}
@@ -280,18 +282,18 @@ public StatementResult run( Statement statement )
280282
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
281283
{
282284
ensureCanRunQueries();
285+
//noinspection unchecked
283286
return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this );
284287
}
285288

286289
@Override
287290
public boolean isOpen()
288291
{
289-
return state != State.COMMITTED && state != State.ROLLED_BACK;
292+
return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED;
290293
}
291294

292295
private void ensureCanRunQueries()
293296
{
294-
// todo: test these two new branches
295297
if ( state == State.COMMITTED )
296298
{
297299
throw new ClientException( "Cannot run more statements in this transaction, it has been committed" );
@@ -300,14 +302,19 @@ else if ( state == State.ROLLED_BACK )
300302
{
301303
throw new ClientException( "Cannot run more statements in this transaction, it has been rolled back" );
302304
}
303-
else if ( state == State.FAILED || state == State.MARKED_FAILED )
305+
else if ( state == State.MARKED_FAILED )
304306
{
305307
throw new ClientException(
306308
"Cannot run more statements in this transaction, because previous statements in the " +
307309
"transaction has failed and the transaction has been rolled back. Please start a new " +
308310
"transaction to run another statement."
309311
);
310312
}
313+
else if ( state == State.TERMINATED )
314+
{
315+
throw new ClientException(
316+
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
317+
}
311318
}
312319

313320
@Override
@@ -316,21 +323,9 @@ public TypeSystem typeSystem()
316323
return InternalTypeSystem.TYPE_SYSTEM;
317324
}
318325

319-
public void resultFailed( Throwable error )
320-
{
321-
if ( isRecoverable( error ) )
322-
{
323-
failure();
324-
}
325-
else
326-
{
327-
markToClose();
328-
}
329-
}
330-
331-
public void markToClose()
326+
public void markTerminated()
332327
{
333-
state = State.FAILED;
328+
state = State.TERMINATED;
334329
}
335330

336331
public Bookmark bookmark()

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void close()
160160
getBlocking( lastResultStage
161161
.exceptionally( error -> null )
162162
.thenCompose( this::ensureBuffered )
163-
.thenCompose( error -> forceReleaseResources().thenApply( ignore ->
163+
.thenCompose( error -> releaseResources().thenApply( ignore ->
164164
{
165165
if ( error != null )
166166
{
@@ -177,7 +177,7 @@ public CompletionStage<Void> closeAsync()
177177
// todo: wait for buffered result?
178178
if ( open.compareAndSet( true, false ) )
179179
{
180-
return forceReleaseResources();
180+
return releaseResources();
181181
}
182182
return completedFuture( null );
183183
}
@@ -254,7 +254,19 @@ public String lastBookmark()
254254
@Override
255255
public void reset()
256256
{
257-
getBlocking( forceReleaseResources() );
257+
getBlocking( resetAsync() );
258+
}
259+
260+
private CompletionStage<Void> resetAsync()
261+
{
262+
return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() )
263+
.thenAccept( tx ->
264+
{
265+
if ( tx != null )
266+
{
267+
tx.markTerminated();
268+
}
269+
} );
258270
}
259271

260272
@Override
@@ -465,41 +477,38 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
465477
return connectionStage;
466478
}
467479

468-
private CompletionStage<Void> forceReleaseResources()
480+
private CompletionStage<Void> releaseResources()
469481
{
470-
return rollbackTransaction().thenCompose( ignore -> forceReleaseConnection() );
482+
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
471483
}
472484

473485
private CompletionStage<Void> rollbackTransaction()
474486
{
475-
return transactionStage
476-
.exceptionally( error -> null ) // handle previous acquisition failures
477-
.thenCompose( tx ->
478-
{
479-
if ( tx != null && tx.isOpen() )
480-
{
481-
return tx.rollbackAsync();
482-
}
483-
return completedFuture( null );
484-
} );
487+
return existingTransactionOrNull().thenCompose( tx ->
488+
{
489+
if ( tx != null )
490+
{
491+
return tx.rollbackAsync();
492+
}
493+
return completedFuture( null );
494+
} ).exceptionally( error ->
495+
{
496+
Throwable cause = Futures.completionErrorCause( error );
497+
logger.error( "Failed to rollback active transaction", cause );
498+
return null;
499+
} );
485500
}
486501

487-
private CompletionStage<Void> forceReleaseConnection()
502+
private CompletionStage<Void> releaseConnectionNow()
488503
{
489-
return connectionStage
490-
.exceptionally( error -> null ) // handle previous acquisition failures
491-
.thenCompose( connection ->
492-
{
493-
if ( connection != null )
494-
{
495-
return connection.forceRelease();
496-
}
497-
return completedFuture( null );
498-
} ).exceptionally( error ->
499-
{
500-
logger.error( "Failed to rollback active transaction", error );
501-
return null;
502-
} );
504+
return existingConnectionOrNull().thenCompose( connection ->
505+
{
506+
if ( connection != null )
507+
{
508+
return connection.releaseNow();
509+
}
510+
return completedFuture( null );
511+
} );
503512
}
504513

505514
private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
@@ -516,14 +525,25 @@ private CompletionStage<Void> ensureNoOpenTxBeforeStartingTx()
516525

517526
private CompletionStage<Void> ensureNoOpenTx( String errorMessage )
518527
{
519-
return transactionStage.exceptionally( error -> null )
520-
.thenAccept( tx ->
521-
{
522-
if ( tx != null && tx.isOpen() )
523-
{
524-
throw new ClientException( errorMessage );
525-
}
526-
} );
528+
return existingTransactionOrNull().thenAccept( tx ->
529+
{
530+
if ( tx != null )
531+
{
532+
throw new ClientException( errorMessage );
533+
}
534+
} );
535+
}
536+
537+
private CompletionStage<ExplicitTransaction> existingTransactionOrNull()
538+
{
539+
return transactionStage
540+
.exceptionally( error -> null ) // handle previous acquisition failures
541+
.thenApply( tx -> tx != null && tx.isOpen() ? tx : null );
542+
}
543+
544+
private CompletionStage<Connection> existingConnectionOrNull()
545+
{
546+
return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures
527547
}
528548

529549
private void ensureSessionIsOpen()

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
import java.util.Map;
2727

28-
import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler;
28+
import org.neo4j.driver.internal.handlers.InitResponseHandler;
2929
import org.neo4j.driver.internal.messaging.InitMessage;
3030
import org.neo4j.driver.v1.Value;
3131

@@ -54,7 +54,7 @@ public void operationComplete( ChannelFuture future )
5454
Channel channel = future.channel();
5555

5656
InitMessage message = new InitMessage( userAgent, authToken );
57-
AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise );
57+
InitResponseHandler handler = new InitResponseHandler( connectionInitializedPromise );
5858

5959
messageDispatcher( channel ).queue( handler );
6060
channel.writeAndFlush( message, channel.voidPromise() );

0 commit comments

Comments
 (0)