Skip to content

Commit 96a4121

Browse files
mipo256mp911de
authored andcommitted
Add fetchSize to ReactiveSelectOperationSupport.
Closes #1652 Original pull request: #1898
1 parent 7cf81ae commit 96a4121

File tree

5 files changed

+82
-14
lines changed

5 files changed

+82
-14
lines changed

Diff for: spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/core/R2dbcEntityTemplate.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
* @author Jose Luis Leon
9595
* @author Robert Heim
9696
* @author Sebastian Wieland
97+
* @author Mikhail Polivakha
9798
* @since 1.1
9899
*/
99100
public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware, ApplicationContextAware {
@@ -312,14 +313,14 @@ public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessEx
312313
Assert.notNull(entityClass, "Entity class must not be null");
313314

314315
SqlIdentifier tableName = getTableName(entityClass);
315-
return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all);
316+
return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all, null);
316317
}
317318

318319
@SuppressWarnings("unchecked")
319320
<T, P extends Publisher<T>> P doSelect(Query query, Class<?> entityClass, SqlIdentifier tableName,
320-
Class<T> returnType, Function<RowsFetchSpec<T>, P> resultHandler) {
321+
Class<T> returnType, Function<RowsFetchSpec<T>, P> resultHandler, @Nullable Integer fetchSize) {
321322

322-
RowsFetchSpec<T> fetchSpec = doSelect(query, entityClass, tableName, returnType);
323+
RowsFetchSpec<T> fetchSpec = doSelect(query, entityClass, tableName, returnType, fetchSize);
323324

324325
P result = resultHandler.apply(fetchSpec);
325326

@@ -331,7 +332,7 @@ <T, P extends Publisher<T>> P doSelect(Query query, Class<?> entityClass, SqlIde
331332
}
332333

333334
private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityType, SqlIdentifier tableName,
334-
Class<T> returnType) {
335+
Class<T> returnType, @Nullable Integer fetchSize) {
335336

336337
StatementMapper statementMapper = dataAccessStrategy.getStatementMapper().forType(entityType);
337338

@@ -358,13 +359,17 @@ private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityType, SqlIdent
358359

359360
PreparedOperation<?> operation = statementMapper.getMappedObject(selectSpec);
360361

361-
return getRowsFetchSpec(databaseClient.sql(operation), entityType, returnType);
362+
return getRowsFetchSpec(
363+
databaseClient.sql(operation).filter((statement) -> statement.fetchSize(Optional.ofNullable(fetchSize).orElse(0))),
364+
entityType,
365+
returnType
366+
);
362367
}
363368

364369
@Override
365370
public <T> Mono<T> selectOne(Query query, Class<T> entityClass) throws DataAccessException {
366371
return doSelect(query.isLimited() ? query : query.limit(2), entityClass, getTableName(entityClass), entityClass,
367-
RowsFetchSpec::one);
372+
RowsFetchSpec::one, null);
368373
}
369374

370375
@Override

Diff for: spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/core/ReactiveSelectOperation.java

