Skip to content

Reactive find with lock in Quarkus with reactive hibernate #2117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.engine.spi.Status;
import org.hibernate.event.service.spi.EventListenerGroup;
import org.hibernate.event.spi.PostLoadEvent;
import org.hibernate.event.spi.PostLoadEventListener;
import org.hibernate.persister.collection.CollectionPersister;
import org.hibernate.persister.entity.EntityPersister;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.results.graph.entity.EntityInitializer;
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingState;
import org.hibernate.sql.results.spi.LoadContexts;

import static java.lang.invoke.MethodHandles.lookup;
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

/**
Expand Down Expand Up @@ -456,7 +462,7 @@

@Override
public void postLoad(JdbcValuesSourceProcessingState processingState, Consumer<EntityHolder> loadedConsumer) {
delegate.postLoad( processingState, loadedConsumer );
throw LOG.nonReactiveMethodCall( "reactivePostLoad(JdbcValuesSourceProcessingState, Consumer<EntityHolder>) )" );
}

@Internal
Expand Down Expand Up @@ -710,4 +716,76 @@
public NaturalIdResolutions getNaturalIdResolutions() {
return delegate.getNaturalIdResolutions();
}

/**
* Reactive version of {@link StatefulPersistenceContext#postLoad(JdbcValuesSourceProcessingState, Consumer)}
*
*/
public CompletionStage<Void> reactivePostLoad(JdbcValuesSourceProcessingState processingState, Consumer<EntityHolder> holderConsumer) {
final ReactiveCallbackImpl callback = (ReactiveCallbackImpl) processingState.getExecutionContext().getCallback();

if ( processingState.getLoadingEntityHolders() != null ) {
final EventListenerGroup<PostLoadEventListener> listenerGroup =
getSession().getFactory().getEventListenerGroups().eventListenerGroup_POST_LOAD;
final PostLoadEvent postLoadEvent = processingState.getPostLoadEvent();
return loop(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Hibernate ORM the code looks like this:

if ( processingState.getLoadingEntityHolders() != null ) {
	final EventListenerGroup<PostLoadEventListener> listenerGroup =
			getSession().getFactory().getEventListenerGroups().eventListenerGroup_POST_LOAD;
	final PostLoadEvent postLoadEvent = processingState.getPostLoadEvent();
	for ( final EntityHolder holder : processingState.getLoadingEntityHolders() ) {
		processLoadedEntityHolder( holder, listenerGroup, postLoadEvent, callback, holderConsumer );
	}
	processingState.getLoadingEntityHolders().clear();
}
if ( processingState.getReloadedEntityHolders() != null ) {
	for ( final EntityHolder holder : processingState.getReloadedEntityHolders() ) {
		processLoadedEntityHolder( holder, null, null, callback, holderConsumer );
	}
	processingState.getReloadedEntityHolders().clear();
}

There is not a return at the end of the first if-then block. It means that the two conditions getLoadingEntityHolders() != null and processingState.getReloadedEntityHolders() != null could both be true and both blocks should be evaluated.

Did you add a return loop( ... because you understand what's going on or is this an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably it can be avoided

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

processingState.getLoadingEntityHolders(), entityHolder ->
processLoadedEntityHolder(
entityHolder,
listenerGroup,
postLoadEvent,
callback,
holderConsumer
))
.thenAccept( v -> processingState.getLoadingEntityHolders().clear() );
}
if ( processingState.getReloadedEntityHolders() != null ) {
return loop(
processingState.getLoadingEntityHolders(), entityHolder ->
processLoadedEntityHolder(
entityHolder,
null,
null,
callback,
holderConsumer
))
.thenAccept( v -> processingState.getLoadingEntityHolders().clear() );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here it should be using getReloadedEntityHolders

}
return voidFuture();
}

/**
* Reactive version of {@link StatefulPersistenceContext#processLoadedEntityHolder(EntityHolder, EventListenerGroup, PostLoadEvent, Callback, Consumer)}
*/
private CompletionStage<Void> processLoadedEntityHolder(
EntityHolder holder,
EventListenerGroup<PostLoadEventListener> listenerGroup,
PostLoadEvent postLoadEvent,
ReactiveCallbackImpl callback,
Consumer<EntityHolder> holderConsumer) {
if ( holderConsumer != null ) {
holderConsumer.accept( holder );
}
if ( holder.getEntity() == null ) {
// It's possible that we tried to load an entity and found out it doesn't exist,
// in which case we added an entry with a null proxy and entity.
// Remove that empty entry on post load to avoid unwanted side effects
getEntitiesByKey().remove( holder.getEntityKey() );
}
else {
if ( postLoadEvent != null ) {
postLoadEvent.reset();
postLoadEvent.setEntity( holder.getEntity() )
.setId( holder.getEntityKey().getIdentifier() )
.setPersister( holder.getDescriptor() );
listenerGroup.fireEventOnEachListener( postLoadEvent, PostLoadEventListener::onPostLoad );
if ( callback != null ) {
return callback
.invokeReactiveLoadActions( holder.getEntity(), holder.getDescriptor(), getSession() )
.thenAccept( v -> holder.resetEntityInitialier() );
}
}
}
return voidFuture();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.loader.ast.spi.AfterLoadAction;
import org.hibernate.metamodel.mapping.EntityMappingType;
import org.hibernate.reactive.loader.ast.spi.ReactiveAfterLoadAction;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.sql.exec.spi.Callback;

import static java.lang.invoke.MethodHandles.lookup;
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
import static org.hibernate.reactive.util.impl.CompletionStages.loop;

/**
* Reactive equivalent of {@link org.hibernate.sql.exec.internal.CallbackImpl}
*/
public class ReactiveCallbackImpl implements Callback {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove some code here if

  • ReactiveAfterLoadAction extends AfterLoadAction
  • ReactiveCallBackImpl extends CallbackImpl
  • We add a method getAfterLoadActions() in CallBackImpl

But, we can do it in a separate PR, otherwise we will have to wait for the next ORM release.

private static final Log LOG = make( Log.class, lookup() );

private final List<ReactiveAfterLoadAction> afterLoadActions;

public ReactiveCallbackImpl() {
this.afterLoadActions = new ArrayList<>( 1 );
}

@Override
public void registerAfterLoadAction(AfterLoadAction afterLoadAction) {
throw LOG.nonReactiveMethodCall( "registerReactiveAfterLoadAction(ReactiveCallbackImpl)" );
}

public void registerReactiveAfterLoadAction(ReactiveAfterLoadAction afterLoadAction) {
afterLoadActions.add( afterLoadAction );
}

@Override
public void invokeAfterLoadActions(
Object entity,
EntityMappingType entityMappingType,
SharedSessionContractImplementor session) {
throw LOG.nonReactiveMethodCall( "invokeAfterLoadActions(Object, EntityMappingType, SharedSessionContractImplementor)" );
}

/**
* Reactive version of {@link org.hibernate.sql.exec.internal.CallbackImpl#invokeAfterLoadActions(Object, EntityMappingType, SharedSessionContractImplementor)}
*/
public CompletionStage<Void> invokeReactiveLoadActions(
Object entity,
EntityMappingType entityMappingType,
SharedSessionContractImplementor session) {
return loop(
afterLoadActions, afterLoadAction ->
afterLoadAction.reactiveAfterLoad( entity, entityMappingType, session )
);
}

@Override
public boolean hasAfterLoadActions() {
return !afterLoadActions.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hibernate.metamodel.mapping.NaturalIdMapping;
import org.hibernate.query.internal.SimpleQueryOptions;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.loader.ast.spi.ReactiveNaturalIdLoader;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
Expand All @@ -38,7 +39,6 @@
import org.hibernate.sql.ast.tree.select.QuerySpec;
import org.hibernate.sql.ast.tree.select.SelectStatement;
import org.hibernate.sql.exec.internal.BaseExecutionContext;
import org.hibernate.sql.exec.internal.CallbackImpl;
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect;
Expand Down Expand Up @@ -335,7 +335,7 @@ public NaturalIdLoaderWithOptionsExecutionContext(
QueryOptions queryOptions) {
super( session );
this.queryOptions = queryOptions;
callback = new CallbackImpl();
callback = new ReactiveCallbackImpl();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
import org.hibernate.query.internal.SimpleQueryOptions;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.query.spi.QueryParameterBindings;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.resource.jdbc.spi.LogicalConnectionImplementor;
import org.hibernate.sql.ast.tree.select.SelectStatement;
import org.hibernate.sql.exec.internal.CallbackImpl;
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.exec.spi.ExecutionContext;
import org.hibernate.sql.exec.spi.JdbcParameterBindings;
import org.hibernate.sql.exec.spi.JdbcParametersList;

import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveSingleIdLoadPlan<T> extends SingleIdLoadPlan<CompletionStage<T>> {

public ReactiveSingleIdLoadPlan(
Expand Down Expand Up @@ -61,7 +63,7 @@ public CompletionStage<T> load(Object restrictedValue, Object entityInstance, Bo
}
assert offset == getJdbcParameters().size();
final QueryOptions queryOptions = new SimpleQueryOptions( getLockOptions(), readOnly );
final Callback callback = new CallbackImpl();
final ReactiveCallbackImpl callback = new ReactiveCallbackImpl();
EntityMappingType loadable = (EntityMappingType) getLoadable();
ExecutionContext executionContext = executionContext(
restrictedValue,
Expand All @@ -74,17 +76,19 @@ public CompletionStage<T> load(Object restrictedValue, Object entityInstance, Bo
// FIXME: Should we get this from jdbcServices.getSelectExecutor()?
return StandardReactiveSelectExecutor.INSTANCE
.list( getJdbcSelect(), jdbcParameterBindings, executionContext, getRowTransformer(), resultConsumer( singleResultExpected ) )
.thenApply( this::extractEntity )
.thenApply( entity -> {
invokeAfterLoadActions( callback, session, entity );
return (T) entity;
} );
.thenCompose( list -> {
Object entity = extractEntity( list );
return invokeAfterLoadActions( callback, session, entity )
.thenApply( v -> (T) entity );
}
);
}

private <T> void invokeAfterLoadActions(Callback callback, SharedSessionContractImplementor session, T entity) {
if ( entity != null && getLoadable() != null) {
callback.invokeAfterLoadActions( entity, (EntityMappingType) getLoadable(), session );
private <T> CompletionStage<Void> invokeAfterLoadActions(ReactiveCallbackImpl callback, SharedSessionContractImplementor session, T entity) {
if ( entity != null && getLoadable() != null ) {
return callback.invokeReactiveLoadActions( entity, (EntityMappingType) getLoadable(), session );
}
return voidFuture();
}

private Object extractEntity(List<?> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.hibernate.metamodel.mapping.SingularAttributeMapping;
import org.hibernate.metamodel.mapping.internal.ToOneAttributeMapping;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.loader.ast.spi.ReactiveSingleUniqueKeyEntityLoader;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
import org.hibernate.sql.ast.tree.select.SelectStatement;
import org.hibernate.sql.exec.internal.BaseExecutionContext;
import org.hibernate.sql.exec.internal.CallbackImpl;
import org.hibernate.sql.exec.internal.JdbcParameterBindingsImpl;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.exec.spi.JdbcOperationQuerySelect;
Expand Down Expand Up @@ -179,7 +179,7 @@ public SingleUKEntityLoaderExecutionContext(SharedSessionContractImplementor ses
super( session );
//Careful, readOnly is possibly null
this.queryOptions = readOnly == null ? QueryOptions.NONE : readOnly ? QueryOptions.READ_ONLY : QueryOptions.READ_WRITE;
callback = new CallbackImpl();
callback = new ReactiveCallbackImpl();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.loader.ast.spi;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.metamodel.mapping.EntityMappingType;

/**
* Reactive version of {@link org.hibernate.loader.ast.spi.AfterLoadAction}
*/
public interface ReactiveAfterLoadAction {
/**
* @see org.hibernate.loader.ast.spi.AfterLoadAction#afterLoad(Object, EntityMappingType, SharedSessionContractImplementor)
*
* The action trigger - the {@code entity} is being loaded
*/
CompletionStage<Void> reactiveAfterLoad(
Object entity,
EntityMappingType entityMappingType,
SharedSessionContractImplementor session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.hibernate.reactive.query.spi;

import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand All @@ -30,12 +33,14 @@
import org.hibernate.query.sqm.internal.SqmInterpretationsKey.InterpretationsKeySource;
import org.hibernate.query.sqm.tree.SqmStatement;
import org.hibernate.query.sqm.tree.select.SqmSelectStatement;
import org.hibernate.reactive.engine.impl.ReactiveCallbackImpl;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.sql.results.internal.TupleMetadata;

import jakarta.persistence.NoResultException;
Expand Down Expand Up @@ -76,6 +81,8 @@ public class ReactiveAbstractSelectionQuery<R> {
private final Function<List<R>, R> uniqueElement;
private final InterpretationsKeySource interpretationsKeySource;

private Callback callback;

// I'm sure we can avoid some of this by making some methods public in ORM,
// but this allows me to prototype faster. We can refactor the code later.
public ReactiveAbstractSelectionQuery(
Expand Down Expand Up @@ -363,4 +370,11 @@ public void enableFetchProfile(String profileName) {
}
fetchProfiles.add( profileName );
}

public Callback getCallback() {
if ( callback == null ) {
callback = new ReactiveCallbackImpl();
}
return callback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor;
import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.type.BasicTypeReference;

import jakarta.persistence.AttributeConverter;
Expand Down Expand Up @@ -192,6 +193,11 @@ public R getSingleResultOrNull() {
return selectionQueryDelegate.getSingleResultOrNull();
}

@Override
public Callback getCallback() {
return selectionQueryDelegate.getCallback();
}

@Override
public CompletionStage<R> getReactiveSingleResultOrNull() {
return selectionQueryDelegate.getReactiveSingleResultOrNull();
Expand Down
Loading
Loading