Skip to content

Commit 3ace16f

Browse files
DavideDyrodiere
authored andcommitted
[#1909] Run queries for schema creation using the pool
Before we were creating a connection and then ignoring it for each query required to update the schema or collect metatada. Now the method for running queries outside the "current" transaction is in the SqlClientPool.
1 parent 3417888 commit 3ace16f

File tree

7 files changed

+130
-67
lines changed

7 files changed

+130
-67
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java

-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.List;
1212
import java.util.concurrent.CompletionStage;
1313

14-
1514
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
1615

1716
import io.vertx.sqlclient.spi.DatabaseMetadata;
@@ -192,11 +191,6 @@ public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) {
192191
: delegate.selectJdbc( sql, paramValues );
193192
}
194193

195-
@Override
196-
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
197-
return delegate.selectJdbcOutsideTransaction( sql, paramValues );
198-
}
199-
200194
public <T> CompletionStage<T> selectIdentifier(String sql, Object[] paramValues, Class<T> idClass) {
201195
// Do not want to execute the batch here
202196
// because we want to be able to select

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java

-17
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,6 @@ interface Expectation {
6060

6161
CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues);
6262

63-
/**
64-
* This method is intended to be used only for queries returning
65-
* a ResultSet that must be executed outside of any "current"
66-
* transaction (i.e with autocommit=true).
67-
* <p/>
68-
* For example, it would be appropriate to use this method when
69-
* performing queries on information_schema or system tables in
70-
* order to obtain metadata information about catalogs, schemas,
71-
* tables, etc.
72-
*
73-
* @param sql - the query to execute outside of a transaction
74-
* @param paramValues - a non-null array of parameter values
75-
*
76-
* @return the CompletionStage<ResultSet> from executing the query.
77-
*/
78-
CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);
79-
8063
<T> CompletionStage<T> insertAndSelectIdentifier(String sql, Object[] paramValues, Class<T> idClass, String idColumnName);
8164
CompletionStage<ResultSet> insertAndSelectIdentifierAsResultSet(String sql, Object[] paramValues, Class<?> idClass, String idColumnName);
8265

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java

+4
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
1111
import org.hibernate.service.Service;
1212

13+
import java.sql.ResultSet;
1314
import java.util.concurrent.CompletionStage;
1415

16+
1517
/**
1618
* A Hibernate {@link Service} that provides access to pooled
1719
* {@link ReactiveConnection reactive connections}.
@@ -63,6 +65,8 @@ public interface ReactiveConnectionPool extends Service {
6365
*/
6466
CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExceptionHelper sqlExceptionHelper);
6567

68+
CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);
69+
6670
/**
6771
* The shutdown of the pool is actually asynchronous but the
6872
* core service registry won't return the {@link CompletionStage}.

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ExternalSqlClientPool.java

+48
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,26 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.sql.ResultSet;
9+
import java.sql.SQLException;
10+
import java.util.Objects;
811
import java.util.concurrent.CompletionStage;
912

13+
import org.hibernate.engine.jdbc.internal.FormatStyle;
1014
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
1115
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
16+
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
1217
import org.hibernate.reactive.mutiny.Mutiny;
1318
import org.hibernate.reactive.stage.Stage;
1419
import org.hibernate.reactive.util.impl.CompletionStages;
1520

21+
import io.vertx.sqlclient.DatabaseException;
1622
import io.vertx.sqlclient.Pool;
23+
import io.vertx.sqlclient.Row;
24+
import io.vertx.sqlclient.RowSet;
25+
import io.vertx.sqlclient.Tuple;
26+
27+
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
1728

1829
/**
1930
* A pool of reactive connections backed by a Vert.x {@link Pool}.
@@ -82,4 +93,41 @@ public SqlExceptionHelper getSqlExceptionHelper() {
8293
public CompletionStage<Void> getCloseFuture() {
8394
return CompletionStages.voidFuture();
8495
}
96+
97+
98+
@Override
99+
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
100+
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
101+
.thenApply( ResultSetAdaptor::new );
102+
}
103+
104+
public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
105+
feedback( sql );
106+
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
107+
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
108+
}
109+
110+
/**
111+
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
112+
*/
113+
private <T> T convertException(T rows, String sql, Throwable sqlException) {
114+
if ( sqlException == null ) {
115+
return rows;
116+
}
117+
if ( sqlException instanceof DatabaseException ) {
118+
DatabaseException de = (DatabaseException) sqlException;
119+
sqlException = sqlExceptionHelper
120+
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
121+
}
122+
return rethrow( sqlException );
123+
}
124+
125+
private void feedback(String sql) {
126+
Objects.requireNonNull( sql, "SQL query cannot be null" );
127+
// DDL already gets formatted by the client, so don't reformat it
128+
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )
129+
? FormatStyle.BASIC
130+
: FormatStyle.NONE;
131+
sqlStatementLogger.logStatement( sql, formatStyle.getFormatter() );
132+
}
85133
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

