Skip to content

Commit a3781a4

Browse files
mp911dejhoeller
authored andcommitted
Align with R2DBC 0.9 API changes
Adopt to R2DBC Parameter type, deprecate our own one in favor of the R2DBC type. Add support for extensible transaction definitions and support to consume Readable and result segments. Return Long instead of Integer in DatabaseClient update in preparation for R2DBC 1.0 changes.
1 parent fd34533 commit a3781a4

20 files changed

+286
-177
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ configure([rootProject] + javaProjects) { project ->
346346
// "https://junit.org/junit5/docs/5.8.2/api/",
347347
"https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/",
348348
"https://javadoc.io/static/io.rsocket/rsocket-core/1.1.1/",
349-
"https://r2dbc.io/spec/0.8.5.RELEASE/api/",
349+
"https://r2dbc.io/spec/0.9.1.RELEASE/api/",
350350
// The external Javadoc link for JSR 305 must come last to ensure that types from
351351
// JSR 250 (such as @PostConstruct) are still supported. This is due to the fact
352352
// that JSR 250 and JSR 305 both define types in javax.annotation, which results

spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java

Lines changed: 91 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import io.r2dbc.spi.Connection;
2222
import io.r2dbc.spi.ConnectionFactory;
2323
import io.r2dbc.spi.IsolationLevel;
24+
import io.r2dbc.spi.Option;
2425
import io.r2dbc.spi.R2dbcException;
2526
import io.r2dbc.spi.Result;
27+
import org.reactivestreams.Publisher;
2628
import reactor.core.publisher.Mono;
2729

2830
import org.springframework.beans.factory.InitializingBean;
@@ -47,7 +49,7 @@
4749
* <p><b>Note: The {@code ConnectionFactory} that this transaction manager
4850
* operates on needs to return independent {@code Connection}s.</b>
4951
* The {@code Connection}s may come from a pool (the typical case), but the
50-
* {@code ConnectionFactory} must not return scoped scoped {@code Connection}s
52+
* {@code ConnectionFactory} must not return scoped {@code Connection}s
5153
* or the like. This transaction manager will associate {@code Connection}
5254
* with context-bound transactions itself, according to the specified propagation
5355
* behavior. It assumes that a separate, independent {@code Connection} can
@@ -72,6 +74,11 @@
7274
* synchronizations (if synchronization is generally active), assuming resources
7375
* operating on the underlying R2DBC {@code Connection}.
7476
*
77+
* <p>Spring's {@code TransactionDefinition} attributes are carried forward to R2DBC drivers
78+
* using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}. Subclasses may
79+
* override {@link #createTransactionDefinition(TransactionDefinition)} to customize
80+
* transaction definitions for vendor-specific attributes.
81+
*
7582
* @author Mark Paluch
7683
* @since 5.3
7784
* @see ConnectionFactoryUtils#getConnection(ConnectionFactory)
@@ -203,7 +210,8 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
203210
}
204211

