@@ -145,12 +145,11 @@ else if ( commitStage != null )
145
145
}
146
146
else
147
147
{
148
- CompletionStage < Void > partialStage = resultCursors
148
+ commitStage = resultCursors
149
149
.retrieveNotConsumedError ()
150
- .thenCompose ( error -> doCommitAsync ( error ).handle ( handleCommitOrRollback ( error ) ) );
151
- CompletionStage <Void > resultStage = commitStage = partialStage .whenComplete ( ( ignore , error ) -> releaseConnection ( error ) );
152
- partialStage .whenComplete ( ( ignored , error ) -> updateStateAfterCommitOrRollback ( true , error ) );
153
- return resultStage ;
150
+ .thenCompose ( error -> doCommitAsync ( error ).handle ( handleCommitOrRollback ( error ) ) )
151
+ .whenComplete ( ( ignore , error ) -> handleTransactionCompletion ( true , error ) );
152
+ return commitStage ;
154
153
}
155
154
} );
156
155
}
@@ -173,12 +172,11 @@ else if ( rollbackStage != null )
173
172
}
174
173
else
175
174
{
176
- CompletionStage < Void > partialStage = resultCursors
175
+ rollbackStage = resultCursors
177
176
.retrieveNotConsumedError ()
178
- .thenCompose ( error -> doRollbackAsync ().handle ( handleCommitOrRollback ( error ) ) );
179
- CompletionStage <Void > resultStage = rollbackStage = partialStage .whenComplete ( ( ignore , error ) -> releaseConnection ( error ) );
180
- partialStage .whenComplete ( ( ignored , error ) -> updateStateAfterCommitOrRollback ( false , error ) );
181
- return resultStage ;
177
+ .thenCompose ( error -> doRollbackAsync ().handle ( handleCommitOrRollback ( error ) ) )
178
+ .whenComplete ( ( ignore , error ) -> handleTransactionCompletion ( false , error ) );
179
+ return rollbackStage ;
182
180
}
183
181
} );
184
182
}
@@ -294,8 +292,19 @@ private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable
294
292
};
295
293
}
296
294
297
- private void releaseConnection ( Throwable throwable )
295
+ private void handleTransactionCompletion ( boolean commitAttempt , Throwable throwable )
298
296
{
297
+ executeWithLock ( lock , () ->
298
+ {
299
+ if ( commitAttempt && throwable == null )
300
+ {
301
+ state = State .COMMITTED ;
302
+ }
303
+ else
304
+ {
305
+ state = State .ROLLED_BACK ;
306
+ }
307
+ } );
299
308
if ( throwable instanceof AuthorizationExpiredException )
300
309
{
301
310
connection .terminateAndRelease ( AuthorizationExpiredException .DESCRIPTION );
@@ -309,27 +318,4 @@ else if ( throwable instanceof ConnectionReadTimeoutException )
309
318
connection .release (); // release in background
310
319
}
311
320
}
312
-
313
- private void updateStateAfterCommitOrRollback ( boolean commitAttempt , Throwable throwable )
314
- {
315
- executeWithLock ( lock , () ->
316
- {
317
- if ( commitAttempt && throwable == null )
318
- {
319
- state = State .COMMITTED ;
320
- }
321
- else
322
- {
323
- state = State .ROLLED_BACK ;
324
- }
325
- if ( commitAttempt )
326
- {
327
- commitStage = null ;
328
- }
329
- else
330
- {
331
- rollbackStage = null ;
332
- }
333
- } );
334
- }
335
321
}
0 commit comments