diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/boot/impl/ReactiveIntegrator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/boot/impl/ReactiveIntegrator.java index 56dc5891c..e85b9c26e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/boot/impl/ReactiveIntegrator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/boot/impl/ReactiveIntegrator.java @@ -49,6 +49,7 @@ private void attachEventContextManagingListenersIfRequired(SessionFactoryService eventListenerRegistry.getEventListenerGroup( EventType.MERGE ).appendListener( new DefaultReactiveMergeEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.DELETE ).appendListener( new DefaultReactiveDeleteEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.REFRESH ).appendListener( new DefaultReactiveRefreshEventListener() ); + eventListenerRegistry.getEventListenerGroup( EventType.LOCK ).appendListener( new DefaultReactiveLockEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.LOAD ).appendListener( new DefaultReactiveLoadEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.INIT_COLLECTION ).appendListener( new DefaultReactiveInitializeCollectionEventListener() ); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/CascadingActions.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/CascadingActions.java index e7429ced3..f8695cd88 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/CascadingActions.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/CascadingActions.java @@ -6,6 +6,7 @@ package org.hibernate.reactive.engine.impl; import org.hibernate.HibernateException; +import org.hibernate.LockMode; import org.hibernate.event.internal.MergeContext; import org.hibernate.event.spi.EventSource; import org.hibernate.internal.CoreMessageLogger; @@ -42,7 +43,7 @@ private CascadingActions() { public static final CascadingAction DELETE = new BaseCascadingAction(org.hibernate.engine.spi.CascadingActions.DELETE) { @Override - public CompletionStage cascade( + public CompletionStage cascade( EventSource session, Object child, String entityName, @@ -61,7 +62,7 @@ public CompletionStage cascade( public static final CascadingAction PERSIST = new BaseCascadingAction(org.hibernate.engine.spi.CascadingActions.PERSIST) { @Override - public CompletionStage cascade( + public CompletionStage cascade( EventSource session, Object child, String entityName, @@ -81,7 +82,7 @@ public CompletionStage cascade( public static final CascadingAction PERSIST_ON_FLUSH = new BaseCascadingAction(org.hibernate.engine.spi.CascadingActions.PERSIST_ON_FLUSH) { @Override - public CompletionStage cascade( + public CompletionStage cascade( EventSource session, Object child, String entityName, @@ -99,7 +100,7 @@ public CompletionStage cascade( public static final CascadingAction MERGE = new BaseCascadingAction(org.hibernate.engine.spi.CascadingActions.MERGE) { @Override - public CompletionStage cascade( + public CompletionStage cascade( EventSource session, Object child, String entityName, @@ -118,7 +119,7 @@ public CompletionStage cascade( public static final CascadingAction REFRESH = new BaseCascadingAction(org.hibernate.engine.spi.CascadingActions.REFRESH) { @Override - public CompletionStage cascade( + public CompletionStage cascade( EventSource session, Object child, String entityName, @@ -130,6 +131,24 @@ public CompletionStage cascade( } }; + /** + * @see org.hibernate.Session#lock(Object, org.hibernate.LockMode) + */ + public static final CascadingAction LOCK = + new BaseCascadingAction(org.hibernate.engine.spi.CascadingActions.LOCK) { + @Override + public CompletionStage cascade( + EventSource session, + Object child, + String entityName, + LockMode context, + boolean isCascadeDeleteEnabled) + throws HibernateException { + LOG.tracev("Cascading to lock: {0}", entityName); + return session.unwrap(ReactiveSession.class).reactiveLock(child, context); + } + }; + public abstract static class BaseCascadingAction implements CascadingAction { private final org.hibernate.engine.spi.CascadingAction delegate; diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/AbstractReactiveSaveEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/AbstractReactiveSaveEventListener.java index ad2978ebe..63d609f1f 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/AbstractReactiveSaveEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/AbstractReactiveSaveEventListener.java @@ -52,8 +52,7 @@ * @see DefaultReactivePersistOnFlushEventListener * @see DefaultReactiveMergeEventListener */ -abstract class AbstractReactiveSaveEventListener - implements CallbackRegistryConsumer { +abstract class AbstractReactiveSaveEventListener implements CallbackRegistryConsumer { private static final CoreMessageLogger LOG = CoreLogging.messageLogger( AbstractReactiveSaveEventListener.class ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveAutoFlushEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveAutoFlushEventListener.java index 396d1fadb..785b5b23b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveAutoFlushEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveAutoFlushEventListener.java @@ -24,7 +24,8 @@ import org.jboss.logging.Logger; -public class DefaultReactiveAutoFlushEventListener extends AbstractReactiveFlushingEventListener implements ReactiveAutoFlushEventListener, AutoFlushEventListener { +public class DefaultReactiveAutoFlushEventListener extends AbstractReactiveFlushingEventListener + implements ReactiveAutoFlushEventListener, AutoFlushEventListener { private static final CoreMessageLogger LOG = Logger.getMessageLogger( CoreMessageLogger.class, DefaultReactiveAutoFlushEventListener.class.getName() ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveDeleteEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveDeleteEventListener.java index f4748b0f9..36310b36b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveDeleteEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveDeleteEventListener.java @@ -49,6 +49,7 @@ */ public class DefaultReactiveDeleteEventListener implements DeleteEventListener, ReactiveDeleteEventListener, CallbackRegistryConsumer, JpaBootstrapSensitive { + private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactiveDeleteEventListener.class ); private CallbackRegistry callbackRegistry; diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveFlushEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveFlushEventListener.java index 4d8fcc050..4db5e5fd4 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveFlushEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveFlushEventListener.java @@ -19,7 +19,9 @@ /** * A reactific {@link org.hibernate.event.internal.DefaultFlushEventListener}. */ -public class DefaultReactiveFlushEventListener extends AbstractReactiveFlushingEventListener implements ReactiveFlushEventListener, FlushEventListener { +public class DefaultReactiveFlushEventListener extends AbstractReactiveFlushingEventListener + implements ReactiveFlushEventListener, FlushEventListener { + private static final CoreMessageLogger LOG = Logger.getMessageLogger( CoreMessageLogger.class, DefaultReactiveFlushEventListener.class.getName() diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java new file mode 100644 index 000000000..28de7f3bc --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java @@ -0,0 +1,192 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.event.impl; + +import org.hibernate.HibernateException; +import org.hibernate.LockMode; +import org.hibernate.LockOptions; +import org.hibernate.ObjectDeletedException; +import org.hibernate.TransientObjectException; +import org.hibernate.cache.spi.access.EntityDataAccess; +import org.hibernate.cache.spi.access.SoftLock; +import org.hibernate.engine.internal.CascadePoint; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.PersistenceContext; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.engine.spi.Status; +import org.hibernate.event.internal.AbstractReassociateEventListener; +import org.hibernate.event.internal.DefaultLockEventListener; +import org.hibernate.event.spi.EventSource; +import org.hibernate.event.spi.LockEvent; +import org.hibernate.event.spi.LockEventListener; +import org.hibernate.internal.CoreMessageLogger; +import org.hibernate.persister.entity.EntityPersister; +import org.hibernate.pretty.MessageHelper; +import org.hibernate.reactive.engine.impl.Cascade; +import org.hibernate.reactive.engine.impl.CascadingActions; +import org.hibernate.reactive.engine.impl.ForeignKeys; +import org.hibernate.reactive.event.spi.ReactiveLockEventListener; +import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.jboss.logging.Logger; + +import java.io.Serializable; +import java.util.concurrent.CompletionStage; + +public class DefaultReactiveLockEventListener extends AbstractReassociateEventListener + implements LockEventListener, ReactiveLockEventListener { + + private static final CoreMessageLogger log = Logger.getMessageLogger( + CoreMessageLogger.class, + DefaultLockEventListener.class.getName() + ); + + @Override + public CompletionStage reactiveOnLock(LockEvent event) throws HibernateException { + if ( event.getObject() == null ) { + throw new NullPointerException( "attempted to lock null" ); + } + + if ( event.getLockMode() == LockMode.WRITE ) { + throw new HibernateException( "Invalid lock mode for lock()" ); + } + + if ( event.getLockMode() == LockMode.UPGRADE_SKIPLOCKED ) { + log.explicitSkipLockedLockCombo(); + } + + SessionImplementor source = event.getSession(); + final PersistenceContext persistenceContext = source.getPersistenceContextInternal(); + Object entity = persistenceContext.unproxyAndReassociate( event.getObject() ); + //TODO: if object was an uninitialized proxy, this is inefficient, + // resulting in two SQL selects + + EntityEntry entry = persistenceContext.getEntry(entity); + CompletionStage stage; + if (entry==null) { + final EntityPersister persister = source.getEntityPersister( event.getEntityName(), entity ); + final Serializable id = persister.getIdentifier( entity, source ); + stage = ForeignKeys.isNotTransient( event.getEntityName(), entity, Boolean.FALSE, source ) + .thenApply( + trans -> { + if (!trans) { + throw new TransientObjectException( + "cannot lock an unsaved transient instance: " + + persister.getEntityName() + ); + } + + EntityEntry e = reassociate(event, entity, id, persister); + cascadeOnLock(event, persister, entity); + return e; + } ); + + } + else { + stage = CompletionStages.completedFuture(entry); + } + + return stage.thenCompose( e -> upgradeLock( entity, e, event.getLockOptions(), event.getSession() ) ); + } + + private void cascadeOnLock(LockEvent event, EntityPersister persister, Object entity) { + EventSource source = event.getSession(); + final PersistenceContext persistenceContext = source.getPersistenceContextInternal(); + persistenceContext.incrementCascadeLevel(); + try { + new Cascade( + CascadingActions.LOCK, + CascadePoint.AFTER_LOCK, + persister, + entity, + event.getLockOptions(), + source + ).cascade(); + } + finally { + persistenceContext.decrementCascadeLevel(); + } + } + + /** + * Performs a pessimistic lock upgrade on a given entity, if needed. + * + * @param object The entity for which to upgrade the lock. + * @param entry The entity's EntityEntry instance. + * @param lockOptions contains the requested lock mode. + * @param source The session which is the source of the event being processed. + */ + protected CompletionStage upgradeLock(Object object, EntityEntry entry, + LockOptions lockOptions, + EventSource source) { + + LockMode requestedLockMode = lockOptions.getLockMode(); + if ( requestedLockMode.greaterThan( entry.getLockMode() ) ) { + // The user requested a "greater" (i.e. more restrictive) form of + // pessimistic lock + + if ( entry.getStatus() != Status.MANAGED ) { + throw new ObjectDeletedException( + "attempted to lock a deleted instance", + entry.getId(), + entry.getPersister().getEntityName() + ); + } + + final EntityPersister persister = entry.getPersister(); + + if ( log.isTraceEnabled() ) { + log.tracev( + "Locking {0} in mode: {1}", + MessageHelper.infoString( persister, entry.getId(), source.getFactory() ), + requestedLockMode + ); + } + + final boolean cachingEnabled = persister.canWriteToCache(); + final SoftLock lock; + final Object ck; + if ( cachingEnabled ) { + EntityDataAccess cache = persister.getCacheAccessStrategy(); + ck = cache.generateCacheKey( + entry.getId(), + persister, + source.getFactory(), + source.getTenantIdentifier() + ); + lock = cache.lockItem( source, ck, entry.getVersion() ); + } + else { + lock = null; + ck = null; + } + + return ((ReactiveEntityPersister) persister).lockReactive( + entry.getId(), + entry.getVersion(), + object, + lockOptions, + source + ).thenAccept( v -> entry.setLockMode(requestedLockMode) ) + .whenComplete( (r, e) -> { + // the database now holds a lock + the object is flushed from the cache, + // so release the soft lock + if ( cachingEnabled ) { + persister.getCacheAccessStrategy().unlockItem( source, ck, lock ); + } + } ); + + } + else { + return CompletionStages.nullFuture(); + } + } + + @Override + public void onLock(LockEvent event) throws HibernateException { + throw new UnsupportedOperationException(); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveMergeEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveMergeEventListener.java index 870f7f9fb..638c8162e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveMergeEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveMergeEventListener.java @@ -35,7 +35,8 @@ /** * A reactific {@link org.hibernate.event.internal.DefaultMergeEventListener}. */ -public class DefaultReactiveMergeEventListener extends AbstractReactiveSaveEventListener implements ReactiveMergeEventListener, MergeEventListener { +public class DefaultReactiveMergeEventListener extends AbstractReactiveSaveEventListener + implements ReactiveMergeEventListener, MergeEventListener { private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactiveMergeEventListener.class ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePersistEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePersistEventListener.java index 584d3e10c..2c51be1de 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePersistEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePersistEventListener.java @@ -39,6 +39,7 @@ public class DefaultReactivePersistEventListener extends AbstractReactiveSaveEventListener implements PersistEventListener, ReactivePersistEventListener, CallbackRegistryConsumer { + private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactivePersistEventListener.class ); @Override diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveRefreshEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveRefreshEventListener.java index 7f6fa8f61..732802c63 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveRefreshEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveRefreshEventListener.java @@ -38,7 +38,9 @@ /** * A reactific {@link org.hibernate.event.internal.DefaultRefreshEventListener}. */ -public class DefaultReactiveRefreshEventListener implements RefreshEventListener, ReactiveRefreshEventListener { +public class DefaultReactiveRefreshEventListener + implements RefreshEventListener, ReactiveRefreshEventListener { + private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactiveRefreshEventListener.class ); public CompletionStage reactiveOnRefresh(RefreshEvent event) throws HibernateException { diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/spi/ReactiveLockEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/spi/ReactiveLockEventListener.java new file mode 100644 index 000000000..71f37683b --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/spi/ReactiveLockEventListener.java @@ -0,0 +1,27 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.event.spi; + +import org.hibernate.HibernateException; +import org.hibernate.event.spi.LockEvent; + +import java.io.Serializable; +import java.util.concurrent.CompletionStage; + +/** + * Defines the contract for handling of lock events generated from a session. + * + * @author Steve Ebersole + */ +public interface ReactiveLockEventListener extends Serializable { + + /** + * Handle the given lock event. + * + * @param event The lock event to be handled. + */ + CompletionStage reactiveOnLock(LockEvent event) throws HibernateException; +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java index 6929b8f8c..2f514e22d 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java @@ -334,6 +334,21 @@ interface Session extends AutoCloseable { */ Uni refresh(Object... entities); + /** + * Obtain the specified lock level upon the given object. For example, this + * may be used to perform a version check with {@link LockMode#READ}, or to + * upgrade to a pessimistic lock with {@link LockMode#PESSIMISTIC_WRITE}. + * This operation cascades to associated instances if the association is + * mapped with {@code cascade="lock"}. + * + * Note that the optimistic lock modes {@link LockMode#OPTIMISTIC} and + * {@link LockMode#OPTIMISTIC_FORCE_INCREMENT} are not currently supported. + * + * @param entity a persistent or transient instance + * @param lockMode the lock level + */ + Uni lock(Object entity, LockMode lockMode); + /** * Force this session to flush asynchronously. Must be called at the * end of a unit of work, before committing the transaction and closing diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java index c7e03e6f9..92526bef0 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java @@ -157,6 +157,12 @@ public Uni refresh(Object... entity) { .map( v-> this ); } + @Override + public Uni lock(Object entity, LockMode lockMode) { + return Uni.createFrom().completionStage( delegate.reactiveLock( entity, lockMode ) ) + .map( v -> this ); + } + @Override public Mutiny.Query createQuery(String jpql, Class resultType) { return new MutinyQueryImpl<>( delegate.createReactiveQuery( jpql, resultType ) ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java index 99823684c..87919aa50 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java @@ -7,8 +7,10 @@ import org.hibernate.HibernateException; import org.hibernate.JDBCException; +import org.hibernate.LockMode; import org.hibernate.LockOptions; import org.hibernate.Session; +import org.hibernate.StaleObjectStateException; import org.hibernate.dialect.Dialect; import org.hibernate.dialect.PostgreSQL81Dialect; import org.hibernate.engine.OptimisticLockStyle; @@ -25,15 +27,17 @@ import org.hibernate.jdbc.Expectations; import org.hibernate.persister.entity.AbstractEntityPersister; import org.hibernate.persister.entity.JoinedSubclassEntityPersister; +import org.hibernate.persister.entity.Lockable; import org.hibernate.persister.entity.MultiLoadOptions; import org.hibernate.persister.entity.OuterJoinLoadable; -import org.hibernate.pretty.MessageHelper; import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor; import org.hibernate.reactive.loader.entity.impl.ReactiveDynamicBatchingEntityLoaderBuilder; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.session.ReactiveSession; +import org.hibernate.reactive.sql.impl.Update; import org.hibernate.reactive.util.impl.CompletionStages; import org.hibernate.sql.Delete; +import org.hibernate.sql.SimpleSelect; import org.hibernate.tuple.InMemoryValueGenerationStrategy; import org.hibernate.type.Type; import org.jboss.logging.Logger; @@ -41,10 +45,14 @@ import java.io.Serializable; import java.sql.SQLException; import java.sql.Types; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import static org.hibernate.pretty.MessageHelper.infoString; import static org.hibernate.reactive.adaptor.impl.QueryParametersAdaptor.toParameterArray; +import static org.hibernate.reactive.sql.impl.Parameters.createDialectParameterGenerator; import static org.hibernate.reactive.sql.impl.Parameters.processParameters; /** @@ -65,7 +73,7 @@ * @see ReactiveUnionSubclassEntityPersister * @see ReactiveSingleTableEntityPersister */ -public interface ReactiveAbstractEntityPersister extends ReactiveEntityPersister, OuterJoinLoadable { +public interface ReactiveAbstractEntityPersister extends ReactiveEntityPersister, OuterJoinLoadable, Lockable { Logger log = Logger.getLogger( JoinedSubclassEntityPersister.class ); /** @@ -201,7 +209,7 @@ default CompletionStage insertReactive( } if ( log.isTraceEnabled() ) { - log.tracev( "Inserting entity: {0}", MessageHelper.infoString(delegate(), id, delegate().getFactory() ) ); + log.tracev( "Inserting entity: {0}", infoString(delegate(), id, delegate().getFactory() ) ); if ( j == 0 && delegate().isVersioned() ) { log.tracev( "Version: {0}", Versioning.getVersion( fields, delegate()) ); } @@ -260,7 +268,7 @@ default CompletionStage insertReactive( sql = processParameters( sql, session.getFactory().getJdbcServices().getDialect() ); if ( log.isTraceEnabled() ) { - log.tracev( "Inserting entity: {0}", MessageHelper.infoString(delegate()) ); + log.tracev( "Inserting entity: {0}", infoString(delegate()) ); if ( delegate().isVersioned() ) { log.tracev( "Version: {0}", Versioning.getVersion( fields, delegate()) ); } @@ -326,7 +334,7 @@ default CompletionStage deleteReactive( // } if ( log.isTraceEnabled() ) { - log.tracev( "Deleting entity: {0}", MessageHelper.infoString(delegate(), id, delegate().getFactory() ) ); + log.tracev( "Deleting entity: {0}", infoString(delegate(), id, delegate().getFactory() ) ); if ( useVersion ) { log.tracev( "Version: {0}", version ); } @@ -511,7 +519,7 @@ default CompletionStage updateReactive( final boolean useVersion = j == 0 && delegate().isVersioned(); if ( log.isTraceEnabled() ) { - log.tracev( "Updating entity: {0}", MessageHelper.infoString(delegate(), id, delegate().getFactory() ) ); + log.tracev( "Updating entity: {0}", infoString(delegate(), id, delegate().getFactory() ) ); if ( useVersion ) { log.tracev( "Existing version: {0} -> New version:{1}", oldVersion, fields[delegate().getVersionProperty()] ); } @@ -613,7 +621,7 @@ else if ( isAllOrDirtyOptimisticLocking() && oldFields != null ) { catch (SQLException e) { throw delegate().getFactory().getSQLExceptionHelper().convert( e, - "could not update: " + MessageHelper.infoString(delegate(), id, delegate().getFactory() ), + "could not update: " + infoString(delegate(), id, delegate().getFactory() ), sql ); } @@ -785,13 +793,151 @@ default String[] getSQLUpdateStrings(boolean byRowId, boolean lazy) { lazy ? delegate().getSQLLazyUpdateStrings() : delegate().getSQLUpdateStrings(); } + default String generateSelectLockString(LockOptions lockOptions) { + final SessionFactoryImplementor factory = getFactory(); + Dialect dialect = factory.getJdbcServices().getDialect(); + Supplier generator = createDialectParameterGenerator(dialect); + final SimpleSelect select = new SimpleSelect(dialect) + .setLockOptions( lockOptions ) + .setTableName( getRootTableName() ) + .addColumn( getRootTableIdentifierColumnNames()[0] ) + .addCondition( getRootTableIdentifierColumnNames(), "=" + generator.get() ); + if ( isVersioned() ) { + select.addCondition( getVersionColumnName(), "=" + generator.get() ); + } + if ( factory.getSessionFactoryOptions().isCommentsEnabled() ) { + select.setComment( lockOptions.getLockMode() + " lock " + getEntityName() ); + } + return select.toStatementString(); + } + + default String generateUpdateLockString(LockOptions lockOptions) { + final SessionFactoryImplementor factory = getFactory(); + Dialect dialect = factory.getJdbcServices().getDialect(); + final Update update = new Update(dialect); + update.setTableName( getRootTableName() ); + update.addPrimaryKeyColumns( getRootTableIdentifierColumnNames() ); + update.setVersionColumnName( getVersionColumnName() ); + update.addColumn( getVersionColumnName() ); + if ( factory.getSessionFactoryOptions().isCommentsEnabled() ) { + update.setComment( lockOptions.getLockMode() + " lock " + getEntityName() ); + } + return update.toStatementString(); + } + + @Override + default CompletionStage lockReactive( + Serializable id, + Object version, + Object object, + LockOptions lockOptions, + SharedSessionContractImplementor session) throws HibernateException { + + LockMode lockMode = lockOptions.getLockMode(); + + Object nextVersion = nextVersionForLock( lockMode, id, version, object, session ); + + String sql; + boolean writeLock; + switch (lockMode) { + case READ: + case PESSIMISTIC_READ: + case PESSIMISTIC_WRITE: + case UPGRADE_NOWAIT: + case UPGRADE_SKIPLOCKED: + case UPGRADE: + sql = generateSelectLockString( lockOptions ); + writeLock = false; + break; + case PESSIMISTIC_FORCE_INCREMENT: + case FORCE: + case WRITE: + sql = generateUpdateLockString( lockOptions ); + writeLock = true; + break; + case NONE: + return CompletionStages.nullFuture(); + default: + throw new IllegalArgumentException("lock mode not supported"); + } + + PreparedStatementAdaptor statement = new PreparedStatementAdaptor(); + try { + int offset = 1; + if ( writeLock ) { + getVersionType().nullSafeSet( statement, nextVersion, offset, session ); + offset++; + } + getIdentifierType().nullSafeSet( statement, id, offset, session ); + offset += getIdentifierType().getColumnSpan( getFactory() ); + if ( isVersioned() ) { + getVersionType().nullSafeSet( statement, version, offset, session ); + } + } + catch ( SQLException e) { + throw new HibernateException( e ); + } + Object[] parameters = statement.getParametersAsArray(); + + ReactiveConnection connection = getReactiveConnection( session ); + CompletionStage lock; + if (writeLock) { + lock = connection.update(sql, parameters).thenApply(affected -> affected > 0); + } + else { + lock = connection.select(sql, parameters).thenApply(Iterator::hasNext); + } + + return lock.thenAccept( found -> { + if (!found) { + throw new StaleObjectStateException( getEntityName(), id ); + } + } ).handle( (r ,e) -> { + if (e instanceof SQLException) { + throw session.getJdbcServices().getSqlExceptionHelper().convert( + (SQLException) e, + "could not lock: " + + infoString( this, id, session.getFactory() ), + sql + ); + } + return CompletionStages.returnOrRethrow(e, r); + } ); + } + + default Object nextVersionForLock(LockMode lockMode, Serializable id, Object version, Object entity, + SharedSessionContractImplementor session) { + if ( lockMode == LockMode.PESSIMISTIC_FORCE_INCREMENT ) { + if ( !isVersioned() ) { + throw new IllegalArgumentException("increment locks not supported for unversioned entity"); + } + + Object nextVersion = getVersionType().next( version, session); + + if ( log.isTraceEnabled() ) { + log.trace( + "Forcing version increment [" + infoString( this, id, getFactory() ) + "; " + + getVersionType().toLoggableString( version, getFactory() ) + " -> " + + getVersionType().toLoggableString( nextVersion, getFactory() ) + "]" + ); + } + + session.getPersistenceContextInternal().getEntry( entity ).forceLocked( entity, nextVersion ); + + return nextVersion; + } + else { + return version; + } + } + default CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, SharedSessionContractImplementor session) { return reactiveLoad( id, optionalObject, lockOptions, session, null ); } default CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, SharedSessionContractImplementor session, Boolean readOnly) { if ( log.isTraceEnabled() ) { - log.tracev( "Fetching entity: {0}", MessageHelper.infoString( this, id, getFactory() ) ); + log.tracev( "Fetching entity: {0}", infoString( this, id, getFactory() ) ); } return getAppropriateLoader( lockOptions, session ).load( id, optionalObject, session, lockOptions, readOnly ); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java index 0fff79e72..a234d2896 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java @@ -73,16 +73,35 @@ CompletionStage updateReactive( SharedSessionContractImplementor session) throws HibernateException; + /** + * Obtain a pessimistic lock without blocking + */ + CompletionStage lockReactive( + Serializable id, + Object version, + Object object, + LockOptions lockOptions, + SharedSessionContractImplementor session) + throws HibernateException; + CompletionStage> reactiveMultiLoad( Serializable[] ids, SessionImplementor session, MultiLoadOptions loadOptions); - CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, SharedSessionContractImplementor session); + CompletionStage reactiveLoad(Serializable id, + Object optionalObject, + LockOptions lockOptions, + SharedSessionContractImplementor session); - CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, SharedSessionContractImplementor session, Boolean readOnly); + CompletionStage reactiveLoad(Serializable id, + Object optionalObject, + LockOptions lockOptions, + SharedSessionContractImplementor session, + Boolean readOnly); - ReactiveUniqueEntityLoader getAppropriateLoader(LockOptions lockOptions, SharedSessionContractImplementor session); + ReactiveUniqueEntityLoader getAppropriateLoader(LockOptions lockOptions, + SharedSessionContractImplementor session); CompletionStage reactiveIsTransient(Object entity, SessionImplementor session); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java index f1fa3e1bf..0834b86bc 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java @@ -67,6 +67,8 @@ public interface ReactiveSession { CompletionStage reactiveRefresh(Object child, IdentitySet refreshedAlready); + CompletionStage reactiveLock(Object entity, LockMode lockMode); + ReactiveQuery createReactiveNativeQuery(String sqlString); ReactiveNativeQuery createReactiveNativeQuery(String sqlString, String resultSetMapping); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java index cadf3dc1e..2ee67ed51 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java @@ -35,6 +35,7 @@ import org.hibernate.event.spi.InitializeCollectionEvent; import org.hibernate.event.spi.LoadEvent; import org.hibernate.event.spi.LoadEventListener; +import org.hibernate.event.spi.LockEvent; import org.hibernate.event.spi.MergeEvent; import org.hibernate.event.spi.PersistEvent; import org.hibernate.event.spi.RefreshEvent; @@ -64,6 +65,7 @@ import org.hibernate.reactive.event.spi.ReactiveDeleteEventListener; import org.hibernate.reactive.event.spi.ReactiveFlushEventListener; import org.hibernate.reactive.event.spi.ReactiveLoadEventListener; +import org.hibernate.reactive.event.spi.ReactiveLockEventListener; import org.hibernate.reactive.event.spi.ReactiveMergeEventListener; import org.hibernate.reactive.event.spi.ReactivePersistEventListener; import org.hibernate.reactive.event.spi.ReactiveRefreshEventListener; @@ -827,6 +829,29 @@ else if (e != null) { }); } + @Override + public CompletionStage reactiveLock(Object object, LockMode lockMode) { + checkOpen(); + return fireLock( new LockEvent( object, lockMode, this ) ); + } + + private CompletionStage fireLock(LockEvent event) { + pulseTransactionCoordinator(); + + return fire( event, EventType.LOCK, (ReactiveLockEventListener l) -> l::reactiveOnLock ) + .handle( (v, e) -> { + delayedAfterCompletion(); + + if (e instanceof RuntimeException) { + throw getExceptionConverter().convert( (RuntimeException) e ); + } + else if (e != null) { + return CompletionStages.rethrow(e); + } + return v; + }); + } + @Override public CompletionStage reactiveGet( Class entityClass, diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java index cdd9cc8aa..5ed87bfcd 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java @@ -335,6 +335,21 @@ interface Session extends AutoCloseable { */ CompletionStage refresh(Object... entities); + /** + * Obtain the specified lock level upon the given object. For example, this + * may be used to perform a version check with {@link LockMode#READ}, or to + * upgrade to a pessimistic lock with {@link LockMode#PESSIMISTIC_WRITE}. + * This operation cascades to associated instances if the association is + * mapped with {@code cascade="lock"}. + * + * Note that the optimistic lock modes {@link LockMode#OPTIMISTIC} and + * {@link LockMode#OPTIMISTIC_FORCE_INCREMENT} are not currently supported. + * + * @param entity a persistent or transient instance + * @param lockMode the lock level + */ + CompletionStage lock(Object entity, LockMode lockMode); + /** * Force this session to flush asynchronously. Must be called at the * end of a unit of work, before committing the transaction and closing diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java index 306d8a655..cf7a456b4 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java @@ -148,6 +148,11 @@ public CompletionStage refresh(Object... entity) { return applyToAll( e -> delegate.reactiveRefresh( e, LockMode.NONE ), entity ).thenApply( v -> this ); } + @Override + public CompletionStage lock(Object entity, LockMode lockMode) { + return delegate.reactiveLock( entity, lockMode ).thenApply( v -> this ); + } + @Override public Stage.Query createQuery(String jpql, Class resultType) { return new StageQueryImpl<>( delegate.createReactiveQuery( jpql, resultType ) ); diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index da9b674aa..bcbf7ad50 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -16,6 +16,7 @@ import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; +import javax.persistence.Version; import javax.persistence.metamodel.EntityType; import java.util.Objects; import java.util.concurrent.CompletionStage; @@ -146,6 +147,74 @@ public void reactiveFindRefreshWithLock(TestContext context) { ); } + @Test + public void reactiveFindThenUpgradeLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( pig -> session.lock(pig, LockMode.PESSIMISTIC_READ).thenApply( v -> pig ) ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_READ ); + } ) + ) + ); + } + + @Test + public void reactiveFindThenWriteLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( pig -> session.lock(pig, LockMode.PESSIMISTIC_WRITE).thenApply( v -> pig ) ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_WRITE ); + context.assertEquals( actualPig.version, 0 ); + } ) + ) + ); + } + + @Test + public void reactiveFindThenForceLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( pig -> session.lock(pig, LockMode.PESSIMISTIC_FORCE_INCREMENT).thenApply( v -> pig ) ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_FORCE_INCREMENT ); + context.assertEquals( actualPig.version, 1 ); + } ) + .thenCompose( v -> session.createQuery("select version from GuineaPig").getSingleResult() ) + .thenAccept( version -> context.assertEquals(1, version) ) + .thenAccept( v -> session.close() ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( pig -> session.lock(pig, LockMode.PESSIMISTIC_FORCE_INCREMENT).thenApply( v -> pig ) ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_FORCE_INCREMENT ); + context.assertEquals( actualPig.version, 2 ); + } ) + .thenCompose( v -> session.createQuery("select version from GuineaPig").getSingleResult() ) + .thenAccept( version -> context.assertEquals(2, version) ) + .thenAccept( v -> session.close() ) + ) + ); + } + @Test public void reactiveQueryWithLock(TestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); @@ -283,7 +352,7 @@ public void reactiveUpdate(TestContext context) { public void testMetamodel(TestContext context) { EntityType pig = getSessionFactory().getMetamodel().entity(GuineaPig.class); context.assertNotNull(pig); - context.assertEquals( 2, pig.getAttributes().size() ); + context.assertEquals( 3, pig.getAttributes().size() ); context.assertEquals( "GuineaPig", pig.getName() ); } @@ -299,6 +368,8 @@ public static class GuineaPig { @Id private Integer id; private String name; + @Version + private int version; public GuineaPig() { }