Skip to content

Commit e883454

Browse files
committed
Fix
1 parent a935f9f commit e883454

File tree

2 files changed

+46
-18
lines changed

2 files changed

+46
-18
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/AbstractReactiveInformationSchemaBasedExtractorImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,14 @@ protected <T> T processTableResultSet(
233233
}
234234
sb.append( " ) " );
235235
}
236-
return getExtractionContext().getQueryResults( sb.toString(), parameterValues.toArray(), processor );
236+
ReactiveImprovedExtractionContextImpl extractionContext = (ReactiveImprovedExtractionContextImpl) getExtractionContext();
237+
try {
238+
return extractionContext.push()
239+
.getQueryResults( sb.toString(), parameterValues.toArray(), processor );
240+
}
241+
finally {
242+
extractionContext.pop();
243+
}
237244
}
238245

239246
@Override
@@ -288,7 +295,14 @@ protected <T> T processColumnsResultSet(
288295

289296
sb.append( " order by table_catalog, table_schema, table_name, column_name, ordinal_position" );
290297

291-
return getExtractionContext().getQueryResults( sb.toString(), parameterValues.toArray(), processor );
298+
ReactiveImprovedExtractionContextImpl extractionContext = (ReactiveImprovedExtractionContextImpl) getExtractionContext();
299+
try {
300+
return extractionContext.push()
301+
.getQueryResults( sb.toString(), parameterValues.toArray(), processor );
302+
}
303+
finally {
304+
extractionContext.pop();
305+
}
292306
}
293307

294308
/**

hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Properties;
3737
import java.util.concurrent.CompletionStage;
3838
import java.util.concurrent.Executor;
39+
import java.util.concurrent.atomic.AtomicInteger;
3940

4041
import org.hibernate.boot.model.relational.SqlStringGenerationContext;
4142
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
@@ -53,6 +54,8 @@
5354
public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {
5455

5556
private final ReactiveConnectionPool service;
57+
private final AtomicInteger level = new AtomicInteger( 0 );
58+
private CompletionStage<ReactiveConnection> connectionStage;
5659

5760
public ReactiveImprovedExtractionContextImpl(
5861
ServiceRegistry registry,
@@ -68,23 +71,35 @@ public ReactiveImprovedExtractionContextImpl(
6871
service = registry.getService( ReactiveConnectionPool.class );
6972
}
7073

71-
@Override
72-
public <T> T getQueryResults(
73-
String queryString,
74-
Object[] positionalParameters,
75-
ResultSetProcessor<T> resultSetProcessor) throws SQLException {
74+
public ReactiveImprovedExtractionContextImpl push() {
75+
int currentLevel = level.getAndIncrement();
76+
if ( currentLevel == 0 ) {
77+
connectionStage = service.getConnection();
78+
}
79+
return this;
80+
}
7681

77-
final CompletionStage<ReactiveConnection> connectionStage = service.getConnection();
82+
public ReactiveImprovedExtractionContextImpl pop() {
83+
int currentLevel = level.decrementAndGet();
84+
if ( currentLevel == 0 && connectionStage != null ) {
85+
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
86+
connectionStage
87+
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
88+
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection )
89+
.toCompletableFuture()
90+
.join();
91+
}
92+
return this;
93+
}
7894

95+
@Override
96+
public <T> T getQueryResults( String queryString, Object[] positionalParameters, ResultSetProcessor<T> resultSetProcessor) throws SQLException {
97+
push();
7998
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) {
8099
return resultSetProcessor.process( resultSet );
81100
}
82101
finally {
83-
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
84-
connectionStage
85-
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
86-
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection );
87-
102+
pop();
88103
}
89104
}
90105

@@ -102,13 +117,12 @@ private ResultSet getQueryResultSet(
102117
Object[] positionalParameters,
103118
CompletionStage<ReactiveConnection> connectionStage) {
104119
final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0];
105-
final Parameters parametersDialectSpecific = Parameters.instance(
106-
getJdbcEnvironment().getDialect()
107-
);
120+
final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() );
108121
final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length );
109-
return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
122+
return connectionStage
123+
.thenCompose( c -> c.selectJdbc( queryToUse, parametersToUse ) )
110124
.whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) )
111-
.thenApply(ResultSetWorkaround::new)
125+
.thenApply( ResultSetWorkaround::new )
112126
.toCompletableFuture()
113127
.join();
114128
}

0 commit comments

Comments
 (0)