Skip to content

Commit 3830fc5

Browse files
committed
updated
1 parent a2d7246 commit 3830fc5

File tree

2 files changed

+101
-145
lines changed

2 files changed

+101
-145
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 86 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.locks.Lock;
2727
import java.util.concurrent.locks.ReentrantLock;
2828
import java.util.function.BiFunction;
29+
import java.util.function.Function;
2930

3031
import org.neo4j.driver.Bookmark;
3132
import org.neo4j.driver.Query;
@@ -40,10 +41,12 @@
4041
import org.neo4j.driver.internal.cursor.RxResultCursor;
4142
import org.neo4j.driver.internal.messaging.BoltProtocol;
4243
import org.neo4j.driver.internal.spi.Connection;
43-
import org.neo4j.driver.internal.util.Futures;
4444

45+
import static org.neo4j.driver.internal.util.Futures.asCompletionException;
46+
import static org.neo4j.driver.internal.util.Futures.combineErrors;
4547
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4648
import static org.neo4j.driver.internal.util.Futures.failedFuture;
49+
import static org.neo4j.driver.internal.util.Futures.futureCompletingConsumer;
4750
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
4851

4952
public class UnmanagedTransaction
@@ -123,164 +126,25 @@ else if ( beginError instanceof ConnectionReadTimeoutException )
123126
{
124127
connection.release();
125128
}
126-
throw Futures.asCompletionException( beginError );
129+
throw asCompletionException( beginError );
127130
}
128131
return this;
129132
} );
130133
}
131134

132135
public CompletionStage<Void> closeAsync()
133136
{
134-
CompletionStage<Void> stage = executeWithLock( lock, () ->
135-
{
136-
CompletionStage<Void> resultStage = null;
137-
if ( !isOpen() )
138-
{
139-
resultStage = completedWithNull();
140-
}
141-
else if ( state == State.COMMITTED )
142-
{
143-
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTED_MSG ) );
144-
}
145-
else if ( state == State.ROLLED_BACK )
146-
{
147-
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_ROLLED_BACK_MSG ) );
148-
}
149-
else if ( commitFuture != null )
150-
{
151-
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTING_MSG ) );
152-
}
153-
else if ( rollbackFuture != null )
154-
{
155-
resultStage = rollbackFuture;
156-
}
157-
else
158-
{
159-
rollbackFuture = new CompletableFuture<>();
160-
}
161-
return resultStage;
162-
} );
163-
164-
if ( stage == null )
165-
{
166-
stage = resultCursors
167-
.retrieveNotConsumedError()
168-
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
169-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( false, error ) );
170-
stage.whenComplete( ( result, error ) ->
171-
{
172-
if ( error != null )
173-
{
174-
rollbackFuture.completeExceptionally( error );
175-
}
176-
else
177-
{
178-
rollbackFuture.complete( result );
179-
}
180-
} );
181-
}
182-
183-
return stage;
137+
return closeAsync( false, true );
184138
}
185139

186140
public CompletionStage<Void> commitAsync()
187141
{
188-
CompletionStage<Void> stage = executeWithLock( lock, () ->
189-
{
190-
CompletionStage<Void> resultStage = null;
191-
if ( state == State.COMMITTED )
192-
{
193-
resultStage = failedFuture( new ClientException( CANT_COMMIT_COMMITTED_MSG ) );
194-
}
195-
else if ( state == State.ROLLED_BACK )
196-
{
197-
resultStage = failedFuture( new ClientException( CANT_COMMIT_ROLLED_BACK_MSG ) );
198-
}
199-
else if ( rollbackFuture != null )
200-
{
201-
resultStage = failedFuture( new ClientException( CANT_COMMIT_ROLLING_BACK_MSG ) );
202-
}
203-
else if ( commitFuture != null )
204-
{
205-
resultStage = commitFuture;
206-
}
207-
else
208-
{
209-
commitFuture = new CompletableFuture<>();
210-
}
211-
return resultStage;
212-
} );
213-
214-
if ( stage == null )
215-
{
216-
stage = resultCursors
217-
.retrieveNotConsumedError()
218-
.thenCompose( error -> doCommitAsync( error ).handle( handleCommitOrRollback( error ) ) )
219-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( true, error ) );
220-
stage.whenComplete( ( result, error ) ->
221-
{
222-
if ( error != null )
223-
{
224-
commitFuture.completeExceptionally( error );
225-
}
226-
else
227-
{
228-
commitFuture.complete( result );
229-
}
230-
} );
231-
}
232-
233-
return stage;
142+
return closeAsync( true, false );
234143
}
235144

