36
36
import java .util .Properties ;
37
37
import java .util .concurrent .CompletionStage ;
38
38
import java .util .concurrent .Executor ;
39
+ import java .util .concurrent .atomic .AtomicInteger ;
39
40
40
41
import org .hibernate .boot .model .relational .SqlStringGenerationContext ;
41
42
import org .hibernate .engine .jdbc .env .spi .JdbcEnvironment ;
53
54
public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {
54
55
55
56
private final ReactiveConnectionPool service ;
57
+ private final AtomicInteger level = new AtomicInteger ( 0 );
58
+ private CompletionStage <ReactiveConnection > connectionStage ;
56
59
57
60
public ReactiveImprovedExtractionContextImpl (
58
61
ServiceRegistry registry ,
@@ -68,23 +71,35 @@ public ReactiveImprovedExtractionContextImpl(
68
71
service = registry .getService ( ReactiveConnectionPool .class );
69
72
}
70
73
71
- @ Override
72
- public <T > T getQueryResults (
73
- String queryString ,
74
- Object [] positionalParameters ,
75
- ResultSetProcessor <T > resultSetProcessor ) throws SQLException {
74
+ public ReactiveImprovedExtractionContextImpl push () {
75
+ int currentLevel = level .getAndIncrement ();
76
+ if ( currentLevel == 0 ) {
77
+ connectionStage = service .getConnection ();
78
+ }
79
+ return this ;
80
+ }
76
81
77
- final CompletionStage <ReactiveConnection > connectionStage = service .getConnection ();
82
+ public ReactiveImprovedExtractionContextImpl pop () {
83
+ int currentLevel = level .decrementAndGet ();
84
+ if ( currentLevel == 0 && connectionStage != null ) {
85
+ // This method doesn't return a reactive type, so we start closing the connection and ignore the result
86
+ connectionStage
87
+ .handle ( ReactiveImprovedExtractionContextImpl ::ignoreException )
88
+ .thenCompose ( ReactiveImprovedExtractionContextImpl ::closeConnection )
89
+ .toCompletableFuture ()
90
+ .join ();
91
+ }
92
+ return this ;
93
+ }
78
94
95
+ @ Override
96
+ public <T > T getQueryResults ( String queryString , Object [] positionalParameters , ResultSetProcessor <T > resultSetProcessor ) throws SQLException {
97
+ push ();
79
98
try (final ResultSet resultSet = getQueryResultSet ( queryString , positionalParameters , connectionStage )) {
80
99
return resultSetProcessor .process ( resultSet );
81
100
}
82
101
finally {
83
- // This method doesn't return a reactive type, so we start closing the connection and ignore the result
84
- connectionStage
85
- .handle ( ReactiveImprovedExtractionContextImpl ::ignoreException )
86
- .thenCompose ( ReactiveImprovedExtractionContextImpl ::closeConnection );
87
-
102
+ pop ();
88
103
}
89
104
}
90
105
@@ -102,13 +117,12 @@ private ResultSet getQueryResultSet(
102
117
Object [] positionalParameters ,
103
118
CompletionStage <ReactiveConnection > connectionStage ) {
104
119
final Object [] parametersToUse = positionalParameters != null ? positionalParameters : new Object [0 ];
105
- final Parameters parametersDialectSpecific = Parameters .instance (
106
- getJdbcEnvironment ().getDialect ()
107
- );
120
+ final Parameters parametersDialectSpecific = Parameters .instance ( getJdbcEnvironment ().getDialect () );
108
121
final String queryToUse = parametersDialectSpecific .process ( queryString , parametersToUse .length );
109
- return connectionStage .thenCompose ( c -> c .selectJdbcOutsideTransaction ( queryToUse , parametersToUse ) )
122
+ return connectionStage
123
+ .thenCompose ( c -> c .selectJdbcOutsideTransaction ( queryToUse , parametersToUse ) )
110
124
.whenComplete ( (resultSet , err ) -> logSqlException ( err , () -> "could not execute query " , queryToUse ) )
111
- .thenApply (ResultSetWorkaround ::new )
125
+ .thenApply ( ResultSetWorkaround ::new )
112
126
.toCompletableFuture ()
113
127
.join ();
114
128
}
0 commit comments