205212
return connectionMono.flatMap(con -> {
206-
return prepareTransactionalConnection(con, definition, transaction).then(Mono.from(con.beginTransaction()))
213+
return prepareTransactionalConnection(con, definition, transaction)
214+
.then(Mono.from(doBegin(definition, con)))
207215
.doOnSuccess(v -> {
208216
txObject.getConnectionHolder().setTransactionActive(true);
209217
Duration timeout = determineTimeout(definition);
@@ -230,6 +238,31 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
230238
}).then();
231239
}
232240

241+
private Publisher<Void> doBegin(TransactionDefinition definition, Connection con) {
242+
io.r2dbc.spi.TransactionDefinition transactionDefinition = createTransactionDefinition(definition);
243+
if (logger.isDebugEnabled()) {
244+
logger.debug("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]");
245+
}
246+
return con.beginTransaction(transactionDefinition);
247+
}
248+
249+
/**
250+
* Determine the transaction definition from our {@code TransactionDefinition}.
251+
* Can be overridden to wrap the R2DBC {@code TransactionDefinition} to adjust or
252+
* enhance transaction attributes.
253+
* @param definition the transaction definition
254+
* @return the actual transaction definition to use
255+
* @since 6.0
256+
* @see io.r2dbc.spi.TransactionDefinition
257+
*/
258+
protected io.r2dbc.spi.TransactionDefinition createTransactionDefinition(TransactionDefinition definition) {
259+
// Apply specific isolation level, if any.
260+
IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel());
261+
return new ExtendedTransactionDefinition(definition.getName(), definition.isReadOnly(),
262+
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? isolationLevelToUse : null,
263+
determineTimeout(definition));
264+
}
265+
233266
/**
234267
* Determine the actual timeout to use for the given definition.
235268
* Will fall back to this manager's default timeout if the
@@ -375,21 +408,6 @@ protected Mono<Void> prepareTransactionalConnection(
375408
.then();
376409
}
377410

378-
// Apply specific isolation level, if any.
379-
IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel());
380-
if (isolationLevelToUse != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
381-
382-
if (logger.isDebugEnabled()) {
383-
logger.debug("Changing isolation level of R2DBC Connection [" + con + "] to " + isolationLevelToUse.asSql());
384-
}
385-
IsolationLevel currentIsolation = con.getTransactionIsolationLevel();
386-
if (!currentIsolation.asSql().equalsIgnoreCase(isolationLevelToUse.asSql())) {
387-
388-
txObject.setPreviousIsolationLevel(currentIsolation);
389-
prepare = prepare.then(Mono.from(con.setTransactionIsolationLevel(isolationLevelToUse)));
390-
}
391-
}
392-
393411
// Switch to manual commit if necessary. This is very expensive in some R2DBC drivers,
394412
// so we don't want to do it unnecessarily (for example if we've explicitly
395413
// configured the connection pool to set it already).
@@ -436,6 +454,62 @@ protected RuntimeException translateException(String task, R2dbcException ex) {
436454
}
437455

438456

457+
/**
458+
* Extended R2DBC transaction definition object providing transaction attributes
459+
* to R2DBC drivers when starting a transaction.
460+
*/
461+
private record ExtendedTransactionDefinition(@Nullable String transactionName,
462+
boolean readOnly,
463+
@Nullable IsolationLevel isolationLevel,
464+
Duration lockWaitTimeout) implements io.r2dbc.spi.TransactionDefinition {
465+
466+
private ExtendedTransactionDefinition(@Nullable String transactionName, boolean readOnly,
467+
@Nullable IsolationLevel isolationLevel, Duration lockWaitTimeout) {
468+
this.transactionName = transactionName;
469+
this.readOnly = readOnly;
470+
this.isolationLevel = isolationLevel;
471+
this.lockWaitTimeout = lockWaitTimeout;
472+
}
473+
474+
475+
@Override
476+
@SuppressWarnings("unchecked")
477+
public <T> T getAttribute(Option<T> option) {
478+
return (T) doGetValue(option);
479+
}
480+
481+
@Nullable
482+
private Object doGetValue(Option<?> option) {
483+
if (io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL.equals(option)) {
484+
return this.isolationLevel;
485+
}
486+
if (io.r2dbc.spi.TransactionDefinition.NAME.equals(option)) {
487+
return this.transactionName;
488+
}
489+
if (io.r2dbc.spi.TransactionDefinition.READ_ONLY.equals(option)) {
490+
return this.readOnly;
491+
}
492+
if (io.r2dbc.spi.TransactionDefinition.LOCK_WAIT_TIMEOUT.equals(option)
493+
&& !this.lockWaitTimeout.isZero()) {
494+
return this.lockWaitTimeout;
495+
}
496+
return null;
497+
}
498+
499+
@Override
500+
public String toString() {
501+
StringBuilder sb = new StringBuilder();
502+
sb.append(getClass().getSimpleName());
503+
sb.append(" [transactionName='").append(this.transactionName).append('\'');
504+
sb.append(", readOnly=").append(this.readOnly);
505+
sb.append(", isolationLevel=").append(this.isolationLevel);
506+
sb.append(", lockWaitTimeout=").append(this.lockWaitTimeout);
507+
sb.append(']');
508+
return sb.toString();
509+
}
510+
}
511+
512+
439513
/**
440514
* ConnectionFactory transaction object, representing a ConnectionHolder.
441515
* Used as transaction object by R2dbcTransactionManager.

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/BindParameterSource.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.r2dbc.core;
1818

19-
import org.springframework.lang.Nullable;
19+
import io.r2dbc.spi.Parameter;
2020

2121
/**
2222
* Interface that defines common functionality for objects
@@ -44,24 +44,13 @@ interface BindParameterSource {
4444
boolean hasValue(String paramName);
4545

4646
/**
47-
* Return the parameter value for the requested named parameter.
47+
* Return the parameter for the requested named parameter.
4848
* @param paramName the name of the parameter
49-
* @return the value of the specified parameter (can be {@code null})
49+
* @return the specified parameter
5050
* @throws IllegalArgumentException if there is no value
5151
* for the requested parameter
5252
*/
53-
@Nullable
54-
Object getValue(String paramName) throws IllegalArgumentException;
55-
56-
/**
57-
* Determine the type for the specified named parameter.
58-
* @param paramName the name of the parameter
59-
* @return the type of the specified parameter, or
60-
* {@link Object#getClass()} if not known.
61-
*/
62-
default Class<?> getType(String paramName) {
63-
return Object.class;
64-
}
53+
Parameter getValue(String paramName) throws IllegalArgumentException;
6554

