Skip to content

Commit 55b12f6

Browse files
committed
[hibernate#1905] Reactive find with lock in Quarkus with reactive hibernate
1 parent c3b9c61 commit 55b12f6

15 files changed

+298
-25
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/engine/internal/ReactivePersistenceContextAdapter.java

+86
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,26 @@
1212

1313
import org.hibernate.HibernateException;
1414
import org.hibernate.collection.spi.PersistentCollection;
15+
import org.hibernate.engine.spi.EntityHolder;
1516
import org.hibernate.engine.spi.EntityKey;
1617
import org.hibernate.engine.spi.PersistenceContext;
1718
import org.hibernate.engine.spi.SessionImplementor;
1819
import org.hibernate.engine.spi.SharedSessionContractImplementor;
20+
import org.hibernate.event.service.spi.EventListenerGroup;
21+
import org.hibernate.event.spi.PostLoadEvent;
22+
import org.hibernate.event.spi.PostLoadEventListener;
1923
import org.hibernate.persister.entity.EntityPersister;
24+
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
2025
import org.hibernate.reactive.logging.impl.Log;
2126
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
2227
import org.hibernate.reactive.session.ReactiveSession;
28+
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingState;
2329

2430
import static java.lang.invoke.MethodHandles.lookup;
2531
import static org.hibernate.pretty.MessageHelper.infoString;
2632
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
2733
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
34+
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
2835
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
2936

3037
/**
@@ -130,4 +137,83 @@ public Object removeEntity(EntityKey key) {
130137
}
131138
return result;
132139
}
140+
141+
@Override
142+
public void postLoad(JdbcValuesSourceProcessingState processingState, Consumer<EntityHolder> holderConsumer) {
143+
throw LOG.nonReactiveMethodCall( "reactivePostLoad(JdbcValuesSourceProcessingState, Consumer<EntityHolder>) )" );
144+
}
145+
146+
public CompletionStage<Void> reactivePostLoad(JdbcValuesSourceProcessingState processingState, Consumer<EntityHolder> holderConsumer) {
147+
final ReactiveCallbackImpl callback = (ReactiveCallbackImpl) processingState.getExecutionContext().getCallback();
148+
149+
if ( processingState.getLoadingEntityHolders() != null ) {
150+
final EventListenerGroup<PostLoadEventListener> listenerGroup =
151+
getSession().getFactory().getEventListenerGroups().eventListenerGroup_POST_LOAD;
152+
final PostLoadEvent postLoadEvent = processingState.getPostLoadEvent();
153+
return loop(
154+
processingState.getLoadingEntityHolders(), entityHolder ->
155+
processLoadedEntityHolder(
156+
entityHolder,
157+
listenerGroup,
158+
postLoadEvent,
159+
callback,
160+
holderConsumer
161+
)
162+
).thenAccept( unused -> processingState.getLoadingEntityHolders().clear() );
163+
}
164+
if ( processingState.getReloadedEntityHolders() != null ) {
165+
return loop(
166+
processingState.getLoadingEntityHolders(), entityHolder ->
167+
processLoadedEntityHolder(
168+
entityHolder,
169+
null,
170+
null,
171+
callback,
172+
holderConsumer
173+
)
174+
).thenAccept( unused -> processingState.getLoadingEntityHolders().clear() );
175+
}
176+
return voidFuture();
177+
}
178+
179+
private CompletionStage<Void> processLoadedEntityHolder(
180+
EntityHolder holder,
181+
EventListenerGroup<PostLoadEventListener> listenerGroup,
182+
PostLoadEvent postLoadEvent,
183+
ReactiveCallbackImpl callback,
184+
Consumer<EntityHolder> holderConsumer) {
185+
if ( holderConsumer != null ) {
186+
holderConsumer.accept( holder );
187+
}
188+
if ( holder.getEntity() == null ) {
189+
// It's possible that we tried to load an entity and found out it doesn't exist,
190+
// in which case we added an entry with a null proxy and entity.
191+
// Remove that empty entry on post load to avoid unwanted side effects
192+
getEntitiesByKey().remove( holder.getEntityKey() );
193+
}
194+
else {
195+
if ( postLoadEvent != null ) {
196+
postLoadEvent.reset();
197+
postLoadEvent.setEntity( holder.getEntity() )
198+
.setId( holder.getEntityKey().getIdentifier() )
199+
.setPersister( holder.getDescriptor() );
200+
listenerGroup.fireEventOnEachListener(
201+
postLoadEvent,
202+
PostLoadEventListener::onPostLoad
203+
);
204+
if ( callback != null ) {
205+
return callback.invokeReactiveLoadActions(
206+
holder.getEntity(),
207+
holder.getDescriptor(),
208+
getSession()
209+
).thenApply( v -> {
210+
holder.resetEntityInitialier();
211+
return v;
212+
} );
213+
}
214+
}
215+
216+
}
217+
return voidFuture();
218+
}
133219
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.engine.impl;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.concurrent.CompletionStage;
11+
12+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
13+
import org.hibernate.loader.ast.spi.AfterLoadAction;
14+
import org.hibernate.metamodel.mapping.EntityMappingType;
15+
import org.hibernate.reactive.logging.impl.Log;
16+
import org.hibernate.sql.exec.spi.Callback;
17+
18+
import static java.lang.invoke.MethodHandles.lookup;
19+
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
20+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
21+
22+
/**
23+
* Reactive equivalent of {@link org.hibernate.sql.exec.internal.CallbackImpl}
24+
*/
25+
public class ReactiveCallbackImpl implements Callback {
26+
private static final Log LOG = make( Log.class, lookup() );
27+
28+
private final List<ReactiveAfterLoadAction> afterLoadActions;
29+
30+
public ReactiveCallbackImpl() {
31+
this.afterLoadActions = new ArrayList<>( 1 );
32+
}
33+
34+
@Override
35+
public void registerAfterLoadAction(AfterLoadAction afterLoadAction) {
36+
throw LOG.nonReactiveMethodCall( "registerReactiveAfterLoadAction(ReactiveCallbackImpl)" );
37+
}
38+
39+
public void registerReactiveAfterLoadAction(ReactiveAfterLoadAction afterLoadAction) {
40+
afterLoadActions.add( afterLoadAction );
41+
}
42+
43+
@Override
44+
public void invokeAfterLoadActions(
45+
Object entity,
46+
EntityMappingType entityMappingType,
47+
SharedSessionContractImplementor session) {
48+
throw LOG.nonReactiveMethodCall( "invokeAfterLoadActions(Object, EntityMappingType, SharedSessionContractImplementor)" );
49+
}
50+
51+
public CompletionStage<Void> invokeReactiveLoadActions(
52+
Object entity,
53+
EntityMappingType entityMappingType,
54+
SharedSessionContractImplementor session) {
55+
for ( int i = 0; i < afterLoadActions.size(); i++ ) {
56+
afterLoadActions.get( i ).reactiveAfterLoad( entity, entityMappingType, session );
57+
}
58+
return voidFuture();
59+
}
60+
61+
@Override
62+
public boolean hasAfterLoadActions() {
63+
return !afterLoadActions.isEmpty();
64+
}
65+
66+
public interface ReactiveAfterLoadAction {
67+
/**
68+
* The action trigger - the {@code entity} is being loaded
69+
*/
70+
CompletionStage<Void> reactiveAfterLoad(Object entity, EntityMappingType entityMappingType, SharedSessionContractImplementor session);
71+
}
72+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.hibernate.reactive.event.impl;
77

8+
89
import org.hibernate.AssertionFailure;
910
import org.hibernate.engine.spi.EntityEntry;
1011
import org.hibernate.event.spi.EventSource;

hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveNaturalIdLoaderDelegate.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.hibernate.metamodel.mapping.NaturalIdMapping;
2828
import org.hibernate.query.internal.SimpleQueryOptions;
2929
import org.hibernate.query.spi.QueryOptions;
30+
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
3031
import org.hibernate.reactive.loader.ast.spi.ReactiveNaturalIdLoader;
3132
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
3233
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
@@ -38,7 +39,6 @@
3839
import org.hibernate.sql.ast.tree.select.QuerySpec;
3940
import org.hibernate.sql.ast.tree.select.SelectStatement;
4041
import org.hibernate.sql.exec.internal.BaseExecutionContext;
41-
import org.hibernate.sql.exec.internal.CallbackImpl;
4242
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
4343
import org.hibernate.sql.exec.spi.Callback;
4444
import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect;
@@ -335,7 +335,7 @@ public NaturalIdLoaderWithOptionsExecutionContext(
335335
QueryOptions queryOptions) {
336336
super( session );
337337
this.queryOptions = queryOptions;
338-
callback = new CallbackImpl();
338+
callback = new ReactiveCallbackImpl();
339339
}
340340

341341
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleIdLoadPlan.java

+14-10
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,19 @@
2020
import org.hibernate.query.internal.SimpleQueryOptions;
2121
import org.hibernate.query.spi.QueryOptions;
2222
import org.hibernate.query.spi.QueryParameterBindings;
23+
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
2324
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
2425
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
2526
import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor;
2627
import org.hibernate.sql.ast.tree.select.SelectStatement;
27-
import org.hibernate.sql.exec.internal.CallbackImpl;
2828
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
2929
import org.hibernate.sql.exec.spi.Callback;
3030
import org.hibernate.sql.exec.spi.ExecutionContext;
3131
import org.hibernate.sql.exec.spi.JdbcParameterBindings;
3232
import org.hibernate.sql.exec.spi.JdbcParametersList;
3333

34+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
35+
3436
public class ReactiveSingleIdLoadPlan<T> extends SingleIdLoadPlan<CompletionStage<T>> {
3537

3638
public ReactiveSingleIdLoadPlan(
@@ -61,7 +63,7 @@ public CompletionStage<T> load(Object restrictedValue, Object entityInstance, Bo
6163
}
6264
assert offset == getJdbcParameters().size();
6365
final QueryOptions queryOptions = new SimpleQueryOptions( getLockOptions(), readOnly );
64-
final Callback callback = new CallbackImpl();
66+
final ReactiveCallbackImpl callback = new ReactiveCallbackImpl();
6567
EntityMappingType loadable = (EntityMappingType) getLoadable();
6668
ExecutionContext executionContext = executionContext(
6769
restrictedValue,
@@ -74,17 +76,19 @@ public CompletionStage<T> load(Object restrictedValue, Object entityInstance, Bo
7476
// FIXME: Should we get this from jdbcServices.getSelectExecutor()?
7577
return StandardReactiveSelectExecutor.INSTANCE
7678
.list( getJdbcSelect(), jdbcParameterBindings, executionContext, getRowTransformer(), resultConsumer( singleResultExpected ) )
77-
.thenApply( this::extractEntity )
78-
.thenApply( entity -> {
79-
invokeAfterLoadActions( callback, session, entity );
80-
return (T) entity;
81-
} );
79+
.thenCompose( list -> {
80+
Object entity = extractEntity( list );
81+
return invokeAfterLoadActions( callback, session, entity )
82+
.thenApply( v -> (T) entity );
83+
}
84+
);
8285
}
8386

84-
private <G> void invokeAfterLoadActions(Callback callback, SharedSessionContractImplementor session, G entity) {
85-
if ( entity != null && getLoadable() != null) {
86-
callback.invokeAfterLoadActions( entity, (EntityMappingType) getLoadable(), session );
87+
private <G> CompletionStage<Void> invokeAfterLoadActions(ReactiveCallbackImpl callback, SharedSessionContractImplementor session, G entity) {
88+
if ( entity != null && getLoadable() != null ) {
89+
return callback.invokeReactiveLoadActions( entity, (EntityMappingType) getLoadable(), session );
8790
}
91+
return voidFuture();
8892
}
8993

9094
private Object extractEntity(List<?> list) {

hibernate-reactive-core/src/main/java/org/hibernate/reactive/loader/ast/internal/ReactiveSingleUniqueKeyEntityLoaderStandard.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import org.hibernate.metamodel.mapping.SingularAttributeMapping;
2525
import org.hibernate.metamodel.mapping.internal.ToOneAttributeMapping;
2626
import org.hibernate.query.spi.QueryOptions;
27+
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
2728
import org.hibernate.reactive.loader.ast.spi.ReactiveSingleUniqueKeyEntityLoader;
2829
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
2930
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
3031
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
3132
import org.hibernate.sql.ast.tree.select.SelectStatement;
3233
import org.hibernate.sql.exec.internal.BaseExecutionContext;
33-
import org.hibernate.sql.exec.internal.CallbackImpl;
3434
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
3535
import org.hibernate.sql.exec.spi.Callback;
3636
import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect;
@@ -179,7 +179,7 @@ public SingleUKEntityLoaderExecutionContext(SharedSessionContractImplementor ses
179179
super( session );
180180
//Careful, readOnly is possibly null
181181
this.queryOptions = readOnly == null ? QueryOptions.NONE : readOnly ? QueryOptions.READ_ONLY : QueryOptions.READ_WRITE;
182-
callback = new CallbackImpl();
182+
callback = new ReactiveCallbackImpl();
183183
}
184184

185185
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java

+11
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030
import org.hibernate.query.sqm.internal.SqmInterpretationsKey.InterpretationsKeySource;
3131
import org.hibernate.query.sqm.tree.SqmStatement;
3232
import org.hibernate.query.sqm.tree.select.SqmSelectStatement;
33+
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
3334
import org.hibernate.reactive.logging.impl.Log;
3435
import org.hibernate.reactive.logging.impl.LoggerFactory;
3536
import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan;
3637
import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan;
3738
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
3839
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
40+
import org.hibernate.sql.exec.spi.Callback;
3941
import org.hibernate.sql.results.internal.TupleMetadata;
4042

4143
import jakarta.persistence.NoResultException;
@@ -76,6 +78,8 @@ public class ReactiveAbstractSelectionQuery<R> {
7678
private final Function<List<R>, R> uniqueElement;
7779
private final InterpretationsKeySource interpretationsKeySource;
7880

81+
private Callback callback;
82+
7983
// I'm sure we can avoid some of this by making some methods public in ORM,
8084
// but this allows me to prototype faster. We can refactor the code later.
8185
public ReactiveAbstractSelectionQuery(
@@ -363,4 +367,11 @@ public void enableFetchProfile(String profileName) {
363367
}
364368
fetchProfiles.add( profileName );
365369
}
370+
371+
public Callback getCallback() {
372+
if ( callback == null ) {
373+
callback = new ReactiveCallbackImpl();
374+
}
375+
return callback;
376+
}
366377
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor;
4747
import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan;
4848
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
49+
import org.hibernate.sql.exec.spi.Callback;
4950
import org.hibernate.type.BasicTypeReference;
5051

5152
import jakarta.persistence.AttributeConverter;
@@ -192,6 +193,11 @@ public R getSingleResultOrNull() {
192193
return selectionQueryDelegate.getSingleResultOrNull();
193194
}
194195

196+
@Override
197+
public Callback getCallback() {
198+
return selectionQueryDelegate.getCallback();
199+
}
200+
195201
@Override
196202
public CompletionStage<R> getReactiveSingleResultOrNull() {
197203
return selectionQueryDelegate.getReactiveSingleResultOrNull();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveQuerySqmImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.hibernate.reactive.query.sqm.mutation.spi.ReactiveSqmMultiTableMutationStrategy;
6262
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
6363
import org.hibernate.reactive.session.ReactiveSqmQueryImplementor;
64+
import org.hibernate.sql.exec.spi.Callback;
6465
import org.hibernate.transform.ResultTransformer;
6566

6667
import jakarta.persistence.CacheRetrieveMode;
@@ -174,6 +175,11 @@ public R getSingleResultOrNull() {
174175
return selectionQueryDelegate.getSingleResultOrNull();
175176
}
176177

178+
@Override
179+
public Callback getCallback() {
180+
return selectionQueryDelegate.getCallback();
181+
}
182+
177183
@Override
178184
public R uniqueResult() {
179185
return selectionQueryDelegate.uniqueResult();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.hibernate.query.sqm.tree.select.SqmSelectStatement;
4040
import org.hibernate.reactive.query.spi.ReactiveAbstractSelectionQuery;
4141
import org.hibernate.reactive.query.sqm.ReactiveSqmSelectionQuery;
42+
import org.hibernate.sql.exec.spi.Callback;
4243

4344
import jakarta.persistence.CacheRetrieveMode;
4445
import jakarta.persistence.CacheStoreMode;
@@ -218,6 +219,11 @@ public CompletionStage<Long> getReactiveResultCount() {
218219
.getReactiveResultsCount( getSqmStatement().createCountQuery(), this );
219220
}
220221

222+
@Override
223+
public Callback getCallback() {
224+
return selectionQueryDelegate.getCallback();
225+
}
226+
221227
@Override
222228
public List<R> getResultList() {
223229
return selectionQueryDelegate.getResultList();

0 commit comments

Comments
 (0)