236145
public CompletionStage<Void> rollbackAsync()
237146
{
238-
CompletionStage<Void> stage = executeWithLock( lock, () ->
239-
{
240-
CompletionStage<Void> resultStage = null;
241-
if ( state == State.COMMITTED )
242-
{
243-
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTED_MSG ) );
244-
}
245-
else if ( state == State.ROLLED_BACK )
246-
{
247-
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_ROLLED_BACK_MSG ) );
248-
}
249-
else if ( commitFuture != null )
250-
{
251-
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTING_MSG ) );
252-
}
253-
else if ( rollbackFuture != null )
254-
{
255-
resultStage = rollbackFuture;
256-
}
257-
else
258-
{
259-
rollbackFuture = new CompletableFuture<>();
260-
}
261-
return resultStage;
262-
} );
263-
264-
if ( stage == null )
265-
{
266-
stage = resultCursors
267-
.retrieveNotConsumedError()
268-
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
269-
.whenComplete( ( ignore, error ) -> handleTransactionCompletion( false, error ) );
270-
stage.whenComplete( ( result, error ) ->
271-
{
272-
if ( error != null )
273-
{
274-
rollbackFuture.completeExceptionally( error );
275-
}
276-
else
277-
{
278-
rollbackFuture.complete( result );
279-
}
280-
} );
281-
}
282-
283-
return stage;
147+
return closeAsync( false, false );
284148
}
285149

286150
public CompletionStage<ResultCursor> runAsync( Query query )
@@ -383,7 +247,7 @@ private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable
383247
{
384248
return ( ignore, commitOrRollbackError ) ->
385249
{
386-
CompletionException combinedError = Futures.combineErrors( cursorFailure, commitOrRollbackError );
250+
CompletionException combinedError = combineErrors( cursorFailure, commitOrRollbackError );
387251
if ( combinedError != null )
388252
{
389253
throw combinedError;
@@ -418,4 +282,81 @@ else if ( throwable instanceof ConnectionReadTimeoutException )
418282
connection.release(); // release in background
419283
}
420284
}
285+
286+
private CompletionStage<Void> closeAsync( boolean commit, boolean completeWithNullIfNotOpen )
287+
{
288+
CompletionStage<Void> stage = executeWithLock( lock, () ->
289+
{
290+
CompletionStage<Void> resultStage = null;
291+
if ( completeWithNullIfNotOpen && !isOpen() )
292+
{
293+
resultStage = completedWithNull();
294+
}
295+
else if ( state == State.COMMITTED )
296+
{
297+
resultStage = failedFuture( new ClientException( commit ? CANT_COMMIT_COMMITTED_MSG : CANT_ROLLBACK_COMMITTED_MSG ) );
298+
}
299+
else if ( state == State.ROLLED_BACK )
300+
{
301+
resultStage = failedFuture( new ClientException( commit ? CANT_COMMIT_ROLLED_BACK_MSG : CANT_ROLLBACK_ROLLED_BACK_MSG ) );
302+
}
303+
else
304+
{
305+
if ( commit )
306+
{
307+
if ( rollbackFuture != null )
308+
{
309+
resultStage = failedFuture( new ClientException( CANT_COMMIT_ROLLING_BACK_MSG ) );
310+
}
311+
else if ( commitFuture != null )
312+
{
313+
resultStage = commitFuture;
314+
}
315+
else
316+
{
317+
commitFuture = new CompletableFuture<>();
318+
}
319+
}
320+
else
321+
{
322+
if ( commitFuture != null )
323+
{
324+
resultStage = failedFuture( new ClientException( CANT_ROLLBACK_COMMITTING_MSG ) );
325+
}
326+
else if ( rollbackFuture != null )
327+
{
328+
resultStage = rollbackFuture;
329+
}
330+
else
331+
{
332+
rollbackFuture = new CompletableFuture<>();
333+
}
334+
}
335+
}
336+
return resultStage;
337+
} );
338+
339+
if ( stage == null )
340+
{
341+
CompletableFuture<Void> targetFuture;
342+
Function<Throwable,CompletionStage<Void>> targetAction;
343+
if ( commit )
344+
{
345+
targetFuture = commitFuture;
346+
targetAction = throwable -> doCommitAsync( throwable ).handle( handleCommitOrRollback( throwable ) );
347+
}
348+
else
349+
{
350+
targetFuture = rollbackFuture;
351+
targetAction = throwable -> doRollbackAsync().handle( handleCommitOrRollback( throwable ) );
352+
}
353+
resultCursors.retrieveNotConsumedError()
354+
.thenCompose( targetAction )
355+
.whenComplete( ( ignored, throwable ) -> handleTransactionCompletion( commit, throwable ) )
356+
.whenComplete( futureCompletingConsumer( targetFuture ) );
357+
stage = targetFuture;
358+
}
359+
360+
return stage;
361+
}
421362
}

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,21 @@ public static <T> CompletableFuture<T> onErrorContinue( CompletableFuture<T> fut
269269
} );
270270
}
271271

272+
public static <T> BiConsumer<T,Throwable> futureCompletingConsumer( CompletableFuture<T> future )
273+
{
274+
return ( value, throwable ) ->
275+
{
276+
if ( throwable != null )
277+
{
278+
future.completeExceptionally( throwable );
279+
}
280+
else
281+
{
282+
future.complete( value );
283+
}
284+
};
285+
}
286+
272287
private static class CompletionResult<T>
273288
{
274289
T value;

0 commit comments

Comments
 (0)