6655
/**
6756
* Return the parameter names of the underlying parameter source.

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/ColumnMapRowMapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.r2dbc.core;
1818

19-
import java.util.Collection;
19+
import java.util.List;
2020
import java.util.Map;
2121
import java.util.function.BiFunction;
2222

@@ -55,12 +55,12 @@ public class ColumnMapRowMapper implements BiFunction<Row, RowMetadata, Map<Stri
5555
@SuppressWarnings("deprecation") // getColumnNames() is deprecated as of R2DBC 0.9
5656
@Override
5757
public Map<String, Object> apply(Row row, RowMetadata rowMetadata) {
58-
Collection<String> columns = rowMetadata.getColumnNames();
58+
List<? extends ColumnMetadata> columns = rowMetadata.getColumnMetadatas();
5959
int columnCount = columns.size();
6060
Map<String, Object> mapOfColValues = createColumnMap(columnCount);
6161
int index = 0;
62-
for (String column : columns) {
63-
String key = getColumnKey(column);
62+
for (ColumnMetadata column : columns) {
63+
String key = getColumnKey(column.getName());
6464
Object obj = getColumnValue(row, index++);
6565
mapOfColValues.put(key, obj);
6666
}

spring-r2dbc/src/main/java/org/springframework/r2dbc/core/DatabaseClient.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,12 +20,17 @@
2020
import java.util.function.BiFunction;
2121
import java.util.function.Consumer;
2222
import java.util.function.Function;
23+
import java.util.function.Predicate;
2324
import java.util.function.Supplier;
2425

2526
import io.r2dbc.spi.ConnectionFactory;
27+
import io.r2dbc.spi.Readable;
28+
import io.r2dbc.spi.Result;
2629
import io.r2dbc.spi.Row;
2730
import io.r2dbc.spi.RowMetadata;
2831
import io.r2dbc.spi.Statement;
32+
import org.reactivestreams.Publisher;
33+
import reactor.core.publisher.Flux;
2934
import reactor.core.publisher.Mono;
3035

3136
import org.springframework.r2dbc.core.binding.BindMarkersFactory;
@@ -157,9 +162,9 @@ interface GenericExecuteSpec {
157162

158163
/**
159164
* Bind a non-{@code null} value to a parameter identified by its
160-
* {@code index}. {@code value} can be either a scalar value or {@link Parameter}.
165+
* {@code index}. {@code value} can be either a scalar value or {@link io.r2dbc.spi.Parameter}.
161166
* @param index zero based index to bind the parameter to
162-
* @param value either a scalar value or {@link Parameter}
167+
* @param value either a scalar value or {@link io.r2dbc.spi.Parameter}
163168
*/
164169
GenericExecuteSpec bind(int index, Object value);
165170

@@ -213,14 +218,12 @@ default GenericExecuteSpec filter(Function<? super Statement, ? extends Statemen
213218

214219
/**
215220
* Configure a result mapping {@link Function function} and enter the execution stage.
216-
* @param mappingFunction a function that maps from {@link Row} to the result type
221+
* @param mappingFunction a function that maps from {@link Readable} to the result type
217222
* @param <R> the result type
218223
* @return a {@link FetchSpec} for configuration what to fetch
224+
* @since 6.0
219225
*/
220-
default <R> RowsFetchSpec<R> map(Function<Row, R> mappingFunction) {
221-
Assert.notNull(mappingFunction, "Mapping function must not be null");
222-
return map((row, rowMetadata) -> mappingFunction.apply(row));
223-
}
226+
<R> RowsFetchSpec<R> map(Function<? super Readable, R> mappingFunction);
224227

225228
/**
226229
* Configure a result mapping {@link BiFunction function} and enter the execution stage.
@@ -231,6 +234,17 @@ default <R> RowsFetchSpec<R> map(Function<Row, R> mappingFunction) {
231234
*/
232235
<R> RowsFetchSpec<R> map(BiFunction<Row, RowMetadata, R> mappingFunction);
233236

237+
/**
238+
* Perform the SQL call and apply {@link BiFunction function} to the {@link Result}.
239+
* @param mappingFunction a function that maps from {@link Result} into a result publisher
240+
* @param <R> the result type
241+
* @return a {@link Flux} emitting mapped elements
242+
* @since 6.0
243+
* @see Result#filter(Predicate)
244+
* @see Result#flatMap(Function)
245+
*/
246+
<R> Flux<R> flatMap(Function<Result, Publisher<R>> mappingFunction);
247+
234248
/**
235249
* Perform the SQL call and retrieve the result by entering the execution stage.
236250
*/

0 commit comments

Comments
 (0)