Skip to content

Commit a2d7246

Browse files
committed
wip
1 parent 7405aa2 commit a2d7246

File tree

1 file changed

+127
-27
lines changed

1 file changed

+127
-27
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 127 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Arrays;
2222
import java.util.EnumSet;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionException;
2425
import java.util.concurrent.CompletionStage;
2526
import java.util.concurrent.locks.Lock;
@@ -71,6 +72,12 @@ private enum State
7172
}
7273

7374
private static final EnumSet<State> OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED );
75+
private static final String CANT_COMMIT_COMMITTED_MSG = "Can't commit, transaction has been committed";
76+
private static final String CANT_ROLLBACK_COMMITTED_MSG = "Can't rollback, transaction has been committed";
77+
private static final String CANT_COMMIT_ROLLED_BACK_MSG = "Can't commit, transaction has been rolled back";
78+
private static final String CANT_ROLLBACK_ROLLED_BACK_MSG = "Can't rollback, transaction has been rolled back";
79+
private static final String CANT_COMMIT_ROLLING_BACK_MSG = "Can't commit, transaction has been requested to be rolled back";
80+
private static final String CANT_ROLLBACK_COMMITTING_MSG = "Can't rollback, transaction has been requested to be committed";
7481

7582
private final Connection connection;
7683
private final BoltProtocol protocol;
@@ -79,8 +86,8 @@ private enum State
7986
private final long fetchSize;
8087
private final Lock lock = new ReentrantLock();
8188
private State state = State.ACTIVE;
82-
private CompletionStage<Void> commitStage;
83-
private CompletionStage<Void> rollbackStage;
89+
private CompletableFuture<Void> commitFuture;
90+
private CompletableFuture<Void> rollbackFuture;
8491
private Throwable causeOfTermination;
8592

8693
public UnmanagedTransaction( Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
@@ -124,61 +131,156 @@ else if ( beginError instanceof ConnectionReadTimeoutException )
124131

125132
public CompletionStage<Void> closeAsync()
126133
{
127-
return executeWithLock( lock, () -> isOpen() ? rollbackAsync() : completedWithNull() );
134+
CompletionStage<Void> stage = executeWithLock( lock, () ->
135+
{
136+
CompletionStage<Void> resultStage = null;
137+
if ( !isOpen() )
138+
{
139+
resultStage = completedWithNull();
140+
}
141+
else if ( state == State.COMMITTED )
142+
{
143+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTED_MSG ) );
144+
}
145+
else if ( state == State.ROLLED_BACK )
146+
{
147+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_ROLLED_BACK_MSG ) );
148+
}
149+
else if ( commitFuture != null )
150+
{
151+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTING_MSG ) );
152+
}
153+
else if ( rollbackFuture != null )
154+
{
155+
resultStage = rollbackFuture;
156+
}
157+
else
158+
{
159+
rollbackFuture = new CompletableFuture<>();
160+
}
161+
return resultStage;
162+
} );
163+
164+
if ( stage == null )
165+
{
166+
stage = resultCursors
167+
.retrieveNotConsumedError()
168+
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
169+
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( false, error ) );
170+
stage.whenComplete( ( result, error ) ->
171+
{
172+
if ( error != null )
173+
{
174+
rollbackFuture.completeExceptionally( error );
175+
}
176+
else
177+
{
178+
rollbackFuture.complete( result );
179+
}
180+
} );
181+
}
182+
183+
return stage;
128184
}
129185

