Skip to content

Commit a9fcd17

Browse files
committed
Fixed Session#reset()
It got broken after moving all blocking APIs to use async ones underneath. This commit makes session send RESET message on the active connection and move existing transaction to TERMINATED state. Transactions in this state will disallow running new queries and committing. All existing tests for `Session#reset()` are now un-ignored and couple new unit tests are added.
1 parent 87d9686 commit a9fcd17

File tree

10 files changed

+441
-113
lines changed

10 files changed

+441
-113
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.internal.spi.Connection;
3232
import org.neo4j.driver.internal.types.InternalTypeSystem;
3333
import org.neo4j.driver.v1.Record;
34+
import org.neo4j.driver.v1.Session;
3435
import org.neo4j.driver.v1.Statement;
3536
import org.neo4j.driver.v1.StatementResult;
3637
import org.neo4j.driver.v1.StatementResultCursor;
@@ -42,7 +43,6 @@
4243

4344
import static java.util.Collections.emptyMap;
4445
import static java.util.concurrent.CompletableFuture.completedFuture;
45-
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
4646
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4747
import static org.neo4j.driver.internal.util.Futures.getBlocking;
4848
import static org.neo4j.driver.v1.Values.value;
@@ -65,10 +65,9 @@ private enum State
6565
MARKED_FAILED,
6666

6767
/**
68-
* An error has occurred, transaction can no longer be used and no more messages will be sent for this
69-
* transaction.
68+
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
7069
*/
71-
FAILED,
70+
TERMINATED,
7271

