Skip to content

Commit f53d037

Browse files
committed
#189 - Accept StatementFilterFunction in DatabaseClient.
We now accept StatementFilterFunction and ExecuteFunction via DatabaseClient to filter Statement execution. StatementFilterFunctions can be used to pre-process the statement or post-process Result objects. databaseClient.execute(…) .filter((s, next) -> next.execute(s.returnGeneratedValues("my_id"))) .filter((s, next) -> next.execute(s.fetchSize(25))) databaseClient.execute(…) .filter(s -> s.returnGeneratedValues("my_id")) .filter(s -> s.fetchSize(25))
1 parent 08a69cf commit f53d037

File tree

8 files changed

+399
-39
lines changed

8 files changed

+399
-39
lines changed

Diff for: src/main/asciidoc/new-features.adoc

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
== What's New in Spring Data R2DBC 1.1.0 RELEASE
66

77
* Introduction of `R2dbcEntityTemplate` for entity-oriented operations.
8-
* Support interface projections with `DatabaseClient.as(…)`
8+
* Support interface projections with `DatabaseClient.as(…)`.
9+
* Support for `ExecuteFunction` and `StatementFilterFunction` via `DatabaseClient.filter(…)`.
910

1011
[[new-features.1-0-0-RELEASE]]
1112
== What's New in Spring Data R2DBC 1.0.0 RELEASE

Diff for: src/main/java/org/springframework/data/r2dbc/core/DatabaseClient.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.r2dbc.spi.ConnectionFactory;
1919
import io.r2dbc.spi.Row;
2020
import io.r2dbc.spi.RowMetadata;
21+
import io.r2dbc.spi.Statement;
2122
import reactor.core.publisher.Mono;
2223

2324
import java.util.Arrays;
@@ -26,6 +27,7 @@
2627
import java.util.function.Consumer;
2728
import java.util.function.Function;
2829
import java.util.function.Supplier;
30+
import java.util.function.UnaryOperator;
2931

3032
import org.reactivestreams.Publisher;
3133

@@ -37,6 +39,7 @@
3739
import org.springframework.data.r2dbc.query.Update;
3840
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
3941
import org.springframework.data.relational.core.sql.SqlIdentifier;
42+
import org.springframework.util.Assert;
4043

4144
/**
4245
* A non-blocking, reactive client for performing database calls requests with Reactive Streams back pressure. Provides
@@ -142,6 +145,16 @@ interface Builder {
142145
*/
143146
Builder exceptionTranslator(R2dbcExceptionTranslator exceptionTranslator);
144147