130186
public CompletionStage<Void> commitAsync()
131187
{
132-
return executeWithLock( lock, () ->
188+
CompletionStage<Void> stage = executeWithLock( lock, () ->
133189
{
190+
CompletionStage<Void> resultStage = null;
134191
if ( state == State.COMMITTED )
135192
{
136-
return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) );
193+
resultStage = failedFuture( new ClientException( CANT_COMMIT_COMMITTED_MSG ) );
137194
}
138195
else if ( state == State.ROLLED_BACK )
139196
{
140-
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
197+
resultStage = failedFuture( new ClientException( CANT_COMMIT_ROLLED_BACK_MSG ) );
198+
}
199+
else if ( rollbackFuture != null )
200+
{
201+
resultStage = failedFuture( new ClientException( CANT_COMMIT_ROLLING_BACK_MSG ) );
141202
}
142-
else if ( commitStage != null )
203+
else if ( commitFuture != null )
143204
{
144-
return commitStage;
205+
resultStage = commitFuture;
145206
}
146207
else
147208
{
148-
commitStage = resultCursors
149-
.retrieveNotConsumedError()
150-
.thenCompose( error -> doCommitAsync( error ).handle( handleCommitOrRollback( error ) ) )
151-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( true, error ) );
152-
return commitStage;
209+
commitFuture = new CompletableFuture<>();
153210
}
211+
return resultStage;
154212
} );
213+
214+
if ( stage == null )
215+
{
216+
stage = resultCursors
217+
.retrieveNotConsumedError()
218+
.thenCompose( error -> doCommitAsync( error ).handle( handleCommitOrRollback( error ) ) )
219+
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( true, error ) );
220+
stage.whenComplete( ( result, error ) ->
221+
{
222+
if ( error != null )
223+
{
224+
commitFuture.completeExceptionally( error );
225+
}
226+
else
227+
{
228+
commitFuture.complete( result );
229+
}
230+
} );
231+
}
232+
233+
return stage;
155234
}
156235

157236
public CompletionStage<Void> rollbackAsync()
158237
{
159-
return executeWithLock( lock, () ->
238+
CompletionStage<Void> stage = executeWithLock( lock, () ->
160239
{
240+
CompletionStage<Void> resultStage = null;
161241
if ( state == State.COMMITTED )
162242
{
163-
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
243+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTED_MSG ) );
164244
}
165245
else if ( state == State.ROLLED_BACK )
166246
{
167-
return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) );
247+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_ROLLED_BACK_MSG ) );
168248
}
169-
else if ( rollbackStage != null )
249+
else if ( commitFuture != null )
170250
{
171-
return rollbackStage;
251+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTING_MSG ) );
252+
}
253+
else if ( rollbackFuture != null )
254+
{
255+
resultStage = rollbackFuture;
172256
}
173257
else
174258
{
175-
rollbackStage = resultCursors
176-
.retrieveNotConsumedError()
177-
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
178-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( false, error ) );
179-
return rollbackStage;
259+
rollbackFuture = new CompletableFuture<>();
180260
}
261+
return resultStage;
181262
} );
263+
264+
if ( stage == null )
265+
{
266+
stage = resultCursors
267+
.retrieveNotConsumedError()
268+
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
269+
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( false, error ) );
270+
stage.whenComplete( ( result, error ) ->
271+
{
272+
if ( error != null )
273+
{
274+
rollbackFuture.completeExceptionally( error );
275+
}
276+
else
277+
{
278+
rollbackFuture.complete( result );
279+
}
280+
} );
281+
}
282+
283+
return stage;
182284
}
183285

184286
public CompletionStage<ResultCursor> runAsync( Query query )
@@ -201,8 +303,7 @@ public CompletionStage<RxResultCursor> runRx( Query query )
201303

202304
public boolean isOpen()
203305
{
204-
State currentState = executeWithLock( lock, () -> state );
205-
return OPEN_STATES.contains( currentState );
306+
return OPEN_STATES.contains( executeWithLock( lock, () -> state ) );
206307
}
207308

208309
public void markTerminated( Throwable cause )
@@ -275,8 +376,7 @@ private CompletionStage<Void> doCommitAsync( Throwable cursorFailure )
275376

276377
private CompletionStage<Void> doRollbackAsync()
277378
{
278-
State currentState = executeWithLock( lock, () -> state );
279-
return currentState == State.TERMINATED ? completedWithNull() : protocol.rollbackTransaction( connection );
379+
return executeWithLock( lock, () -> state ) == State.TERMINATED ? completedWithNull() : protocol.rollbackTransaction( connection );
280380
}
281381

282382
private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )

0 commit comments

Comments
 (0)