Skip to content

Commit 266fefe

Browse files
committed
fix
1 parent b2c0ca8 commit 266fefe

14 files changed

+149
-10
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyQueryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public int getMaxResults() {
4949

5050
@Override
5151
public Uni<Long> getResultCount() {
52-
throw new UnsupportedOperationException();
52+
return uni( delegate::getReactiveResultsCount );
5353
}
5454

5555
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySelectionQueryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public int getMaxResults() {
4747

4848
@Override
4949
public Uni<Long> getResultCount() {
50-
throw new UnsupportedOperationException();
50+
return uni( delegate::getReactiveResultsCount );
5151
}
5252

5353
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/ReactiveSelectionQuery.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ default CompletionStage<List<R>> getReactiveResultList() {
4949

5050
CompletionStage<R> getReactiveSingleResultOrNull();
5151

52+
CompletionStage<Long> getReactiveResultsCount();
53+
5254
CompletionStage<R> reactiveUnique();
5355

5456
CompletionStage<Optional<R>> reactiveUniqueResultOptional();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ private ReactiveSelectQueryPlan<R> buildAggregatedSelectQueryPlan(SqmSelectState
269269
return new AggregatedSelectReactiveQueryPlan<>( aggregatedQueryPlans );
270270
}
271271

272-
private <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
272+
public <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
273273
SqmSelectStatement<?> concreteSqmStatement,
274274
Class<T> resultType,
275275
QueryOptions queryOptions) {

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ public List<R> list() {
187187
return selectionQueryDelegate.list();
188188
}
189189

190+
@Override
191+
public CompletionStage<Long> getReactiveResultsCount() {
192+
throw new UnsupportedOperationException();
193+
}
194+
190195
@Override
191196
public CompletionStage<List<R>> reactiveList() {
192197
return selectionQueryDelegate.reactiveList();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ConcreteSqmSelectReactiveQueryPlan.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
3737
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
3838
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
39+
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
3940
import org.hibernate.sql.ast.SqlAstTranslator;
4041
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
4142
import org.hibernate.sql.ast.spi.FromClauseAccess;
@@ -59,6 +60,7 @@
5960
public class ConcreteSqmSelectReactiveQueryPlan<R> extends ConcreteSqmSelectQueryPlan<R>
6061
implements ReactiveSelectQueryPlan<R> {
6162

63+
private final SqmInterpreter<Object, ReactiveResultsConsumer<Object, R>> executeQueryInterpreter;
6264
private final SqmInterpreter<List<R>, Void> listInterpreter;
6365
private final RowTransformer<R> rowTransformer;
6466

@@ -80,6 +82,8 @@ public ConcreteSqmSelectReactiveQueryPlan(
8082
this.rowTransformer = determineRowTransformer( sqm, resultType, tupleMetadata, queryOptions );
8183
this.listInterpreter = (unused, executionContext, sqmInterpretation, jdbcParameterBindings) ->
8284
listInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer );
85+
this.executeQueryInterpreter = (resultsConsumer, executionContext, sqmInterpretation, jdbcParameterBindings) ->
86+
executeQueryInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer, resultsConsumer );
8387
}
8488

8589
private static <R> CompletionStage<List<R>> listInterpreter(
@@ -110,6 +114,40 @@ private static <R> CompletionStage<List<R>> listInterpreter(
110114
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
111115
}
112116

117+
private static <R> CompletionStage<Object> executeQueryInterpreter(
118+
String hql,
119+
DomainParameterXref domainParameterXref,
120+
DomainQueryExecutionContext executionContext,
121+
CacheableSqmInterpretation sqmInterpretation,
122+
JdbcParameterBindings jdbcParameterBindings,
123+
RowTransformer<R> rowTransformer,
124+
ReactiveResultsConsumer<Object, R> resultsConsumer) {
125+
final ReactiveSharedSessionContractImplementor session = (ReactiveSharedSessionContractImplementor) executionContext.getSession();
126+
final JdbcOperationQuerySelect jdbcSelect = sqmInterpretation.getJdbcSelect();
127+
// I'm using a supplier so that the whenComplete at the end will catch any errors, like a finally-block
128+
Supplier<SubselectFetch.RegistrationHandler> fetchHandlerSupplier = () -> SubselectFetch
129+
.createRegistrationHandler( session.getPersistenceContext().getBatchFetchQueue(), sqmInterpretation.selectStatement, JdbcParametersList.empty(), jdbcParameterBindings );
130+
return completedFuture( fetchHandlerSupplier )
131+
.thenApply( Supplier::get )
132+
.thenCompose( subSelectFetchKeyHandler -> session
133+
.reactiveAutoFlushIfRequired( jdbcSelect.getAffectedTableNames() )
134+
.thenCompose( required -> StandardReactiveSelectExecutor.INSTANCE
135+
.executeQuery( jdbcSelect,
136+
jdbcParameterBindings,
137+
ConcreteSqmSelectQueryPlan.listInterpreterExecutionContext( hql, executionContext, jdbcSelect, subSelectFetchKeyHandler ),
138+
rowTransformer,
139+
null,
140+
sql -> executionContext.getSession()
141+
.getJdbcCoordinator()
142+
.getStatementPreparer()
143+
.prepareQueryStatement( sql, false, null ),
144+
resultsConsumer
145+
)
146+
)
147+
)
148+
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
149+
}
150+
113151
@Override
114152
public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, DomainQueryExecutionContext executionContext) {
115153
throw new UnsupportedOperationException();
@@ -119,10 +157,21 @@ public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, Doma
119157
public CompletionStage<List<R>> reactivePerformList(DomainQueryExecutionContext executionContext) {
120158
return executionContext.getQueryOptions().getEffectiveLimit().getMaxRowsJpa() == 0
121159
? completedFuture( emptyList() )
122-
: withCacheableSqmInterpretation( executionContext, listInterpreter );
160+
: withCacheableSqmInterpretation( executionContext, null, listInterpreter );
161+
}
162+
163+
@Override
164+
public <T> CompletionStage<T> reactiveExecuteQuery(
165+
DomainQueryExecutionContext executionContext,
166+
ReactiveResultsConsumer<T, R> resultsConsumer) {
167+
return withCacheableSqmInterpretation(
168+
executionContext,
169+
resultsConsumer,
170+
(SqmInterpreter<T, ReactiveResultsConsumer<T, R>>) (SqmInterpreter) executeQueryInterpreter
171+
);
123172
}
124173

125-
private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, SqmInterpreter<T, X> interpreter) {
174+
private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, X context, SqmInterpreter<T, X> interpreter) {
126175
// NOTE : VERY IMPORTANT - intentional double-lock checking
127176
// The other option would be to leverage `java.util.concurrent.locks.ReadWriteLock`
128177
// to protect access. However, synchronized is much simpler here. We will verify
@@ -162,7 +211,7 @@ private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExec
162211
jdbcParameterBindings = createJdbcParameterBindings( localCopy, executionContext );
163212
}
164213

165-
return interpreter.interpret( null, executionContext, localCopy, jdbcParameterBindings );
214+
return interpreter.interpret( context, executionContext, localCopy, jdbcParameterBindings );
166215
}
167216

168217
private JdbcParameterBindings createJdbcParameterBindings(CacheableSqmInterpretation sqmInterpretation, DomainQueryExecutionContext executionContext) {

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveQuerySqmImpl.java

+15
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.hibernate.query.criteria.internal.NamedCriteriaQueryMementoImpl;
4343
import org.hibernate.query.hql.internal.NamedHqlQueryMementoImpl;
4444
import org.hibernate.query.hql.internal.QuerySplitter;
45+
import org.hibernate.query.internal.DelegatingDomainQueryExecutionContext;
4546
import org.hibernate.query.spi.AbstractSelectionQuery;
4647
import org.hibernate.query.spi.DomainQueryExecutionContext;
4748
import org.hibernate.query.spi.HqlInterpretation;
@@ -62,6 +63,7 @@
6263
import org.hibernate.reactive.query.sqm.mutation.spi.ReactiveSqmMultiTableMutationStrategy;
6364
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
6465
import org.hibernate.reactive.session.ReactiveSqmQueryImplementor;
66+
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
6567
import org.hibernate.transform.ResultTransformer;
6668

6769
import jakarta.persistence.CacheRetrieveMode;
@@ -139,6 +141,19 @@ public CompletionStage<R> getReactiveSingleResult() {
139141
return selectionQueryDelegate.getReactiveSingleResult();
140142
}
141143

144+
@Override
145+
public CompletionStage<Long> getReactiveResultsCount() {
146+
final DelegatingDomainQueryExecutionContext context = new DelegatingDomainQueryExecutionContext( this ) {
147+
@Override
148+
public QueryOptions getQueryOptions() {
149+
return QueryOptions.NONE;
150+
}
151+
};
152+
return selectionQueryDelegate
153+
.buildConcreteSelectQueryPlan( ( (SqmSelectStatement<?>) getSqmStatement() ).createCountQuery(), Long.class, getQueryOptions() )
154+
.reactiveExecuteQuery( context, new ReactiveSingleResultConsumer<>() );
155+
}
156+
142157
@Override
143158
public CompletionStage<R> getReactiveSingleResultOrNull() {
144159
return selectionQueryDelegate.getReactiveSingleResultOrNull();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java

+14
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.hibernate.query.sqm.tree.select.SqmSelectStatement;
4040
import org.hibernate.reactive.query.spi.ReactiveAbstractSelectionQuery;
4141
import org.hibernate.reactive.query.sqm.ReactiveSqmSelectionQuery;
42+
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
4243

4344
import jakarta.persistence.CacheRetrieveMode;
4445
import jakarta.persistence.CacheStoreMode;
@@ -212,6 +213,19 @@ public R getSingleResultOrNull() {
212213
return selectionQueryDelegate.getSingleResultOrNull();
213214
}
214215

216+
@Override
217+
public CompletionStage<Long> getReactiveResultsCount() {
218+
final DelegatingDomainQueryExecutionContext context = new DelegatingDomainQueryExecutionContext(this) {
219+
@Override
220+
public QueryOptions getQueryOptions() {
221+
return QueryOptions.NONE;
222+
}
223+
};
224+
return selectionQueryDelegate
225+
.buildConcreteSelectQueryPlan( getSqmStatement().createCountQuery(), Long.class, getQueryOptions() )
226+
.reactiveExecuteQuery( context, new ReactiveSingleResultConsumer<>() );
227+
}
228+
215229
@Override
216230
public List<R> getResultList() {
217231
return selectionQueryDelegate.getResultList();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/spi/ReactiveSelectQueryPlan.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.hibernate.query.spi.ScrollableResultsImplementor;
1515
import org.hibernate.query.spi.SelectQueryPlan;
1616
import org.hibernate.reactive.logging.impl.Log;
17+
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
1718
import org.hibernate.sql.results.spi.ResultsConsumer;
1819

1920
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
@@ -44,7 +45,7 @@ default <T> T executeQuery(DomainQueryExecutionContext executionContext, Results
4445
/**
4546
* Execute the query
4647
*/
47-
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ResultsConsumer<T, R> resultsConsumer) {
48+
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ReactiveResultsConsumer<T, R> resultsConsumer) {
4849
return failedFuture( new UnsupportedOperationException() );
4950
}
5051

hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveInitializersList.java

+6
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public void finishUpRow(final RowProcessingState rowProcessingState) {
5656
}
5757
}
5858

59+
public void startLoading(final RowProcessingState rowProcessingState) {
60+
for ( int i = initializers.length - 1; i >= 0; i-- ) {
61+
initializers[i].startLoading( rowProcessingState );
62+
}
63+
}
64+
5965
public CompletionStage<Void> initializeInstance(final ReactiveRowProcessingState rowProcessingState) {
6066
return loop( initializers, initializer -> {
6167
if ( initializer instanceof ReactiveInitializer ) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.sql.results.spi;
7+
8+
import java.util.concurrent.CompletionStage;
9+
10+
import org.hibernate.Incubating;
11+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
12+
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
13+
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
14+
import org.hibernate.sql.results.jdbc.internal.JdbcValuesSourceProcessingStateStandardImpl;
15+
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingOptions;
16+
17+
@Incubating
18+
public class ReactiveSingleResultConsumer<T> implements ReactiveResultsConsumer<T, T> {
19+
20+
@Override
21+
public CompletionStage<T> consume(
22+
ReactiveValuesResultSet jdbcValues,
23+
SharedSessionContractImplementor session,
24+
JdbcValuesSourceProcessingOptions processingOptions,
25+
JdbcValuesSourceProcessingStateStandardImpl jdbcValuesSourceProcessingState,
26+
ReactiveRowProcessingState rowProcessingState,
27+
ReactiveRowReader<T> rowReader) {
28+
rowReader.getReactiveInitializersList().startLoading( rowProcessingState );
29+
return rowProcessingState.next()
30+
.thenCompose( hasNext -> rowReader
31+
.reactiveReadRow( rowProcessingState, processingOptions )
32+
.thenApply( result -> {
33+
rowProcessingState.finishRowProcessing( true );
34+
rowReader.finishUp( jdbcValuesSourceProcessingState );
35+
jdbcValuesSourceProcessingState.finishUp( false );
36+
return result;
37+
} )
38+
);
39+
}
40+
41+
@Override
42+
public boolean canResultsBeCached() {
43+
return false;
44+
}
45+
46+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageQueryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public int getMaxResults() {
4040

4141
@Override
4242
public CompletionStage<Long> getResultCount() {
43-
throw new UnsupportedOperationException();
43+
return delegate.getReactiveResultsCount();
4444
}
4545

4646
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSelectionQueryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public int getMaxResults() {
4343

4444
@Override
4545
public CompletionStage<Long> getResultCount() {
46-
throw new UnsupportedOperationException();
46+
return delegate.getReactiveResultsCount();
4747
}
4848

4949
@Override

hibernate-reactive-core/src/test/java/org/hibernate/reactive/QueryTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,8 @@ public void testQueryGetResultCountWithStage(VertxTestContext context) {
628628
.thenCompose( v -> getSessionFactory().withSession( s -> s
629629
.createQuery( "from Author", Author.class )
630630
.getResultCount() ) )
631-
.thenAccept( count -> assertEquals( 2L, count ) )
631+
.thenAccept( count ->
632+
assertEquals( 2L, count ) )
632633
.thenCompose( v -> getSessionFactory().withSession( s -> s
633634
.createQuery( "from Author", Author.class )
634635
.setMaxResults( 1 )

0 commit comments

Comments
 (0)