+10
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* <pre>
3737
* <code>
3838
* select(Human.class)
39+
* .withFetchSize(10)
3940
* .from("star_wars")
4041
* .as(Jedi.class)
4142
* .matching(query(where("firstname").is("luke")))
@@ -44,6 +45,7 @@
4445
* </pre>
4546
*
4647
* @author Mark Paluch
48+
* @author Mikhail Polivakha
4749
* @since 1.1
4850
*/
4951
public interface ReactiveSelectOperation {
@@ -115,6 +117,14 @@ interface SelectWithProjection<T> extends SelectWithQuery<T> {
115117
*/
116118
interface SelectWithQuery<T> extends TerminatingSelect<T> {
117119

120+
/**
121+
* Specifies the fetch size for this query
122+
*
123+
* @param fetchSize
124+
* @return
125+
*/
126+
SelectWithQuery<T> withFetchSize(int fetchSize);
127+
118128
/**
119129
* Set the {@link Query} used as a filter in the {@code SELECT} statement.
120130
*

Diff for: spring-data-r2dbc/src/main/java/org/springframework/data/r2dbc/core/ReactiveSelectOperationSupport.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* Implementation of {@link ReactiveSelectOperation}.
2929
*
3030
* @author Mark Paluch
31+
* @author Mikhail Polivakha
3132
* @since 1.1
3233
*/
3334
class ReactiveSelectOperationSupport implements ReactiveSelectOperation {
@@ -43,7 +44,7 @@ public <T> ReactiveSelect<T> select(Class<T> domainType) {
4344

4445
Assert.notNull(domainType, "DomainType must not be null");
4546

46-
return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null);
47+
return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null, null);
4748
}
4849

4950
static class ReactiveSelectSupport<T> implements ReactiveSelect<T> {
@@ -54,38 +55,49 @@ static class ReactiveSelectSupport<T> implements ReactiveSelect<T> {
5455
private final Query query;
5556
private final @Nullable SqlIdentifier tableName;
5657

58+
private final @Nullable Integer fetchSize;
59+
5760
ReactiveSelectSupport(R2dbcEntityTemplate template, Class<?> domainType, Class<T> returnType, Query query,
58-
@Nullable SqlIdentifier tableName) {
61+
@Nullable SqlIdentifier tableName, @Nullable Integer fetchSize) {
5962

6063
this.template = template;
6164
this.domainType = domainType;
6265
this.returnType = returnType;
6366
this.query = query;
6467
this.tableName = tableName;
68+
this.fetchSize = fetchSize;
6569
}
6670

6771
@Override
6872
public SelectWithProjection<T> from(SqlIdentifier tableName) {
6973

7074
Assert.notNull(tableName, "Table name must not be null");
7175

72-
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
76+
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
7377
}
7478

7579
@Override
7680
public <R> SelectWithQuery<R> as(Class<R> returnType) {
7781

7882
Assert.notNull(returnType, "ReturnType must not be null");
7983

80-
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
84+
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
85+
}
86+
87+
@Override
88+
public SelectWithQuery<T> withFetchSize(int fetchSize) {
89+
90+
Assert.notNull(returnType, "FetchSize must not be null");
91+
92+
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
8193
}
8294

8395
@Override
8496
public TerminatingSelect<T> matching(Query query) {
8597

8698
Assert.notNull(query, "Query must not be null");
8799

88-
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
100+
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
89101
}
90102

91103
@Override
@@ -100,17 +112,17 @@ public Mono<Boolean> exists() {
100112

101113
@Override
102114
public Mono<T> first() {
103-
return template.doSelect(query.limit(1), domainType, getTableName(), returnType, RowsFetchSpec::first);
115+
return template.doSelect(query.limit(1), domainType, getTableName(), returnType, RowsFetchSpec::first, fetchSize);
104116
}
105117

106118
@Override
107119
public Mono<T> one() {
108-
return template.doSelect(query.limit(2), domainType, getTableName(), returnType, RowsFetchSpec::one);
120+
return template.doSelect(query.limit(2), domainType, getTableName(), returnType, RowsFetchSpec::one, fetchSize);
109121
}
110122

111123
@Override
112124
public Flux<T> all() {
113-
return template.doSelect(query, domainType, getTableName(), returnType, RowsFetchSpec::all);
125+
return template.doSelect(query, domainType, getTableName(), returnType, RowsFetchSpec::all, fetchSize);
114126
}
115127

116128
private SqlIdentifier getTableName() {

Diff for: spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/core/ReactiveSelectOperationUnitTests.java

+28
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.r2dbc.spi.test.MockResult;
2525
import io.r2dbc.spi.test.MockRow;
2626
import io.r2dbc.spi.test.MockRowMetadata;
27+
import reactor.core.publisher.Flux;
2728
import reactor.test.StepVerifier;
2829

2930
import org.junit.jupiter.api.BeforeEach;
@@ -38,6 +39,7 @@
3839
* Unit test for {@link ReactiveSelectOperation}.
3940
*
4041
* @author Mark Paluch
42+
* @author Mikhail Polivakha
4143
*/
4244
public class ReactiveSelectOperationUnitTests {
4345

@@ -242,6 +244,32 @@ void shouldSelectCount() {
242244
assertThat(statement.getSql()).isEqualTo("SELECT COUNT(*) FROM person WHERE person.THE_NAME = $1");
243245
}
244246

247+
@Test // gh-1652
248+
void shouldBeAbleToProvideFetchSize() {
249+
MockRowMetadata metadata = MockRowMetadata.builder()
250+
.columnMetadata(MockColumnMetadata.builder().name("id").type(R2dbcType.INTEGER).build())
251+
.build();
252+
MockResult result = MockResult.builder()
253+
.row(MockRow.builder().identified("id", Object.class, "Walter").metadata(metadata).build())
254+
.build();
255+
256+
recorder.addStubbing(s -> s.startsWith("SELECT"), result);
257+
258+
entityTemplate.select(Person.class) //
259+
.withFetchSize(10)
260+
.matching(query(where("name").is("Walter")).limit(10).offset(20)) //
261+
.all() //
262+
.as(StepVerifier::create) //
263+
.expectNextCount(1) //
264+
.verifyComplete();
265+
266+
StatementRecorder.RecordedStatement statement = recorder.getCreatedStatement(s -> s.startsWith("SELECT"));
267+
268+
assertThat(statement.getSql())
269+
.isEqualTo("SELECT person.* FROM person WHERE person.THE_NAME = $1 LIMIT 10 OFFSET 20");
270+
assertThat(statement.getFetchSize()).isEqualTo(10);
271+
}
272+
245273
static class Person {
246274

247275
@Id String id;

Diff for: spring-data-r2dbc/src/test/java/org/springframework/data/r2dbc/testing/StatementRecorder.java

+13
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
* Recorder utility for R2DBC {@link Statement}s. Allows stubbing and introspection.
4949
*
5050
* @author Mark Paluch
51+
* @author Mikhail Polivakha
5152
*/
5253
public class StatementRecorder implements ConnectionFactory {
5354

@@ -273,6 +274,8 @@ public class RecordedStatement implements Statement {
273274

274275
private final List<Result> results;
275276

277+
private int fetchSize;
278+
276279
private final Map<Object, Parameter> bindings = new LinkedHashMap<>();
277280

278281
public RecordedStatement(String sql, Result result) {
@@ -292,6 +295,10 @@ public String getSql() {
292295
return sql;
293296
}
294297

298+
public int getFetchSize() {
299+
return fetchSize;
300+
}
301+
295302
@Override
296303
public Statement add() {
297304
return this;
@@ -321,6 +328,12 @@ public Statement bindNull(String identifier, Class<?> type) {
321328
return this;
322329
}
323330

331+
@Override
332+
public Statement fetchSize(int rows) {
333+
fetchSize = rows;
334+
return this;
335+
}
336+
324337
@Override
325338
public Flux<Result> execute() {
326339
return Flux.fromIterable(results).doOnSubscribe(subscription -> executedStatements.add(this));

0 commit comments

Comments
 (0)