Skip to content

Commit 64854c5

Browse files
GH-2692 - Add basic support for keyset based pagination and scrolling.
Discussed in spring-projects/spring-data-commons#2151 and implemented in spring-projects/spring-data-commons#2787 we can build on top and provide basic support for both imperative and reactive repositories. The support will be available only on the repository level in the first iteration, think ```java import java.util.UUID; import org.springframework.data.domain.ScrollPosition; import org.springframework.data.domain.Sort; import org.springframework.data.domain.Window; import org.springframework.data.neo4j.repository.Neo4jRepository; public interface ScrollingRepository extends Neo4jRepository<ScrollingEntity, UUID> { Window<ScrollingEntity> findTop4By(Sort sort, ScrollPosition position); } ``` and other derived finder methods that have a limit and a stable sort. If requested, further support can be added to the templates, too. Closes #2691.
1 parent 97c7b5b commit 64854c5

22 files changed

+976
-53
lines changed

src/main/java/org/springframework/data/neo4j/core/mapping/Constants.java

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public final class Constants {
4444
public static final String NAME_OF_INTERNAL_ID = "__internalNeo4jId__";
4545
public static final String NAME_OF_ELEMENT_ID = "__elementId__";
4646

47+
public static final String NAME_OF_ADDITIONAL_SORT = "__stable_uniq_sort__";
48+
4749
/**
4850
* Indicates the list of dynamic labels.
4951
*/

src/main/java/org/springframework/data/neo4j/repository/query/AbstractNeo4jQuery.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.springframework.data.support.PageableExecutionUtils;
4848
import org.springframework.lang.Nullable;
4949
import org.springframework.util.Assert;
50-
import org.springframework.util.StringUtils;
5150

5251
/**
5352
* Base class for {@link RepositoryQuery} implementations for Neo4j.
@@ -80,7 +79,7 @@ public QueryMethod getQueryMethod() {
8079
@Override
8180
public final Object execute(Object[] parameters) {
8281

83-
boolean incrementLimit = queryMethod.isSliceQuery() && !queryMethod.getQueryAnnotation().map(q -> q.countQuery()).filter(StringUtils::hasText).isPresent();
82+
boolean incrementLimit = queryMethod.incrementLimit();
8483
Neo4jParameterAccessor parameterAccessor = new Neo4jParameterAccessor(
8584
(Neo4jQueryMethod.Neo4jParameters) this.queryMethod.getParameters(),
8685
parameters);
@@ -91,8 +90,7 @@ public final Object execute(Object[] parameters) {
9190
PropertyFilterSupport.getInputProperties(resultProcessor, factory, mappingContext), parameterAccessor,
9291
null, getMappingFunction(resultProcessor), incrementLimit ? l -> l + 1 : UnaryOperator.identity());
9392

94-
Object rawResult = new Neo4jQueryExecution.DefaultQueryExecution(neo4jOperations).execute(preparedQuery,
95-
queryMethod.isCollectionLikeQuery() || queryMethod.isPageQuery() || queryMethod.isSliceQuery());
93+
Object rawResult = new Neo4jQueryExecution.DefaultQueryExecution(neo4jOperations).execute(preparedQuery, queryMethod.asCollectionQuery());
9694

9795
Converter<Object, Object> preparingConverter = OptionalUnwrappingConverter.INSTANCE;
9896
if (returnedType.isProjecting()) {
@@ -107,6 +105,8 @@ public final Object execute(Object[] parameters) {
107105
rawResult = createPage(parameterAccessor, (List<?>) rawResult);
108106
} else if (queryMethod.isSliceQuery()) {
109107
rawResult = createSlice(incrementLimit, parameterAccessor, (List<?>) rawResult);
108+
} else if (queryMethod.isScrollQuery()) {
109+
rawResult = createWindow(resultProcessor, incrementLimit, parameterAccessor, (List<?>) rawResult, preparedQuery.getQueryFragmentsAndParameters());
110110
}
111111
return resultProcessor.processResult(rawResult, preparingConverter);
112112
}

src/main/java/org/springframework/data/neo4j/repository/query/AbstractReactiveNeo4jQuery.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Collection;
1919
import java.util.function.BiFunction;
2020
import java.util.function.Supplier;
21+
import java.util.function.UnaryOperator;
2122

2223
import org.neo4j.driver.types.MapAccessor;
2324
import org.neo4j.driver.types.TypeSystem;
@@ -37,6 +38,8 @@
3738
import org.springframework.lang.Nullable;
3839
import org.springframework.util.Assert;
3940

41+
import reactor.core.publisher.Flux;
42+
4043
/**
4144
* Base class for {@link RepositoryQuery} implementations for Neo4j.
4245
*
@@ -67,16 +70,17 @@ public QueryMethod getQueryMethod() {
6770
@Override
6871
public final Object execute(Object[] parameters) {
6972

73+
boolean incrementLimit = queryMethod.incrementLimit();
7074
Neo4jParameterAccessor parameterAccessor = new Neo4jParameterAccessor((Neo4jQueryMethod.Neo4jParameters) this.queryMethod.getParameters(), parameters);
7175
ResultProcessor resultProcessor = queryMethod.getResultProcessor().withDynamicProjection(parameterAccessor);
7276

7377
ReturnedType returnedType = resultProcessor.getReturnedType();
7478
PreparedQuery<?> preparedQuery = prepareQuery(returnedType.getReturnedType(),
7579
PropertyFilterSupport.getInputProperties(resultProcessor, factory, mappingContext), parameterAccessor,
76-
null, getMappingFunction(resultProcessor));
80+
null, getMappingFunction(resultProcessor), incrementLimit ? l -> l + 1 : UnaryOperator.identity());
7781

7882
Object rawResult = new Neo4jQueryExecution.ReactiveQueryExecution(neo4jOperations).execute(preparedQuery,
79-
queryMethod.isCollectionLikeQuery());
83+
queryMethod.asCollectionQuery());
8084

8185
Converter<Object, Object> preparingConverter = OptionalUnwrappingConverter.INSTANCE;
8286
if (returnedType.isProjecting()) {
@@ -87,10 +91,16 @@ public final Object execute(Object[] parameters) {
8791
(EntityInstanceWithSource) OptionalUnwrappingConverter.INSTANCE.convert(source));
8892
}
8993

94+
if (queryMethod.isScrollQuery()) {
95+
rawResult = ((Flux<?>) rawResult).collectList().map(rawResultList ->
96+
createWindow(resultProcessor, incrementLimit, parameterAccessor, rawResultList, preparedQuery.getQueryFragmentsAndParameters()));
97+
}
98+
9099
return resultProcessor.processResult(rawResult, preparingConverter);
91100
}
92101

93102
protected abstract <T extends Object> PreparedQuery<T> prepareQuery(Class<T> returnedType,
94103
Collection<PropertyFilter.ProjectedPath> includedProperties, Neo4jParameterAccessor parameterAccessor,
95-
@Nullable Neo4jQueryType queryType, @Nullable Supplier<BiFunction<TypeSystem, MapAccessor, ?>> mappingFunction);
104+
@Nullable Neo4jQueryType queryType, @Nullable Supplier<BiFunction<TypeSystem, MapAccessor, ?>> mappingFunction,
105+
@Nullable UnaryOperator<Integer> limitModifier);
96106
}

src/main/java/org/springframework/data/neo4j/repository/query/CypherAdapterUtils.java

+92-7
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,29 @@
1818
import static org.neo4j.cypherdsl.core.Cypher.property;
1919

2020
import java.util.Collection;
21+
import java.util.HashMap;
22+
import java.util.LinkedHashMap;
23+
import java.util.Map;
24+
import java.util.function.BiFunction;
2125
import java.util.function.Function;
2226
import java.util.stream.Collectors;
2327

2428
import org.apiguardian.api.API;
29+
import org.neo4j.cypherdsl.core.Condition;
30+
import org.neo4j.cypherdsl.core.Conditions;
2531
import org.neo4j.cypherdsl.core.Cypher;
2632
import org.neo4j.cypherdsl.core.Expression;
2733
import org.neo4j.cypherdsl.core.Functions;
2834
import org.neo4j.cypherdsl.core.SortItem;
2935
import org.neo4j.cypherdsl.core.StatementBuilder;
3036
import org.neo4j.cypherdsl.core.SymbolicName;
37+
import org.neo4j.driver.Value;
38+
import org.springframework.data.domain.KeysetScrollPosition;
3139
import org.springframework.data.domain.Pageable;
3240
import org.springframework.data.domain.Sort;
3341
import org.springframework.data.neo4j.core.mapping.Constants;
34-
import org.springframework.data.neo4j.core.mapping.GraphPropertyDescription;
42+
import org.springframework.data.neo4j.core.mapping.Neo4jPersistentEntity;
43+
import org.springframework.data.neo4j.core.mapping.Neo4jPersistentProperty;
3544
import org.springframework.data.neo4j.core.mapping.NodeDescription;
3645

3746
/**
@@ -51,6 +60,7 @@ public final class CypherAdapterUtils {
5160
*/
5261
public static Function<Sort.Order, SortItem> sortAdapterFor(NodeDescription<?> nodeDescription) {
5362
return order -> {
63+
5464
String domainProperty = order.getProperty();
5565
boolean propertyIsQualified = domainProperty.contains(".");
5666
SymbolicName root;
@@ -61,12 +71,21 @@ public static Function<Sort.Order, SortItem> sortAdapterFor(NodeDescription<?> n
6171
root = Cypher.name(domainProperty.substring(0, indexOfSeparator));
6272
domainProperty = domainProperty.substring(indexOfSeparator + 1);
6373
}
64-
String graphProperty = nodeDescription.getGraphProperty(domainProperty)
65-
.map(GraphPropertyDescription::getPropertyName).orElseThrow(() -> new IllegalStateException(
66-
String.format("Cannot order by the unknown graph property: '%s'", order.getProperty())));
67-
Expression expression = property(root, graphProperty);
68-
if (order.isIgnoreCase()) {
69-
expression = Functions.toLower(expression);
74+
75+
var optionalGraphProperty = nodeDescription.getGraphProperty(domainProperty);
76+
if (optionalGraphProperty.isEmpty()) {
77+
throw new IllegalStateException(String.format("Cannot order by the unknown graph property: '%s'", order.getProperty()));
78+
}
79+
var graphProperty = optionalGraphProperty.get();
80+
Expression expression;
81+
if (graphProperty.isInternalIdProperty()) {
82+
// Not using the id expression here, as the root will be referring to the constructed map being returned.
83+
expression = property(root, Constants.NAME_OF_INTERNAL_ID);
84+
} else {
85+
expression = property(root, graphProperty.getPropertyName());
86+
if (order.isIgnoreCase()) {
87+
expression = Functions.toLower(expression);
88+
}
7089
}
7190
SortItem sortItem = Cypher.sort(expression);
7291

@@ -78,6 +97,72 @@ public static Function<Sort.Order, SortItem> sortAdapterFor(NodeDescription<?> n
7897
};
7998
}
8099

100+
public static Condition combineKeysetIntoCondition(Neo4jPersistentEntity<?> entity, KeysetScrollPosition scrollPosition, Sort sort) {
101+
102+
var incomingKeys = scrollPosition.getKeys();
103+
var orderedKeys = new LinkedHashMap<String, Object>();
104+
105+
record PropertyAndOrder(Neo4jPersistentProperty property, Sort.Order order) {
106+
}
107+
var propertyAndDirection = new HashMap<String, PropertyAndOrder>();
108+
109+
sort.forEach(order -> {
110+
var property = entity.getRequiredPersistentProperty(order.getProperty());
111+
var propertyName = property.getPropertyName();
112+
propertyAndDirection.put(propertyName, new PropertyAndOrder(property, order));
113+
114+
if (incomingKeys.containsKey(propertyName)) {
115+
orderedKeys.put(propertyName, incomingKeys.get(propertyName));
116+
}
117+
});
118+
if (incomingKeys.containsKey(Constants.NAME_OF_ADDITIONAL_SORT)) {
119+
orderedKeys.put(Constants.NAME_OF_ADDITIONAL_SORT, incomingKeys.get(Constants.NAME_OF_ADDITIONAL_SORT));
120+
}
121+
122+
var root = Constants.NAME_OF_TYPED_ROOT_NODE.apply(entity);
123+
124+
var resultingCondition = Conditions.noCondition();
125+
// This is the next equality pair if previous sort key was equal
126+
var nextEquals = Conditions.noCondition();
127+
// This is the condition for when all the sort orderedKeys are equal, and we must filter via id
128+
var allEqualsWithArtificialSort = Conditions.noCondition();
129+
130+
for (Map.Entry<String, Object> entry : orderedKeys.entrySet()) {
131+
132+
var k = entry.getKey();
133+
var v = entry.getValue();
134+
if (v == null || (v instanceof Value value && value.isNull())) {
135+
throw new IllegalStateException("Cannot resume from KeysetScrollPosition. Offending key: '%s' is 'null'".formatted(k));
136+
}
137+
var parameter = Cypher.anonParameter(v);
138+
139+
Expression expression;
140+
141+
var scrollDirection = scrollPosition.getDirection();
142+
if (Constants.NAME_OF_ADDITIONAL_SORT.equals(k)) {
143+
expression = entity.getIdExpression();
144+
var comparatorFunction = getComparatorFunction(scrollDirection == KeysetScrollPosition.Direction.Forward ? Sort.Direction.ASC : Sort.Direction.DESC, scrollDirection);
145+
allEqualsWithArtificialSort = allEqualsWithArtificialSort.and(comparatorFunction.apply(expression, parameter));
146+
} else {
147+
var p = propertyAndDirection.get(k);
148+
expression = p.property.isIdProperty() ? entity.getIdExpression() : root.property(k);
149+
150+
var comparatorFunction = getComparatorFunction(p.order.getDirection(), scrollDirection);
151+
resultingCondition = resultingCondition.or(nextEquals.and(comparatorFunction.apply(expression, parameter)));
152+
nextEquals = expression.eq(parameter);
153+
allEqualsWithArtificialSort = allEqualsWithArtificialSort.and(nextEquals);
154+
}
155+
}
156+
return resultingCondition.or(allEqualsWithArtificialSort);
157+
}
158+
159+
private static BiFunction<Expression, Expression, Condition> getComparatorFunction(Sort.Direction sortDirection, KeysetScrollPosition.Direction scrollDirection) {
160+
if (scrollDirection == KeysetScrollPosition.Direction.Backward) {
161+
return sortDirection.isAscending() ? Expression::lte : Expression::gte;
162+
}
163+
return sortDirection.isAscending() ? Expression::gt : Expression::lt;
164+
}
165+
81166
/**
82167
* Converts a Spring Data sort to an equivalent list of {@link SortItem sort items}.
83168
*

src/main/java/org/springframework/data/neo4j/repository/query/CypherQueryCreator.java

+47-15
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,11 @@
4545
import org.neo4j.cypherdsl.core.RelationshipPattern;
4646
import org.neo4j.cypherdsl.core.SortItem;
4747
import org.neo4j.driver.types.Point;
48+
import org.springframework.data.domain.KeysetScrollPosition;
49+
import org.springframework.data.domain.OffsetScrollPosition;
4850
import org.springframework.data.domain.Pageable;
4951
import org.springframework.data.domain.Range;
52+
import org.springframework.data.domain.ScrollPosition;
5053
import org.springframework.data.domain.Sort;
5154
import org.springframework.data.geo.Box;
5255
import org.springframework.data.geo.Circle;
@@ -64,6 +67,7 @@
6467
import org.springframework.data.neo4j.core.mapping.PropertyFilter;
6568
import org.springframework.data.neo4j.core.mapping.RelationshipDescription;
6669
import org.springframework.data.neo4j.core.schema.TargetNode;
70+
import org.springframework.data.repository.query.QueryMethod;
6771
import org.springframework.data.repository.query.parser.AbstractQueryCreator;
6872
import org.springframework.data.repository.query.parser.Part;
6973
import org.springframework.data.repository.query.parser.PartTree;
@@ -82,6 +86,7 @@
8286
final class CypherQueryCreator extends AbstractQueryCreator<QueryFragmentsAndParameters, Condition> {
8387

8488
private final Neo4jMappingContext mappingContext;
89+
private final QueryMethod queryMethod;
8590

8691
private final Class<?> domainType;
8792
private final NodeDescription<?> nodeDescription;
@@ -99,6 +104,8 @@ final class CypherQueryCreator extends AbstractQueryCreator<QueryFragmentsAndPar
99104

100105
private final Pageable pagingParameter;
101106

107+
private final ScrollPosition scrollPosition;
108+
102109
/**
103110
* Stores the number of max results, if the {@link PartTree tree} is limiting.
104111
*/
@@ -113,18 +120,21 @@ final class CypherQueryCreator extends AbstractQueryCreator<QueryFragmentsAndPar
113120

114121
private final List<PropertyPathWrapper> propertyPathWrappers;
115122

123+
private final boolean keysetRequiresSort;
124+
116125
/**
117126
* Can be used to modify the limit of a paged or sliced query.
118127
*/
119128
private final UnaryOperator<Integer> limitModifier;
120129

121-
CypherQueryCreator(Neo4jMappingContext mappingContext, Class<?> domainType, Neo4jQueryType queryType, PartTree tree,
130+
CypherQueryCreator(Neo4jMappingContext mappingContext, QueryMethod queryMethod, Class<?> domainType, Neo4jQueryType queryType, PartTree tree,
122131
Neo4jParameterAccessor actualParameters, Collection<PropertyFilter.ProjectedPath> includedProperties,
123132
BiFunction<Object, Neo4jPersistentPropertyConverter<?>, Object> parameterConversion,
124133
UnaryOperator<Integer> limitModifier) {
125134

126135
super(tree, actualParameters);
127136
this.mappingContext = mappingContext;
137+
this.queryMethod = queryMethod;
128138

129139
this.domainType = domainType;
130140
this.nodeDescription = this.mappingContext.getRequiredNodeDescription(this.domainType);
@@ -139,6 +149,7 @@ final class CypherQueryCreator extends AbstractQueryCreator<QueryFragmentsAndPar
139149
this.parameterConversion = parameterConversion;
140150

141151
this.pagingParameter = actualParameters.getPageable();
152+
this.scrollPosition = actualParameters.getScrollPosition();
142153
this.limitModifier = limitModifier;
143154

144155
AtomicInteger symbolicNameIndex = new AtomicInteger();
@@ -148,6 +159,7 @@ final class CypherQueryCreator extends AbstractQueryCreator<QueryFragmentsAndPar
148159
mappingContext.getPersistentPropertyPath(part.getProperty())))
149160
.collect(Collectors.toList());
150161

162+
this.keysetRequiresSort = queryMethod.isScrollQuery() && actualParameters.getScrollPosition() instanceof KeysetScrollPosition;
151163
}
152164

153165
private class PropertyPathWrapper {
@@ -260,7 +272,12 @@ protected QueryFragmentsAndParameters complete(@Nullable Condition condition, So
260272
.collect(Collectors.toMap(p -> p.nameOrIndex, p -> parameterConversion.apply(p.value, p.conversionOverride)));
261273

262274
QueryFragments queryFragments = createQueryFragments(condition, sort);
263-
return new QueryFragmentsAndParameters(nodeDescription, queryFragments, convertedParameters);
275+
276+
var theSort = pagingParameter.getSort().and(sort);
277+
if (keysetRequiresSort && theSort.isUnsorted()) {
278+
throw new UnsupportedOperationException("Unsorted keyset based scrolling is not supported.");
279+
}
280+
return new QueryFragmentsAndParameters(nodeDescription, queryFragments, convertedParameters, theSort);
264281
}
265282

266283
@NonNull
@@ -280,15 +297,12 @@ private QueryFragments createQueryFragments(@Nullable Condition condition, Sort
280297
}
281298
}
282299

283-
// closing action: add the condition and path match
284-
queryFragments.setCondition(conditionFragment);
285-
286300
if (!relationshipChain.isEmpty()) {
287301
queryFragments.setMatchOn(relationshipChain);
288302
} else {
289303
queryFragments.addMatchOn(startNode);
290304
}
291-
/// end of initial filter query creation
305+
// end of initial filter query creation
292306

293307
if (queryType == Neo4jQueryType.COUNT) {
294308
queryFragments.setReturnExpression(Functions.count(Cypher.asterisk()), true);
@@ -298,20 +312,38 @@ private QueryFragments createQueryFragments(@Nullable Condition condition, Sort
298312
queryFragments.setDeleteExpression(Constants.NAME_OF_TYPED_ROOT_NODE.apply(nodeDescription));
299313
queryFragments.setReturnExpression(Functions.count(Constants.NAME_OF_TYPED_ROOT_NODE.apply(nodeDescription)), true);
300314
} else {
315+
316+
var theSort = pagingParameter.getSort().and(sort);
317+
318+
if (pagingParameter.isUnpaged() && scrollPosition == null && maxResults != null) {
319+
queryFragments.setLimit(limitModifier.apply(maxResults.intValue()));
320+
} else if (scrollPosition instanceof KeysetScrollPosition keysetScrollPosition) {
321+
322+
Neo4jPersistentEntity<?> entity = (Neo4jPersistentEntity<?>) nodeDescription;
323+
// Enforce sorting by something that is hopefully stable comparable (looking at Neo4j's id() with tears in my eyes).
324+
theSort = theSort.and(Sort.by(entity.getRequiredIdProperty().getName()).ascending());
325+
326+
queryFragments.setLimit(limitModifier.apply(maxResults.intValue()));
327+
if (!keysetScrollPosition.isInitial()) {
328+
conditionFragment = conditionFragment.and(CypherAdapterUtils.combineKeysetIntoCondition(entity, keysetScrollPosition, theSort));
329+
}
330+
331+
queryFragments.setRequiresReverseSort(keysetScrollPosition.getDirection() == KeysetScrollPosition.Direction.Backward);
332+
} else if (scrollPosition instanceof OffsetScrollPosition offsetScrollPosition) {
333+
queryFragments.setSkip(offsetScrollPosition.getOffset());
334+
queryFragments.setLimit(limitModifier.apply(pagingParameter.isUnpaged() ? maxResults.intValue() : pagingParameter.getPageSize()));
335+
}
336+
301337
queryFragments.setReturnBasedOn(nodeDescription, includedProperties, isDistinct);
302338
queryFragments.setOrderBy(Stream
303339
.concat(sortItems.stream(),
304-
pagingParameter.getSort().and(sort).stream().map(CypherAdapterUtils.sortAdapterFor(nodeDescription)))
340+
theSort.stream().map(CypherAdapterUtils.sortAdapterFor(nodeDescription)))
305341
.collect(Collectors.toList()));
306-
if (pagingParameter.isUnpaged()) {
307-
queryFragments.setLimit(maxResults);
308-
} else {
309-
long skip = pagingParameter.getOffset();
310-
int pageSize = pagingParameter.getPageSize();
311-
queryFragments.setSkip(skip);
312-
queryFragments.setLimit(limitModifier.apply(pageSize));
313-
}
314342
}
343+
344+
// closing action: add the condition and path match
345+
queryFragments.setCondition(conditionFragment);
346+
315347
return queryFragments;
316348
}
317349

0 commit comments

Comments
 (0)