Skip to content

Commit 0e99e99

Browse files
committed
support for OPTIMISTIC + OPTIMISTIC_FORCE_INCREMENT LockModes
These LockModes force a version check or upgrade right at the end of the transaction. This required building infrastructure for reactive before/after transaction completion events. Fixes #201
1 parent d11e49c commit 0e99e99

12 files changed

+379
-23
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java

+46-11
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,20 @@ public void registerProcess(BeforeTransactionCompletionProcess process) {
517517
beforeTransactionProcesses.register( process );
518518
}
519519

520+
public void registerProcess(ReactiveAfterTransactionCompletionProcess process) {
521+
if ( afterTransactionProcesses == null ) {
522+
afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
523+
}
524+
afterTransactionProcesses.registerReactive( process );
525+
}
526+
527+
public void registerProcess(ReactiveBeforeTransactionCompletionProcess process) {
528+
if ( beforeTransactionProcesses == null ) {
529+
beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
530+
}
531+
beforeTransactionProcesses.registerReactive( process );
532+
}
533+
520534
/**
521535
* Perform all currently queued entity-insertion actions.
522536
*
@@ -576,25 +590,27 @@ private void prepareActions(ExecutableList<?> queue) throws HibernateException {
576590
*
577591
* @param success Was the transaction successful.
578592
*/
579-
public void afterTransactionCompletion(boolean success) {
593+
public CompletionStage<Void> afterTransactionCompletion(boolean success) {
580594
if ( !isTransactionCoordinatorShared ) {
581595
// Execute completion actions only in transaction owner (aka parent session).
582596
if ( afterTransactionProcesses != null ) {
583-
afterTransactionProcesses.afterTransactionCompletion( success );
597+
return afterTransactionProcesses.afterTransactionCompletion( success );
584598
}
585599
}
600+
return CompletionStages.voidFuture();
586601
}
587602

588603
/**
589604
* Execute any registered {@link org.hibernate.action.spi.BeforeTransactionCompletionProcess}
590605
*/
591-
public void beforeTransactionCompletion() {
606+
public CompletionStage<Void> beforeTransactionCompletion() {
592607
if ( !isTransactionCoordinatorShared ) {
593608
// Execute completion actions only in transaction owner (aka parent session).
594609
if ( beforeTransactionProcesses != null ) {
595-
beforeTransactionProcesses.beforeTransactionCompletion();
610+
return beforeTransactionProcesses.beforeTransactionCompletion();
596611
}
597612
}
613+
return CompletionStages.voidFuture();
598614
}
599615

600616
/**
@@ -917,11 +933,12 @@ public void serialize(ObjectOutputStream oos) throws IOException {
917933
}
918934
}
919935

920-
private abstract static class AbstractTransactionCompletionProcessQueue<T> {
936+
private abstract static class AbstractTransactionCompletionProcessQueue<T,U> {
921937
protected SessionImplementor session;
922938
// Concurrency handling required when transaction completion process is dynamically registered
923939
// inside event listener (HHH-7478).
924940
protected Queue<T> processes = new ConcurrentLinkedQueue<>();
941+
protected Queue<U> reactiveProcesses = new ConcurrentLinkedQueue<>();
925942

926943
private AbstractTransactionCompletionProcessQueue(SessionImplementor session) {
927944
this.session = session;
@@ -934,21 +951,29 @@ public void register(T process) {
934951
processes.add( process );
935952
}
936953

954+
public void registerReactive(U process) {
955+
if ( process == null ) {
956+
return;
957+
}
958+
reactiveProcesses.add( process );
959+
}
960+
937961
public boolean hasActions() {
938-
return !processes.isEmpty();
962+
return !processes.isEmpty() && !reactiveProcesses.isEmpty();
939963
}
940964
}
941965

942966
/**
943967
* Encapsulates behavior needed for before transaction processing
944968
*/
945969
private static class BeforeTransactionCompletionProcessQueue
946-
extends AbstractTransactionCompletionProcessQueue<BeforeTransactionCompletionProcess> {
970+
extends AbstractTransactionCompletionProcessQueue<BeforeTransactionCompletionProcess,
971+
ReactiveBeforeTransactionCompletionProcess> {
947972
private BeforeTransactionCompletionProcessQueue(SessionImplementor session) {
948973
super( session );
949974
}
950975

951-
public void beforeTransactionCompletion() {
976+
public CompletionStage<Void> beforeTransactionCompletion() {
952977
while ( !processes.isEmpty() ) {
953978
try {
954979
processes.poll().doBeforeTransactionCompletion( session );
@@ -960,15 +985,20 @@ public void beforeTransactionCompletion() {
960985
throw new AssertionFailure( "Unable to perform beforeTransactionCompletion callback", e );
961986
}
962987
}
988+
return CompletionStages.loop(
989+
reactiveProcesses,
990+
process -> process.doBeforeTransactionCompletion( session )
991+
).whenComplete( (v, e) -> reactiveProcesses.clear() );
963992
}
964993
}
965994

966995
/**
967996
* Encapsulates behavior needed for after transaction processing
968997
*/
969998
private static class AfterTransactionCompletionProcessQueue
970-
extends AbstractTransactionCompletionProcessQueue<AfterTransactionCompletionProcess> {
971-
private Set<Serializable> querySpacesToInvalidate = new HashSet<>();
999+
extends AbstractTransactionCompletionProcessQueue<AfterTransactionCompletionProcess,
1000+
ReactiveAfterTransactionCompletionProcess> {
1001+
private final Set<Serializable> querySpacesToInvalidate = new HashSet<>();
9721002

9731003
private AfterTransactionCompletionProcessQueue(SessionImplementor session) {
9741004
super( session );
@@ -978,7 +1008,7 @@ public void addSpaceToInvalidate(Serializable space) {
9781008
querySpacesToInvalidate.add( space );
9791009
}
9801010

981-
public void afterTransactionCompletion(boolean success) {
1011+
public CompletionStage<Void> afterTransactionCompletion(boolean success) {
9821012
while ( !processes.isEmpty() ) {
9831013
try {
9841014
processes.poll().doAfterTransactionCompletion( success, session );
@@ -999,6 +1029,11 @@ public void afterTransactionCompletion(boolean success) {
9991029
);
10001030
}
10011031
querySpacesToInvalidate.clear();
1032+
1033+
return CompletionStages.loop(
1034+
reactiveProcesses,
1035+
process -> process.doAfterTransactionCompletion( success, session )
1036+
).whenComplete( (v, e) -> reactiveProcesses.clear() );
10021037
}
10031038
}
10041039

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: LGPL-2.1-or-later
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.engine;
7+
8+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
9+
10+
import java.util.concurrent.CompletionStage;
11+
12+
/**
13+
* Contract representing some process that needs to occur during after transaction completion.
14+
*
15+
* @author Steve Ebersole
16+
*/
17+
public interface ReactiveAfterTransactionCompletionProcess {
18+
/**
19+
* Perform whatever processing is encapsulated here after completion of the transaction.
20+
*
21+
* @param success Did the transaction complete successfully? True means it did.
22+
* @param session The session on which the transaction is completing.
23+
*/
24+
CompletionStage<Void> doAfterTransactionCompletion(boolean success, SharedSessionContractImplementor session);
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: LGPL-2.1-or-later
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.engine;
7+
8+
import org.hibernate.engine.spi.SessionImplementor;
9+
10+
import java.util.concurrent.CompletionStage;
11+
12+
/**
13+
* Contract representing some process that needs to occur during before transaction completion.
14+
*
15+
* @author Steve Ebersole
16+
*/
17+
public interface ReactiveBeforeTransactionCompletionProcess {
18+
/**
19+
* Perform whatever processing is encapsulated here before completion of the transaction.
20+
*
21+
* @param session The session on which the transaction is preparing to complete.
22+
*/
23+
CompletionStage<Void> doBeforeTransactionCompletion(SessionImplementor session);
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: LGPL-2.1-or-later
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.engine.impl;
7+
8+
import org.hibernate.LockMode;
9+
import org.hibernate.LockOptions;
10+
import org.hibernate.engine.spi.EntityEntry;
11+
import org.hibernate.engine.spi.SessionImplementor;
12+
import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;
13+
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
14+
import org.hibernate.reactive.util.impl.CompletionStages;
15+
16+
import java.util.concurrent.CompletionStage;
17+
18+
/**
19+
* A BeforeTransactionCompletionProcess impl to verify and increment an entity version as party
20+
* of before-transaction-completion processing
21+
*
22+
* @author Scott Marlow
23+
* @author Gavin King
24+
*/
25+
public class ReactiveEntityIncrementVersionProcess implements ReactiveBeforeTransactionCompletionProcess {
26+
private final Object object;
27+
28+
/**
29+
* Constructs an EntityIncrementVersionProcess for the given entity.
30+
*
31+
* @param object The entity instance
32+
*/
33+
public ReactiveEntityIncrementVersionProcess(Object object) {
34+
this.object = object;
35+
}
36+
37+
/**
38+
* Perform whatever processing is encapsulated here before completion of the transaction.
39+
*
40+
* @param session The session on which the transaction is preparing to complete.
41+
*/
42+
@Override
43+
public CompletionStage<Void> doBeforeTransactionCompletion(SessionImplementor session) {
44+
final EntityEntry entry = session.getPersistenceContext().getEntry( object );
45+
// Don't increment version for an entity that is not in the PersistenceContext;
46+
if ( entry == null ) {
47+
return CompletionStages.voidFuture();
48+
}
49+
50+
return ( (ReactiveEntityPersister) entry.getPersister() )
51+
.lockReactive(
52+
entry.getId(),
53+
entry.getVersion(),
54+
object,
55+
new LockOptions(LockMode.PESSIMISTIC_FORCE_INCREMENT),
56+
session
57+
);
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: LGPL-2.1-or-later
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.engine.impl;
7+
8+
import org.hibernate.dialect.lock.OptimisticEntityLockException;
9+
import org.hibernate.engine.spi.EntityEntry;
10+
import org.hibernate.engine.spi.SessionImplementor;
11+
import org.hibernate.pretty.MessageHelper;
12+
import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;
13+
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
14+
import org.hibernate.reactive.util.impl.CompletionStages;
15+
16+
import java.util.concurrent.CompletionStage;
17+
18+
/**
19+
* A BeforeTransactionCompletionProcess impl to verify an entity version as part of
20+
* before-transaction-completion processing
21+
*
22+
* @author Scott Marlow
23+
* @author Gavin King
24+
*/
25+
public class ReactiveEntityVerifyVersionProcess implements ReactiveBeforeTransactionCompletionProcess {
26+
private final Object object;
27+
28+
/**
29+
* Constructs an EntityVerifyVersionProcess
30+
*
31+
* @param object The entity instance
32+
*/
33+
public ReactiveEntityVerifyVersionProcess(Object object) {
34+
this.object = object;
35+
}
36+
37+
@Override
38+
public CompletionStage<Void> doBeforeTransactionCompletion(SessionImplementor session) {
39+
final EntityEntry entry = session.getPersistenceContext().getEntry( object );
40+
// Don't check version for an entity that is not in the PersistenceContext;
41+
if ( entry == null ) {
42+
return CompletionStages.voidFuture();
43+
}
44+
45+
return ( (ReactiveEntityPersister) entry.getPersister() )
46+
.reactiveGetCurrentVersion( entry.getId(), session )
47+
.thenAccept( latestVersion -> {
48+
if ( !entry.getVersion().equals( latestVersion ) ) {
49+
throw new OptimisticEntityLockException(
50+
object,
51+
"Newer version [" + latestVersion +
52+
"] of entity [" + MessageHelper.infoString( entry.getEntityName(), entry.getId() ) +
53+
"] found in database"
54+
);
55+
}
56+
} );
57+
}
58+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,15 @@ protected CompletionStage<Void> upgradeLock(Object object, EntityEntry entry,
164164
ck = null;
165165
}
166166

167-
return ((ReactiveEntityPersister) persister).lockReactive(
168-
entry.getId(),
169-
entry.getVersion(),
170-
object,
171-
lockOptions,
172-
source
173-
).thenAccept( v -> entry.setLockMode(requestedLockMode) )
167+
return ((ReactiveEntityPersister) persister)
168+
.lockReactive(
169+
entry.getId(),
170+
entry.getVersion(),
171+
object,
172+
lockOptions,
173+
source
174+
)
175+
.thenAccept( v -> entry.setLockMode(requestedLockMode) )
174176
.whenComplete( (r, e) -> {
175177
// the database now holds a lock + the object is flushed from the cache,
176178
// so release the soft lock

0 commit comments

Comments
 (0)