36
36
import org .hibernate .reactive .query .sqm .spi .ReactiveSelectQueryPlan ;
37
37
import org .hibernate .reactive .sql .exec .internal .StandardReactiveSelectExecutor ;
38
38
import org .hibernate .reactive .sql .results .spi .ReactiveListResultsConsumer ;
39
+ import org .hibernate .reactive .sql .results .spi .ReactiveResultsConsumer ;
39
40
import org .hibernate .sql .ast .SqlAstTranslator ;
40
41
import org .hibernate .sql .ast .SqlAstTranslatorFactory ;
41
42
import org .hibernate .sql .ast .spi .FromClauseAccess ;
59
60
public class ConcreteSqmSelectReactiveQueryPlan <R > extends ConcreteSqmSelectQueryPlan <R >
60
61
implements ReactiveSelectQueryPlan <R > {
61
62
63
+ private final SqmInterpreter <Object , ReactiveResultsConsumer <Object , R >> executeQueryInterpreter ;
62
64
private final SqmInterpreter <List <R >, Void > listInterpreter ;
63
65
private final RowTransformer <R > rowTransformer ;
64
66
@@ -80,6 +82,8 @@ public ConcreteSqmSelectReactiveQueryPlan(
80
82
this .rowTransformer = determineRowTransformer ( sqm , resultType , tupleMetadata , queryOptions );
81
83
this .listInterpreter = (unused , executionContext , sqmInterpretation , jdbcParameterBindings ) ->
82
84
listInterpreter ( hql , domainParameterXref , executionContext , sqmInterpretation , jdbcParameterBindings , rowTransformer );
85
+ this .executeQueryInterpreter = (resultsConsumer , executionContext , sqmInterpretation , jdbcParameterBindings ) ->
86
+ executeQueryInterpreter ( hql , domainParameterXref , executionContext , sqmInterpretation , jdbcParameterBindings , rowTransformer , resultsConsumer );
83
87
}
84
88
85
89
private static <R > CompletionStage <List <R >> listInterpreter (
@@ -110,6 +114,40 @@ private static <R> CompletionStage<List<R>> listInterpreter(
110
114
.whenComplete ( (rs , t ) -> domainParameterXref .clearExpansions () );
111
115
}
112
116
117
+ private static <R > CompletionStage <Object > executeQueryInterpreter (
118
+ String hql ,
119
+ DomainParameterXref domainParameterXref ,
120
+ DomainQueryExecutionContext executionContext ,
121
+ CacheableSqmInterpretation sqmInterpretation ,
122
+ JdbcParameterBindings jdbcParameterBindings ,
123
+ RowTransformer <R > rowTransformer ,
124
+ ReactiveResultsConsumer <Object , R > resultsConsumer ) {
125
+ final ReactiveSharedSessionContractImplementor session = (ReactiveSharedSessionContractImplementor ) executionContext .getSession ();
126
+ final JdbcOperationQuerySelect jdbcSelect = sqmInterpretation .getJdbcSelect ();
127
+ // I'm using a supplier so that the whenComplete at the end will catch any errors, like a finally-block
128
+ Supplier <SubselectFetch .RegistrationHandler > fetchHandlerSupplier = () -> SubselectFetch
129
+ .createRegistrationHandler ( session .getPersistenceContext ().getBatchFetchQueue (), sqmInterpretation .selectStatement , JdbcParametersList .empty (), jdbcParameterBindings );
130
+ return completedFuture ( fetchHandlerSupplier )
131
+ .thenApply ( Supplier ::get )
132
+ .thenCompose ( subSelectFetchKeyHandler -> session
133
+ .reactiveAutoFlushIfRequired ( jdbcSelect .getAffectedTableNames () )
134
+ .thenCompose ( required -> StandardReactiveSelectExecutor .INSTANCE
135
+ .executeQuery ( jdbcSelect ,
136
+ jdbcParameterBindings ,
137
+ ConcreteSqmSelectQueryPlan .listInterpreterExecutionContext ( hql , executionContext , jdbcSelect , subSelectFetchKeyHandler ),
138
+ rowTransformer ,
139
+ null ,
140
+ sql -> executionContext .getSession ()
141
+ .getJdbcCoordinator ()
142
+ .getStatementPreparer ()
143
+ .prepareQueryStatement ( sql , false , null ),
144
+ resultsConsumer
145
+ )
146
+ )
147
+ )
148
+ .whenComplete ( (rs , t ) -> domainParameterXref .clearExpansions () );
149
+ }
150
+
113
151
@ Override
114
152
public ScrollableResultsImplementor <R > performScroll (ScrollMode scrollMode , DomainQueryExecutionContext executionContext ) {
115
153
throw new UnsupportedOperationException ();
@@ -119,10 +157,21 @@ public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, Doma
119
157
public CompletionStage <List <R >> reactivePerformList (DomainQueryExecutionContext executionContext ) {
120
158
return executionContext .getQueryOptions ().getEffectiveLimit ().getMaxRowsJpa () == 0
121
159
? completedFuture ( emptyList () )
122
- : withCacheableSqmInterpretation ( executionContext , listInterpreter );
160
+ : withCacheableSqmInterpretation ( executionContext , null , listInterpreter );
161
+ }
162
+
163
+ @ Override
164
+ public <T > CompletionStage <T > reactiveExecuteQuery (
165
+ DomainQueryExecutionContext executionContext ,
166
+ ReactiveResultsConsumer <T , R > resultsConsumer ) {
167
+ return withCacheableSqmInterpretation (
168
+ executionContext ,
169
+ resultsConsumer ,
170
+ (SqmInterpreter <T , ReactiveResultsConsumer <T , R >>) (SqmInterpreter ) executeQueryInterpreter
171
+ );
123
172
}
124
173
125
- private <T , X > CompletionStage <T > withCacheableSqmInterpretation (DomainQueryExecutionContext executionContext , SqmInterpreter <T , X > interpreter ) {
174
+ private <T , X > CompletionStage <T > withCacheableSqmInterpretation (DomainQueryExecutionContext executionContext , X context , SqmInterpreter <T , X > interpreter ) {
126
175
// NOTE : VERY IMPORTANT - intentional double-lock checking
127
176
// The other option would be to leverage `java.util.concurrent.locks.ReadWriteLock`
128
177
// to protect access. However, synchronized is much simpler here. We will verify
@@ -162,7 +211,7 @@ private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExec
162
211
jdbcParameterBindings = createJdbcParameterBindings ( localCopy , executionContext );
163
212
}
164
213
165
- return interpreter .interpret ( null , executionContext , localCopy , jdbcParameterBindings );
214
+ return interpreter .interpret ( context , executionContext , localCopy , jdbcParameterBindings );
166
215
}
167
216
168
217
private JdbcParameterBindings createJdbcParameterBindings (CacheableSqmInterpretation sqmInterpretation , DomainQueryExecutionContext executionContext ) {
0 commit comments