@@ -517,6 +517,20 @@ public void registerProcess(BeforeTransactionCompletionProcess process) {
517
517
beforeTransactionProcesses .register ( process );
518
518
}
519
519
520
+ public void registerProcess (ReactiveAfterTransactionCompletionProcess process ) {
521
+ if ( afterTransactionProcesses == null ) {
522
+ afterTransactionProcesses = new AfterTransactionCompletionProcessQueue ( session );
523
+ }
524
+ afterTransactionProcesses .registerReactive ( process );
525
+ }
526
+
527
+ public void registerProcess (ReactiveBeforeTransactionCompletionProcess process ) {
528
+ if ( beforeTransactionProcesses == null ) {
529
+ beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue ( session );
530
+ }
531
+ beforeTransactionProcesses .registerReactive ( process );
532
+ }
533
+
520
534
/**
521
535
* Perform all currently queued entity-insertion actions.
522
536
*
@@ -576,25 +590,27 @@ private void prepareActions(ExecutableList<?> queue) throws HibernateException {
576
590
*
577
591
* @param success Was the transaction successful.
578
592
*/
579
- public void afterTransactionCompletion (boolean success ) {
593
+ public CompletionStage < Void > afterTransactionCompletion (boolean success ) {
580
594
if ( !isTransactionCoordinatorShared ) {
581
595
// Execute completion actions only in transaction owner (aka parent session).
582
596
if ( afterTransactionProcesses != null ) {
583
- afterTransactionProcesses .afterTransactionCompletion ( success );
597
+ return afterTransactionProcesses .afterTransactionCompletion ( success );
584
598
}
585
599
}
600
+ return CompletionStages .voidFuture ();
586
601
}
587
602
588
603
/**
589
604
* Execute any registered {@link org.hibernate.action.spi.BeforeTransactionCompletionProcess}
590
605
*/
591
- public void beforeTransactionCompletion () {
606
+ public CompletionStage < Void > beforeTransactionCompletion () {
592
607
if ( !isTransactionCoordinatorShared ) {
593
608
// Execute completion actions only in transaction owner (aka parent session).
594
609
if ( beforeTransactionProcesses != null ) {
595
- beforeTransactionProcesses .beforeTransactionCompletion ();
610
+ return beforeTransactionProcesses .beforeTransactionCompletion ();
596
611
}
597
612
}
613
+ return CompletionStages .voidFuture ();
598
614
}
599
615
600
616
/**
@@ -917,11 +933,12 @@ public void serialize(ObjectOutputStream oos) throws IOException {
917
933
}
918
934
}
919
935
920
- private abstract static class AbstractTransactionCompletionProcessQueue <T > {
936
+ private abstract static class AbstractTransactionCompletionProcessQueue <T , U > {
921
937
protected SessionImplementor session ;
922
938
// Concurrency handling required when transaction completion process is dynamically registered
923
939
// inside event listener (HHH-7478).
924
940
protected Queue <T > processes = new ConcurrentLinkedQueue <>();
941
+ protected Queue <U > reactiveProcesses = new ConcurrentLinkedQueue <>();
925
942
926
943
private AbstractTransactionCompletionProcessQueue (SessionImplementor session ) {
927
944
this .session = session ;
@@ -934,21 +951,29 @@ public void register(T process) {
934
951
processes .add ( process );
935
952
}
936
953
954
+ public void registerReactive (U process ) {
955
+ if ( process == null ) {
956
+ return ;
957
+ }
958
+ reactiveProcesses .add ( process );
959
+ }
960
+
937
961
public boolean hasActions () {
938
- return !processes .isEmpty ();
962
+ return !processes .isEmpty () && ! reactiveProcesses . isEmpty () ;
939
963
}
940
964
}
941
965
942
966
/**
943
967
* Encapsulates behavior needed for before transaction processing
944
968
*/
945
969
private static class BeforeTransactionCompletionProcessQueue
946
- extends AbstractTransactionCompletionProcessQueue <BeforeTransactionCompletionProcess > {
970
+ extends AbstractTransactionCompletionProcessQueue <BeforeTransactionCompletionProcess ,
971
+ ReactiveBeforeTransactionCompletionProcess > {
947
972
private BeforeTransactionCompletionProcessQueue (SessionImplementor session ) {
948
973
super ( session );
949
974
}
950
975
951
- public void beforeTransactionCompletion () {
976
+ public CompletionStage < Void > beforeTransactionCompletion () {
952
977
while ( !processes .isEmpty () ) {
953
978
try {
954
979
processes .poll ().doBeforeTransactionCompletion ( session );
@@ -960,15 +985,20 @@ public void beforeTransactionCompletion() {
960
985
throw new AssertionFailure ( "Unable to perform beforeTransactionCompletion callback" , e );
961
986
}
962
987
}
988
+ return CompletionStages .loop (
989
+ reactiveProcesses ,
990
+ process -> process .doBeforeTransactionCompletion ( session )
991
+ ).whenComplete ( (v , e ) -> reactiveProcesses .clear () );
963
992
}
964
993
}
965
994
966
995
/**
967
996
* Encapsulates behavior needed for after transaction processing
968
997
*/
969
998
private static class AfterTransactionCompletionProcessQueue
970
- extends AbstractTransactionCompletionProcessQueue <AfterTransactionCompletionProcess > {
971
- private Set <Serializable > querySpacesToInvalidate = new HashSet <>();
999
+ extends AbstractTransactionCompletionProcessQueue <AfterTransactionCompletionProcess ,
1000
+ ReactiveAfterTransactionCompletionProcess > {
1001
+ private final Set <Serializable > querySpacesToInvalidate = new HashSet <>();
972
1002
973
1003
private AfterTransactionCompletionProcessQueue (SessionImplementor session ) {
974
1004
super ( session );
@@ -978,7 +1008,7 @@ public void addSpaceToInvalidate(Serializable space) {
978
1008
querySpacesToInvalidate .add ( space );
979
1009
}
980
1010
981
- public void afterTransactionCompletion (boolean success ) {
1011
+ public CompletionStage < Void > afterTransactionCompletion (boolean success ) {
982
1012
while ( !processes .isEmpty () ) {
983
1013
try {
984
1014
processes .poll ().doAfterTransactionCompletion ( success , session );
@@ -999,6 +1029,11 @@ public void afterTransactionCompletion(boolean success) {
999
1029
);
1000
1030
}
1001
1031
querySpacesToInvalidate .clear ();
1032
+
1033
+ return CompletionStages .loop (
1034
+ reactiveProcesses ,
1035
+ process -> process .doAfterTransactionCompletion ( success , session )
1036
+ ).whenComplete ( (v , e ) -> reactiveProcesses .clear () );
1002
1037
}
1003
1038
}
1004
1039
0 commit comments