148+
/**
149+
* Configures a {@link ExecuteFunction} to execute {@link Statement} objects.
150+
*
151+
* @param executeFunction must not be {@literal null}.
152+
* @return {@code this} {@link Builder}.
153+
* @since 1.1
154+
* @see Statement#execute()
155+
*/
156+
Builder executeFunction(ExecuteFunction executeFunction);
157+
145158
/**
146159
* Configures a {@link ReactiveDataAccessStrategy}.
147160
*
@@ -186,7 +199,7 @@ interface Builder {
186199
/**
187200
* Contract for specifying a SQL call along with options leading to the exchange.
188201
*/
189-
interface GenericExecuteSpec extends BindSpec<GenericExecuteSpec> {
202+
interface GenericExecuteSpec extends BindSpec<GenericExecuteSpec>, StatementFilterSpec<GenericExecuteSpec> {
190203

191204
/**
192205
* Define the target type the result should be mapped to. <br />
@@ -231,7 +244,7 @@ interface GenericExecuteSpec extends BindSpec<GenericExecuteSpec> {
231244
/**
232245
* Contract for specifying a SQL call along with options leading to the exchange.
233246
*/
234-
interface TypedExecuteSpec<T> extends BindSpec<TypedExecuteSpec<T>> {
247+
interface TypedExecuteSpec<T> extends BindSpec<TypedExecuteSpec<T>>, StatementFilterSpec<TypedExecuteSpec<T>> {
235248

236249
/**
237250
* Define the target type the result should be mapped to. <br />
@@ -866,4 +879,31 @@ interface BindSpec<S extends BindSpec<S>> {
866879
*/
867880
S bindNull(String name, Class<?> type);
868881
}
882+
883+
/**
884+
* Contract for applying a {@link StatementFilterFunction}.
885+
*
886+
* @since 1.1
887+
*/
888+
interface StatementFilterSpec<S extends StatementFilterSpec<S>> {
889+
890+
/**
891+
* Add the given filter to the end of the filter chain.
892+
*
893+
* @param filter the filter to be added to the chain.
894+
*/
895+
default S filter(UnaryOperator<Statement> filter) {
896+
897+
Assert.notNull(filter, "Statement FilterFunction must not be null!");
898+
899+
return filter((statement, next) -> next.execute(filter.apply(statement)));
900+
}
901+
902+
/**
903+
* Add the given filter to the end of the filter chain.
904+
*
905+
* @param filter the filter to be added to the chain.
906+
*/
907+
S filter(StatementFilterFunction filter);
908+
}
869909
}

Diff for: src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java

+65-34
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor {
7878

7979
private final R2dbcExceptionTranslator exceptionTranslator;
8080

81+
private final ExecuteFunction executeFunction;
82+
8183
private final ReactiveDataAccessStrategy dataAccessStrategy;
8284

8385
private final boolean namedParameters;
@@ -87,11 +89,12 @@ class DefaultDatabaseClient implements DatabaseClient, ConnectionAccessor {
8789
private final ProjectionFactory projectionFactory;
8890

8991
DefaultDatabaseClient(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
90-
ReactiveDataAccessStrategy dataAccessStrategy, boolean namedParameters, ProjectionFactory projectionFactory,
91-
DefaultDatabaseClientBuilder builder) {
92+
ExecuteFunction executeFunction, ReactiveDataAccessStrategy dataAccessStrategy, boolean namedParameters,
93+
ProjectionFactory projectionFactory, DefaultDatabaseClientBuilder builder) {
9294

9395
this.connector = connector;
9496
this.exceptionTranslator = exceptionTranslator;
97+
this.executeFunction = executeFunction;
9598
this.dataAccessStrategy = dataAccessStrategy;
9699
this.namedParameters = namedParameters;
97100
this.projectionFactory = projectionFactory;
@@ -264,25 +267,26 @@ protected DataAccessException translateException(String task, @Nullable String s
264267
* Customization hook.
265268
*/
266269
protected <T> DefaultTypedExecuteSpec<T> createTypedExecuteSpec(Map<Integer, SettableValue> byIndex,
267-
Map<String, SettableValue> byName, Supplier<String> sqlSupplier, Class<T> typeToRead) {
268-
return new DefaultTypedExecuteSpec<>(byIndex, byName, sqlSupplier, typeToRead);
270+
Map<String, SettableValue> byName, Supplier<String> sqlSupplier, StatementFilterFunction filterFunction,
271+
Class<T> typeToRead) {
272+
return new DefaultTypedExecuteSpec<>(byIndex, byName, sqlSupplier, filterFunction, typeToRead);
269273
}
270274

271275
/**
272276
* Customization hook.
273277
*/
274278
protected <T> DefaultTypedExecuteSpec<T> createTypedExecuteSpec(Map<Integer, SettableValue> byIndex,
275-
Map<String, SettableValue> byName, Supplier<String> sqlSupplier,
279+
Map<String, SettableValue> byName, Supplier<String> sqlSupplier, StatementFilterFunction filterFunction,
276280
BiFunction<Row, RowMetadata, T> mappingFunction) {
277-
return new DefaultTypedExecuteSpec<>(byIndex, byName, sqlSupplier, mappingFunction);
281+
return new DefaultTypedExecuteSpec<>(byIndex, byName, sqlSupplier, filterFunction, mappingFunction);
278282
}
279283

280284
/**
281285
* Customization hook.
282286
*/
283287
protected ExecuteSpecSupport createGenericExecuteSpec(Map<Integer, SettableValue> byIndex,
284-
Map<String, SettableValue> byName, Supplier<String> sqlSupplier) {
285-
return new DefaultGenericExecuteSpec(byIndex, byName, sqlSupplier);
288+
Map<String, SettableValue> byName, Supplier<String> sqlSupplier, StatementFilterFunction filterFunction) {
289+
return new DefaultGenericExecuteSpec(byIndex, byName, sqlSupplier, filterFunction);
286290
}
287291

288292
/**
@@ -327,19 +331,22 @@ class ExecuteSpecSupport {
327331
final Map<Integer, SettableValue> byIndex;
328332
final Map<String, SettableValue> byName;
329333
final Supplier<String> sqlSupplier;
334+
final StatementFilterFunction filterFunction;
330335

331336
ExecuteSpecSupport(Supplier<String> sqlSupplier) {
332337

333338
this.byIndex = Collections.emptyMap();
334339
this.byName = Collections.emptyMap();
335340
this.sqlSupplier = sqlSupplier;
341+
this.filterFunction = StatementFilterFunctions.empty();
336342
}
337343

338344
ExecuteSpecSupport(Map<Integer, SettableValue> byIndex, Map<String, SettableValue> byName,
339-
Supplier<String> sqlSupplier) {
345+
Supplier<String> sqlSupplier, StatementFilterFunction filterFunction) {
340346
this.byIndex = byIndex;
341347
this.byName = byName;
342348
this.sqlSupplier = sqlSupplier;
349+
this.filterFunction = filterFunction;
343350
}
344351

345352
<T> FetchSpec<T> exchange(Supplier<String> sqlSupplier, BiFunction<Row, RowMetadata, T> mappingFunction) {
@@ -404,7 +411,7 @@ <T> FetchSpec<T> exchange(Supplier<String> sqlSupplier, BiFunction<Row, RowMetad
404411
return statement;
405412
};
406413

407-
Function<Connection, Flux<Result>> resultFunction = toExecuteFunction(sql, executeFunction);
414+
Function<Connection, Flux<Result>> resultFunction = toFunction(sql, filterFunction, executeFunction);
408415

409416
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
410417
sql, //
@@ -426,7 +433,7 @@ public ExecuteSpecSupport bind(int index, Object value) {
426433
byIndex.put(index, SettableValue.fromOrEmpty(value, value.getClass()));
427434
}
428435

429-
return createInstance(byIndex, this.byName, this.sqlSupplier);
436+
return createInstance(byIndex, this.byName, this.sqlSupplier, this.filterFunction);
430437
}
431438

432439
public ExecuteSpecSupport bindNull(int index, Class<?> type) {
@@ -436,7 +443,7 @@ public ExecuteSpecSupport bindNull(int index, Class<?> type) {
436443
Map<Integer, SettableValue> byIndex = new LinkedHashMap<>(this.byIndex);
437444
byIndex.put(index, SettableValue.empty(type));
438445

439-
return createInstance(byIndex, this.byName, this.sqlSupplier);
446+
return createInstance(byIndex, this.byName, this.sqlSupplier, this.filterFunction);
440447
}
441448

442449
public ExecuteSpecSupport bind(String name, Object value) {
@@ -455,7 +462,7 @@ public ExecuteSpecSupport bind(String name, Object value) {
455462
byName.put(name, SettableValue.fromOrEmpty(value, value.getClass()));
456463
}
457464

458-
return createInstance(this.byIndex, byName, this.sqlSupplier);
465+
return createInstance(this.byIndex, byName, this.sqlSupplier, this.filterFunction);
459466
}
460467

461468
public ExecuteSpecSupport bindNull(String name, Class<?> type) {
@@ -466,7 +473,14 @@ public ExecuteSpecSupport bindNull(String name, Class<?> type) {
466473
Map<String, SettableValue> byName = new LinkedHashMap<>(this.byName);
467474
byName.put(name, SettableValue.empty(type));
468475

469-
return createInstance(this.byIndex, byName, this.sqlSupplier);
476+
return createInstance(this.byIndex, byName, this.sqlSupplier, this.filterFunction);
477+
}
478+
479+
public ExecuteSpecSupport filter(StatementFilterFunction filter) {
480+
481+
Assert.notNull(filter, "Statement FilterFunction must not be null!");
482+
483+
return createInstance(this.byIndex, byName, this.sqlSupplier, this.filterFunction.andThen(filter));
470484
}
471485

472486
private void assertNotPreparedOperation() {
@@ -476,8 +490,8 @@ private void assertNotPreparedOperation() {
476490
}
477491

478492
protected ExecuteSpecSupport createInstance(Map<Integer, SettableValue> byIndex, Map<String, SettableValue> byName,
479-
Supplier<String> sqlSupplier) {
480-
return new ExecuteSpecSupport(byIndex, byName, sqlSupplier);
493+
Supplier<String> sqlSupplier, StatementFilterFunction filterFunction) {
494+
return new ExecuteSpecSupport(byIndex, byName, sqlSupplier, filterFunction);
481495
}
482496
}
483497

@@ -487,8 +501,8 @@ protected ExecuteSpecSupport createInstance(Map<Integer, SettableValue> byIndex,
487501
protected class DefaultGenericExecuteSpec extends ExecuteSpecSupport implements GenericExecuteSpec {
488502

489503
DefaultGenericExecuteSpec(Map<Integer, SettableValue> byIndex, Map<String, SettableValue> byName,
490-
Supplier<String> sqlSupplier) {
491-
super(byIndex, byName, sqlSupplier);
504+
Supplier<String> sqlSupplier, StatementFilterFunction filterFunction) {
505+
super(byIndex, byName, sqlSupplier, filterFunction);
492506
}
493507

494508
DefaultGenericExecuteSpec(Supplier<String> sqlSupplier) {
@@ -500,7 +514,7 @@ public <R> TypedExecuteSpec<R> as(Class<R> resultType) {
500514

501515
Assert.notNull(resultType, "Result type must not be null!");
502516

503-
return createTypedExecuteSpec(this.byIndex, this.byName, this.sqlSupplier, resultType);
517+
return createTypedExecuteSpec(this.byIndex, this.byName, this.sqlSupplier, this.filterFunction, resultType);
504518
}
505519

506520
@Override
@@ -549,10 +563,15 @@ public DefaultGenericExecuteSpec bindNull(String name, Class<?> type) {
549563
return (DefaultGenericExecuteSpec) super.bindNull(name, type);
550564
}
551565

566+
@Override
567+
public DefaultGenericExecuteSpec filter(StatementFilterFunction filter) {
568+
return (DefaultGenericExecuteSpec) super.filter(filter);
569+
}
570+
552571
@Override
553572
protected ExecuteSpecSupport createInstance(Map<Integer, SettableValue> byIndex, Map<String, SettableValue> byName,
554-
Supplier<String> sqlSupplier) {
555-
return createGenericExecuteSpec(byIndex, byName, sqlSupplier);
573+
Supplier<String> sqlSupplier, StatementFilterFunction filterFunction) {
574+
return createGenericExecuteSpec(byIndex, byName, sqlSupplier, filterFunction);
556575
}
557576
}
558577

@@ -566,9 +585,9 @@ protected class DefaultTypedExecuteSpec<T> extends ExecuteSpecSupport implements
566585
private final BiFunction<Row, RowMetadata, T> mappingFunction;
567586

568587
DefaultTypedExecuteSpec(Map<Integer, SettableValue> byIndex, Map<String, SettableValue> byName,
569-
Supplier<String> sqlSupplier, Class<T> typeToRead) {
588+
Supplier<String> sqlSupplier, StatementFilterFunction filterFunction, Class<T> typeToRead) {
570589

571-
super(byIndex, byName, sqlSupplier);
590+
super(byIndex, byName, sqlSupplier, filterFunction);
572591

573592
this.typeToRead = typeToRead;
574593

@@ -581,9 +600,10 @@ protected class DefaultTypedExecuteSpec<T> extends ExecuteSpecSupport implements
581600
}
582601

583602
DefaultTypedExecuteSpec(Map<Integer, SettableValue> byIndex, Map<String, SettableValue> byName,
584-
Supplier<String> sqlSupplier, BiFunction<Row, RowMetadata, T> mappingFunction) {
603+
Supplier<String> sqlSupplier, StatementFilterFunction filterFunction,
604+
BiFunction<Row, RowMetadata, T> mappingFunction) {
585605

586-
super(byIndex, byName, sqlSupplier);
606+
super(byIndex, byName, sqlSupplier, filterFunction);
587607

588608
this.typeToRead = null;
589609
this.mappingFunction = mappingFunction;
@@ -594,7 +614,7 @@ public <R> TypedExecuteSpec<R> as(Class<R> resultType) {
594614

595615
Assert.notNull(resultType, "Result type must not be null!");
596616

597-
return createTypedExecuteSpec(this.byIndex, this.byName, this.sqlSupplier, resultType);
617+
return createTypedExecuteSpec(this.byIndex, this.byName, this.sqlSupplier, this.filterFunction, resultType);
598618
}
599619

600620
@Override
@@ -643,10 +663,15 @@ public DefaultTypedExecuteSpec<T> bindNull(String name, Class<?> type) {
643663
return (DefaultTypedExecuteSpec<T>) super.bindNull(name, type);
644664
}
645665

666+
@Override
667+
public DefaultTypedExecuteSpec<T> filter(StatementFilterFunction filter) {
668+
return (DefaultTypedExecuteSpec<T>) super.filter(filter);
669+
}
670+
646671
@Override
647672
protected DefaultTypedExecuteSpec<T> createInstance(Map<Integer, SettableValue> byIndex,
648-
Map<String, SettableValue> byName, Supplier<String> sqlSupplier) {
649-
return createTypedExecuteSpec(byIndex, byName, sqlSupplier, this.typeToRead);
673+
Map<String, SettableValue> byName, Supplier<String> sqlSupplier, StatementFilterFunction filterFunction) {
674+
return createTypedExecuteSpec(byIndex, byName, sqlSupplier, filterFunction, this.typeToRead);
650675
}
651676
}
652677

@@ -735,7 +760,8 @@ <R> FetchSpec<R> execute(PreparedOperation<?> preparedOperation, BiFunction<Row,
735760

736761
String sql = getRequiredSql(preparedOperation);
737762
Function<Connection, Statement> selectFunction = wrapPreparedOperation(sql, preparedOperation);
738-
Function<Connection, Flux<Result>> resultFunction = DefaultDatabaseClient.toExecuteFunction(sql, selectFunction);
763+
Function<Connection, Flux<Result>> resultFunction = toFunction(sql, StatementFilterFunctions.empty(),
764+
selectFunction);
739765

740766
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
741767
sql, //
@@ -1432,7 +1458,8 @@ private <R> FetchSpec<R> exchangeInsert(BiFunction<Row, RowMetadata, R> mappingF
14321458
String sql = getRequiredSql(operation);
14331459
Function<Connection, Statement> insertFunction = wrapPreparedOperation(sql, operation)
14341460
.andThen(statement -> statement.returnGeneratedValues());
1435-
Function<Connection, Flux<Result>> resultFunction = toExecuteFunction(sql, insertFunction);
1461+
Function<Connection, Flux<Result>> resultFunction = toFunction(sql, StatementFilterFunctions.empty(),
1462+
insertFunction);
14361463

14371464
return new DefaultSqlResult<>(this, //
14381465
sql, //
@@ -1445,7 +1472,8 @@ private UpdatedRowsFetchSpec exchangeUpdate(PreparedOperation<?> operation) {
14451472

14461473
String sql = getRequiredSql(operation);
14471474
Function<Connection, Statement> executeFunction = wrapPreparedOperation(sql, operation);
1448-
Function<Connection, Flux<Result>> resultFunction = toExecuteFunction(sql, executeFunction);
1475+
Function<Connection, Flux<Result>> resultFunction = toFunction(sql, StatementFilterFunctions.empty(),
1476+
executeFunction);
14491477

14501478
return new DefaultSqlResult<>(this, //
14511479
sql, //
@@ -1476,12 +1504,15 @@ private Function<Connection, Statement> wrapPreparedOperation(String sql, Prepar
14761504
};
14771505
}
14781506

1479-
private static Function<Connection, Flux<Result>> toExecuteFunction(String sql,
1480-
Function<Connection, Statement> executeFunction) {
1507+
private Function<Connection, Flux<Result>> toFunction(String sql, StatementFilterFunction filterFunction,
1508+
Function<Connection, Statement> statementFactory) {
14811509

14821510
return it -> {
14831511

1484-
Flux<Result> from = Flux.defer(() -> executeFunction.apply(it).execute()).cast(Result.class);
1512+
Flux<Result> from = Flux.defer(() -> {
1513+
Statement statement = statementFactory.apply(it);
1514+
return filterFunction.filter(statement, executeFunction);
1515+
}).cast(Result.class);
14851516
return from.checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
14861517
};
14871518
}

0 commit comments

Comments
 (0)