7372
/** This transaction has successfully committed */
7473
COMMITTED,
@@ -135,17 +134,10 @@ CompletionStage<Void> closeAsync()
135134
{
136135
return commitAsync();
137136
}
138-
else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
137+
else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED )
139138
{
140139
return rollbackAsync();
141140
}
142-
else if ( state == State.FAILED )
143-
{
144-
// unrecoverable error happened, transaction should've been rolled back on the server
145-
// update state so that this transaction does not remain open
146-
state = State.ROLLED_BACK;
147-
return completedFuture( null );
148-
}
149141
else
150142
{
151143
return completedFuture( null );
@@ -161,7 +153,12 @@ public CompletionStage<Void> commitAsync()
161153
}
162154
else if ( state == State.ROLLED_BACK )
163155
{
164-
return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
156+
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
157+
}
158+
else if ( state == State.TERMINATED )
159+
{
160+
return failedFuture(
161+
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
165162
}
166163
else
167164
{
@@ -174,12 +171,18 @@ public CompletionStage<Void> rollbackAsync()
174171
{
175172
if ( state == State.COMMITTED )
176173
{
177-
return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
174+
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
178175
}
179176
else if ( state == State.ROLLED_BACK )
180177
{
181178
return completedFuture( null );
182179
}
180+
else if ( state == State.TERMINATED )
181+
{
182+
// transaction has been terminated by RESET and should be rolled back by the database
183+
state = State.ROLLED_BACK;
184+
return completedFuture( null );
185+
}
183186
else
184187
{
185188
return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) );
@@ -190,7 +193,6 @@ private BiConsumer<Void,Throwable> transactionClosed( State newState )
190193
{
191194
return ( ignore, error ) ->
192195
{
193-
// todo: test that this state transition always happens when commit or rollback
194196
state = newState;
195197
connection.releaseInBackground();
196198
session.setBookmark( bookmark );
@@ -280,18 +282,18 @@ public StatementResult run( Statement statement )
280282
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
281283
{
282284
ensureCanRunQueries();
285+
//noinspection unchecked
283286
return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this );
284287
}
285288

286289
@Override
287290
public boolean isOpen()
288291
{
289-
return state != State.COMMITTED && state != State.ROLLED_BACK;
292+
return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED;
290293
}
291294

292295
private void ensureCanRunQueries()
293296
{
294-
// todo: test these two new branches
295297
if ( state == State.COMMITTED )
296298
{
297299
throw new ClientException( "Cannot run more statements in this transaction, it has been committed" );
@@ -300,14 +302,19 @@ else if ( state == State.ROLLED_BACK )
300302
{
301303
throw new ClientException( "Cannot run more statements in this transaction, it has been rolled back" );
302304
}
303-
else if ( state == State.FAILED || state == State.MARKED_FAILED )
305+
else if ( state == State.MARKED_FAILED )
304306
{
305307
throw new ClientException(
306308
"Cannot run more statements in this transaction, because previous statements in the " +
307309
"transaction has failed and the transaction has been rolled back. Please start a new " +
308310
"transaction to run another statement."
309311
);
310312
}
313+
else if ( state == State.TERMINATED )
314+
{
315+
throw new ClientException(
316+
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
317+
}
311318
}
312319

313320
@Override
@@ -316,21 +323,9 @@ public TypeSystem typeSystem()
316323
return InternalTypeSystem.TYPE_SYSTEM;
317324
}
318325

319-
public void resultFailed( Throwable error )
320-
{
321-
if ( isRecoverable( error ) )
322-
{
323-
failure();
324-
}
325-
else
326-
{
327-
markToClose();
328-
}
329-
}
330-
331-
public void markToClose()
326+
public void markTerminated()
332327
{
333-
state = State.FAILED;
328+
state = State.TERMINATED;
334329
}
335330

336331
public Bookmark bookmark()

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void close()
160160
getBlocking( lastResultStage
161161
.exceptionally( error -> null )
162162
.thenCompose( this::ensureBuffered )
163-
.thenCompose( error -> forceReleaseResources().thenApply( ignore ->
163+
.thenCompose( error -> releaseResources().thenApply( ignore ->
164164
{
165165
if ( error != null )
166166
{
@@ -177,7 +177,7 @@ public CompletionStage<Void> closeAsync()
177177
// todo: wait for buffered result?
178178
if ( open.compareAndSet( true, false ) )
179179
{
180-
return forceReleaseResources();
180+
return releaseResources();
181181
}
182182
return completedFuture( null );
183183
}
@@ -254,7 +254,19 @@ public String lastBookmark()
254254
@Override
255255
public void reset()
256256
{
257-
getBlocking( forceReleaseResources() );
257+
getBlocking( resetAsync() );
258+
}
259+
260+
private CompletionStage<Void> resetAsync()
261+
{
262+
return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() )
263+
.thenAccept( tx ->
264+
{
265+
if ( tx != null )
266+
{
267+
tx.markTerminated();
268+
}
269+
} );
258270
}
259271

260272
@Override
@@ -465,42 +477,38 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
465477
return connectionStage;
466478
}
467479

468-
private CompletionStage<Void> forceReleaseResources()
480+
private CompletionStage<Void> releaseResources()
469481
{
470-
return rollbackTransaction().thenCompose( ignore -> forceReleaseConnection() );
482+
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
471483
}
472484

473485
private CompletionStage<Void> rollbackTransaction()
474486
{
475-
return transactionStage
476-
.exceptionally( error -> null ) // handle previous acquisition failures
477-
.thenCompose( tx ->
478-
{
479-
if ( tx != null && tx.isOpen() )
480-
{
481-
return tx.rollbackAsync();
482-
}
483-
return completedFuture( null );
484-
} );
487+
return existingTransactionOrNull().thenCompose( tx ->
488+
{
489+
if ( tx != null )
490+
{
491+
return tx.rollbackAsync();
492+
}
493+
return completedFuture( null );
494+
} ).exceptionally( error ->
495+
{
496+
Throwable cause = Futures.completionErrorCause( error );
497+
logger.error( "Failed to rollback active transaction", cause );
498+
return null;
499+
} );
485500
}
486501

487-
private CompletionStage<Void> forceReleaseConnection()
502+
private CompletionStage<Void> releaseConnectionNow()
488503
{
489-
return connectionStage
490-
.exceptionally( error -> null ) // handle previous acquisition failures
491-
.thenCompose( connection ->
492-
{
493-
if ( connection != null )
494-
{
495-
return connection.releaseNow();
496-
}
497-
return completedFuture( null );
498-
} ).exceptionally( error ->
499-
{
500-
// todo: this log message looks wrong, should it go to #rollbackTransaction() ?
501-
logger.error( "Failed to rollback active transaction", error );
502-
return null;
503-
} );
504+
return existingConnectionOrNull().thenCompose( connection ->
505+
{
506+
if ( connection != null )
507+
{
508+
return connection.releaseNow();
509+
}
510+
return completedFuture( null );
511+
} );
504512
}
505513

506514
private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
@@ -517,14 +525,25 @@ private CompletionStage<Void> ensureNoOpenTxBeforeStartingTx()
517525

518526
private CompletionStage<Void> ensureNoOpenTx( String errorMessage )
519527
{
520-
return transactionStage.exceptionally( error -> null )
521-
.thenAccept( tx ->
522-
{
523-
if ( tx != null && tx.isOpen() )
524-
{
525-
throw new ClientException( errorMessage );
526-
}
527-
} );
528+
return existingTransactionOrNull().thenAccept( tx ->
529+
{
530+
if ( tx != null )
531+
{
532+
throw new ClientException( errorMessage );
533+
}
534+
} );
535+
}
536+
537+
private CompletionStage<ExplicitTransaction> existingTransactionOrNull()
538+
{
539+
return transactionStage
540+
.exceptionally( error -> null ) // handle previous acquisition failures
541+
.thenApply( tx -> tx != null && tx.isOpen() ? tx : null );
542+
}
543+
544+
private CompletionStage<Connection> existingConnectionOrNull()
545+
{
546+
return connectionStage.exceptionally( error -> null ); // handle previous acquisition failures
528547
}
529548

530549
private void ensureSessionIsOpen()

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,6 @@ protected void afterSuccess()
4343
@Override
4444
protected void afterFailure( Throwable error )
4545
{
46-
tx.resultFailed( error );
46+
tx.failure();
4747
}
4848
}

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static Neo4jException newNeo4jError( String code, String message )
5151
}
5252
}
5353

54+
// todo: use this method and close channel after unrecoverable error
5455
public static boolean isRecoverable( Throwable error )
5556
{
5657
if ( error instanceof Neo4jException )

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,13 @@ public void shouldBeOpenWhenMarkedForFailure()
144144
}
145145

146146
@Test
147-
public void shouldBeOpenWhenMarkedToClose()
147+
public void shouldBeClosedWhenMarkedAsTerminated()
148148
{
149149
ExplicitTransaction tx = beginTx( connectionMock() );
150150

151-
tx.markToClose();
151+
tx.markTerminated();
152152

153-
assertTrue( tx.isOpen() );
153+
assertFalse( tx.isOpen() );
154154
}
155155

156156
@Test
@@ -176,11 +176,11 @@ public void shouldBeClosedAfterRollback()
176176
}
177177

178178
@Test
179-
public void shouldBeClosedWhenMarkedToCloseAndClosed()
179+
public void shouldBeClosedWhenMarkedTerminatedAndClosed()
180180
{
181181
ExplicitTransaction tx = beginTx( connectionMock() );
182182

183-
tx.markToClose();
183+
tx.markTerminated();
184184
tx.close();
185185

186186
assertFalse( tx.isOpen() );

0 commit comments

Comments
 (0)