Skip to content

Commit 40d08dd

Browse files
committed
[hibernate#1436] Test behaviour on cancel signal
1 parent cb6c926 commit 40d08dd

File tree

3 files changed

+185
-0
lines changed

3 files changed

+185
-0
lines changed

hibernate-reactive-core/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ dependencies {
3434
testImplementation "io.vertx:vertx-mssql-client:${vertxSqlClientVersion}"
3535
testImplementation "io.vertx:vertx-oracle-client:${vertxSqlClientVersion}"
3636

37+
// Metrics
38+
testImplementation "io.vertx:vertx-micrometer-metrics:${vertxSqlClientVersion}"
39+
3740
// Optional dependency of vertx-pg-client, essential when connecting via SASL SCRAM
3841
testImplementation 'com.ongres.scram:client:2.1'
3942

hibernate-reactive-core/src/test/java/org/hibernate/reactive/BaseReactiveTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@
3636
import org.junit.jupiter.api.extension.ExtendWith;
3737
import org.junit.jupiter.api.extension.RegisterExtension;
3838

39+
import io.micrometer.core.instrument.Metrics;
40+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
3941
import io.smallrye.mutiny.Uni;
4042
import io.vertx.core.Promise;
4143
import io.vertx.core.VertxOptions;
4244
import io.vertx.junit5.RunTestOnContext;
4345
import io.vertx.junit5.Timeout;
4446
import io.vertx.junit5.VertxExtension;
4547
import io.vertx.junit5.VertxTestContext;
48+
import io.vertx.micrometer.MicrometerMetricsOptions;
4649
import jakarta.persistence.criteria.CriteriaQuery;
4750

4851
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -73,7 +76,9 @@ public abstract class BaseReactiveTest {
7376
static RunTestOnContext testOnContext = new RunTestOnContext( vertxOptions() );
7477

7578
private static VertxOptions vertxOptions() {
79+
Metrics.addRegistry( new SimpleMeterRegistry() );
7680
return new VertxOptions()
81+
.setMetricsOptions( new MicrometerMetricsOptions().setEnabled( true ) )
7782
.setBlockedThreadCheckInterval( 10 )
7883
.setBlockedThreadCheckIntervalUnit( MINUTES );
7984
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive;
7+
8+
import java.util.Collection;
9+
import java.util.List;
10+
import java.util.Objects;
11+
import java.util.Queue;
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.ConcurrentLinkedQueue;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
17+
import java.util.stream.IntStream;
18+
19+
import org.junit.jupiter.api.Test;
20+
21+
import org.jboss.logging.Logger;
22+
23+
import io.micrometer.core.instrument.Metrics;
24+
import io.smallrye.mutiny.subscription.Cancellable;
25+
import io.vertx.junit5.VertxTestContext;
26+
import jakarta.persistence.Entity;
27+
import jakarta.persistence.Id;
28+
import jakarta.persistence.Table;
29+
30+
import static java.util.Arrays.stream;
31+
import static java.util.concurrent.CompletableFuture.allOf;
32+
import static java.util.concurrent.CompletableFuture.runAsync;
33+
import static java.util.stream.Stream.concat;
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
public class CancelSignalTest extends BaseReactiveTest {
37+
private static final Logger LOG = Logger.getLogger( CancelSignalTest.class );
38+
39+
@Override
40+
protected Collection<Class<?>> annotatedEntities() {
41+
return List.of( GuineaPig.class );
42+
}
43+
44+
@Test
45+
public void cleanupConnectionWhenCancelSignal(VertxTestContext context) {
46+
// larger than 'sql pool size' to check entering the 'pool waiting queue'
47+
int executeSize = 10;
48+
CountDownLatch firstSessionWaiter = new CountDownLatch( 1 );
49+
Queue<Cancellable> cancellableQueue = new ConcurrentLinkedQueue<>();
50+
51+
ExecutorService withSessionExecutor = Executors.newFixedThreadPool( executeSize );
52+
// Create some jobs that are going to be cancelled asynchronously
53+
CompletableFuture[] withSessionFutures = IntStream
54+
.range( 0, executeSize )
55+
.mapToObj( i -> runAsync(
56+
() -> {
57+
CountDownLatch countDownLatch = new CountDownLatch( 1 );
58+
Cancellable cancellable = getMutinySessionFactory()
59+
.withSession( s -> {
60+
LOG.debug( "start withSession: " + i );
61+
sleep( 100 );
62+
firstSessionWaiter.countDown();
63+
return s.find( GuineaPig.class, 1 );
64+
} )
65+
.onTermination().invoke( () -> {
66+
countDownLatch.countDown();
67+
LOG.debug( "future " + i + " terminated" );
68+
} )
69+
.subscribe().with( item -> LOG.debug( "end withSession: " + i ) );
70+
cancellableQueue.add( cancellable );
71+
await( countDownLatch );
72+
},
73+
withSessionExecutor
74+
) )
75+
.toArray( CompletableFuture[]::new );
76+
77+
// Create jobs that are going to cancel the previous ones
78+
ExecutorService cancelExecutor = Executors.newFixedThreadPool( executeSize );
79+
CompletableFuture[] cancelFutures = IntStream
80+
.range( 0, executeSize )
81+
.mapToObj( i -> runAsync(
82+
() -> {
83+
await( firstSessionWaiter );
84+
cancellableQueue.poll().cancel();
85+
sleep( 500 );
86+
},
87+
cancelExecutor
88+
) )
89+
.toArray( CompletableFuture[]::new );
90+
91+
CompletableFuture<Void> allFutures = allOf( concat( stream( withSessionFutures ), stream( cancelFutures ) )
92+
.toArray( CompletableFuture[]::new )
93+
);
94+
95+
// Test that there shouldn't be any pending process
96+
test( context, allFutures.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) ) );
97+
}
98+
99+
private static double sqlPendingMetric() {
100+
return Metrics.globalRegistry.find( "vertx.sql.processing.pending" )
101+
.gauge()
102+
.value();
103+
}
104+
105+
private static void await(CountDownLatch latch) {
106+
try {
107+
latch.await();
108+
}
109+
catch (InterruptedException e) {
110+
throw new RuntimeException( e );
111+
}
112+
}
113+
114+
private static void sleep(int millis) {
115+
try {
116+
// Add sleep to create a test that delays processing
117+
Thread.sleep( millis );
118+
}
119+
catch (InterruptedException e) {
120+
throw new RuntimeException( e );
121+
}
122+
}
123+
124+
@Entity(name = "GuineaPig")
125+
@Table(name = "Pig")
126+
public static class GuineaPig {
127+
@Id
128+
private Integer id;
129+
private String name;
130+
131+
public GuineaPig() {
132+
}
133+
134+
public GuineaPig(Integer id, String name) {
135+
this.id = id;
136+
this.name = name;
137+
}
138+
139+
public Integer getId() {
140+
return id;
141+
}
142+
143+
public void setId(Integer id) {
144+
this.id = id;
145+
}
146+
147+
public String getName() {
148+
return name;
149+
}
150+
151+
public void setName(String name) {
152+
this.name = name;
153+
}
154+
155+
@Override
156+
public String toString() {
157+
return id + ": " + name;
158+
}
159+
160+
@Override
161+
public boolean equals(Object o) {
162+
if ( this == o ) {
163+
return true;
164+
}
165+
if ( o == null || getClass() != o.getClass() ) {
166+
return false;
167+
}
168+
GuineaPig guineaPig = (GuineaPig) o;
169+
return Objects.equals( name, guineaPig.name );
170+
}
171+
172+
@Override
173+
public int hashCode() {
174+
return Objects.hash( name );
175+
}
176+
}
177+
}

0 commit comments

Comments
 (0)