-12
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,6 @@ public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) {
128128
.thenApply( ResultSetAdaptor::new );
129129
}
130130

131-
@Override
132-
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
133-
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
134-
.thenApply( ResultSetAdaptor::new );
135-
}
136-
137131
@Override
138132
public CompletionStage<Void> execute(String sql) {
139133
return preparedQuery( sql )
@@ -278,12 +272,6 @@ public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql)
278272
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
279273
}
280274

281-
public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
282-
feedback( sql );
283-
return pool.preparedQuery( sql ).execute( parameters ).toCompletionStage()
284-
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
285-
}
286-
287275
private void feedback(String sql) {
288276
Objects.requireNonNull( sql, "SQL query cannot be null" );
289277
// DDL already gets formatted by the client, so don't reformat it

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java

+61
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,29 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.sql.ResultSet;
9+
import java.sql.SQLException;
10+
import java.util.Objects;
811
import java.util.concurrent.CompletableFuture;
912
import java.util.concurrent.CompletionStage;
1013
import java.util.function.Consumer;
1114

15+
import org.hibernate.engine.jdbc.internal.FormatStyle;
1216
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
1317
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
18+
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
1419
import org.hibernate.reactive.pool.ReactiveConnection;
1520
import org.hibernate.reactive.pool.ReactiveConnectionPool;
1621

1722
import io.vertx.core.Future;
23+
import io.vertx.sqlclient.DatabaseException;
1824
import io.vertx.sqlclient.Pool;
25+
import io.vertx.sqlclient.Row;
26+
import io.vertx.sqlclient.RowSet;
1927
import io.vertx.sqlclient.SqlConnection;
28+
import io.vertx.sqlclient.Tuple;
29+
30+
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
2031

2132
/**
2233
* A pool of reactive connections backed by a supplier of
@@ -99,6 +110,56 @@ private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, Sql
99110
);
100111
}
101112

113+
/**
114+
* This method is intended to be used only for queries returning
115+
* a ResultSet that must be executed outside any "current"
116+
* transaction (i.e. with autocommit=true).
117+
* <p/>
118+
* For example, it would be appropriate to use this method when
119+
* performing queries on information_schema or system tables in
120+
* order to obtain metadata information about catalogs, schemas,
121+
* tables, etc.
122+
*
123+
* @param sql - the query to execute outside a transaction
124+
* @param paramValues - a non-null array of parameter values
125+
*
126+
* @return the CompletionStage<ResultSet> from executing the query.
127+
*/
128+
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
129+
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
130+
.thenApply( ResultSetAdaptor::new );
131+
}
132+
133+
private CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
134+
feedback( sql );
135+
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
136+
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
137+
}
138+
139+
/**
140+
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
141+
*/
142+
private <T> T convertException(T rows, String sql, Throwable sqlException) {
143+
if ( sqlException == null ) {
144+
return rows;
145+
}
146+
if ( sqlException instanceof DatabaseException ) {
147+
DatabaseException de = (DatabaseException) sqlException;
148+
sqlException = getSqlExceptionHelper()
149+
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
150+
}
151+
return rethrow( sqlException );
152+
}
153+
154+
private void feedback(String sql) {
155+
Objects.requireNonNull( sql, "SQL query cannot be null" );
156+
// DDL already gets formatted by the client, so don't reformat it
157+
FormatStyle formatStyle = getSqlStatementLogger().isFormat() && !sql.contains( System.lineSeparator() )
158+
? FormatStyle.BASIC
159+
: FormatStyle.NONE;
160+
getSqlStatementLogger().logStatement( sql, formatStyle.getFormatter() );
161+
}
162+
102163
/**
103164
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
104165
*/

hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java

+17-32
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import java.util.Calendar;
3535
import java.util.Map;
3636
import java.util.Properties;
37-
import java.util.concurrent.CompletionStage;
3837
import java.util.concurrent.Executor;
3938

4039
import org.hibernate.boot.model.relational.SqlStringGenerationContext;
4140
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
42-
import org.hibernate.reactive.pool.ReactiveConnection;
4341
import org.hibernate.reactive.pool.ReactiveConnectionPool;
4442
import org.hibernate.reactive.pool.impl.Parameters;
4543
import org.hibernate.resource.transaction.spi.DdlTransactionIsolator;
@@ -48,11 +46,10 @@
4846
import org.hibernate.tool.schema.internal.exec.JdbcContext;
4947

5048
import static org.hibernate.reactive.util.impl.CompletionStages.logSqlException;
51-
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
5249

5350
public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {
5451

55-
private final ReactiveConnectionPool service;
52+
private final ReactiveConnectionPool connectionPool;
5653

5754
public ReactiveImprovedExtractionContextImpl(
5855
ServiceRegistry registry,
@@ -65,54 +62,42 @@ public ReactiveImprovedExtractionContextImpl(
6562
NoopDdlTransactionIsolator.INSTANCE,
6663
databaseObjectAccess
6764
);
68-
service = registry.getService( ReactiveConnectionPool.class );
65+
connectionPool = registry.getService( ReactiveConnectionPool.class );
6966
}
7067

7168
@Override
7269
public <T> T getQueryResults(
7370
String queryString,
7471
Object[] positionalParameters,
7572
ResultSetProcessor<T> resultSetProcessor) throws SQLException {
76-
77-
final CompletionStage<ReactiveConnection> connectionStage = service.getConnection();
78-
79-
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) {
73+
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters )) {
8074
return resultSetProcessor.process( resultSet );
8175
}
82-
finally {
83-
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
84-
connectionStage
85-
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
86-
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection );
87-
88-
}
89-
}
90-
91-
private static ReactiveConnection ignoreException(ReactiveConnection reactiveConnection, Throwable throwable) {
92-
return reactiveConnection;
93-
}
94-
95-
private static CompletionStage<Void> closeConnection(ReactiveConnection connection) {
96-
// Avoid NullPointerException if we couldn't create a connection
97-
return connection != null ? connection.close() : voidFuture();
9876
}
9977

10078
private ResultSet getQueryResultSet(
10179
String queryString,
102-
Object[] positionalParameters,
103-
CompletionStage<ReactiveConnection> connectionStage) {
80+
Object[] positionalParameters) {
10481
final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0];
105-
final Parameters parametersDialectSpecific = Parameters.instance(
106-
getJdbcEnvironment().getDialect()
107-
);
82+
final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() );
10883
final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length );
109-
return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
84+
return connectionPool
85+
// DDL needs to run outside the current transaction. For example:
86+
// - increment on a table-based id generator should happen outside the current tx.
87+
// - not all databases support transactional DDL
88+
.selectJdbcOutsideTransaction( queryToUse, parametersToUse )
11089
.whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) )
111-
.thenApply(ResultSetWorkaround::new)
90+
.thenApply( ResultSetWorkaround::new )
91+
// During schema migration, errors are ignored
92+
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
11293
.toCompletableFuture()
11394
.join();
11495
}
11596

97+
private static <T> T ignoreException(T result, Throwable throwable) {
98+
return result;
99+
}
100+
116101
private static class NoopDdlTransactionIsolator implements DdlTransactionIsolator {
117102
static final NoopDdlTransactionIsolator INSTANCE = new NoopDdlTransactionIsolator();
118103

0 commit comments

Comments
 (0)