Skip to content

Commit 9a0b4b3

Browse files
committed
Add limit option to ScanCursor.
ScanCursor can be now limited to a maximum number of elements to be retrieved. Closes #1575
1 parent 08c14e6 commit 9a0b4b3

File tree

5 files changed

+157
-3
lines changed

5 files changed

+157
-3
lines changed

src/main/java/org/springframework/data/redis/core/ConvertingCursor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,12 @@ public long getPosition() {
122122
return delegate.getPosition();
123123
}
124124

125+
/*
126+
* (non-Javadoc)
127+
* @see org.springframework.data.redis.core.Cursor#limit(long)
128+
*/
129+
@Override
130+
public Cursor<T> limit(long count) {
131+
return new ConvertingCursor<>(delegate.limit(count), converter);
132+
}
125133
}

src/main/java/org/springframework/data/redis/core/Cursor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
package org.springframework.data.redis.core;
1717

1818
import java.io.Closeable;
19-
import java.util.Iterator;
19+
20+
import org.springframework.data.redis.util.BoundedIterator;
2021

2122
/**
23+
* Cursor abstraction to scan over the keyspace or elements within a data structure using a variant of a {@code SCAN}
24+
* command.
25+
*
2226
* @author Christoph Strobl
27+
* @author Mark Paluch
2328
* @param <T>
2429
* @since 1.4
2530
*/
26-
public interface Cursor<T> extends Iterator<T>, Closeable {
31+
public interface Cursor<T> extends BoundedIterator<T>, Closeable {
2732

2833
/**
2934
* Get the reference cursor. <br>
@@ -50,4 +55,14 @@ public interface Cursor<T> extends Iterator<T>, Closeable {
5055
*/
5156
long getPosition();
5257

58+
/**
59+
* Limit the maximum number of elements to be returned from this cursor. The returned cursor object can be used to
60+
* iterate over the remaining items and to {@link #close() release} associated resources. The returned cursor is not
61+
* attached to the state of {@code this} cursor and this object should be no longer used.
62+
*
63+
* @return a new {@link Cursor} with detached iteration state.
64+
* @since 2.5
65+
*/
66+
@Override
67+
Cursor<T> limit(long count);
5368
}

src/main/java/org/springframework/data/redis/core/ScanCursor.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.springframework.dao.InvalidDataAccessApiUsageException;
2424
import org.springframework.lang.Nullable;
25+
import org.springframework.util.Assert;
2526
import org.springframework.util.CollectionUtils;
2627

2728
/**
@@ -43,6 +44,7 @@ public abstract class ScanCursor<T> implements Cursor<T> {
4344
private @Nullable Iterator<T> delegate;
4445
private @Nullable final ScanOptions scanOptions;
4546
private long position;
47+
private final long limit;
4648

4749
/**
4850
* Crates new {@link ScanCursor} with {@code id=0} and {@link ScanOptions#NONE}
@@ -81,6 +83,23 @@ public ScanCursor(long cursorId, ScanOptions options) {
8183
this.cursorId = cursorId;
8284
this.state = CursorState.READY;
8385
this.delegate = Collections.<T> emptyList().iterator();
86+
this.limit = -1;
87+
}
88+
89+
/**
90+
* Crates a new {@link ScanCursor}.
91+
*
92+
* @param source
93+
* @param limit
94+
* @since 2.5
95+
*/
96+
private ScanCursor(ScanCursor<T> source, long limit) {
97+
98+
this.scanOptions = source.scanOptions;
99+
this.cursorId = source.cursorId;
100+
this.state = source.state;
101+
this.delegate = source.delegate;
102+
this.limit = limit;
84103
}
85104

86105
private void scan(long cursorId) {
@@ -178,6 +197,10 @@ public boolean hasNext() {
178197

179198
assertCursorIsOpen();
180199

200+
if (limit != -1 && getPosition() > limit - 1) {
201+
return false;
202+
}
203+
181204
while (!delegate.hasNext() && !CursorState.FINISHED.equals(state)) {
182205
scan(cursorId);
183206
}
@@ -283,10 +306,43 @@ public long getPosition() {
283306
return position;
284307
}
285308

309+
/*
310+
* (non-Javadoc)
311+
* @see org.springframework.data.redis.core.Cursor#limit(long)
312+
*/
313+
@Override
314+
public ScanCursor<T> limit(long count) {
315+
316+
Assert.isTrue(count >= 0, "Count must be greater or equal to zero");
317+
318+
return new ScanCursorWrapper<>(this, count);
319+
}
320+
286321
/**
287322
* @author Thomas Darimont
288323
*/
289324
enum CursorState {
290325
READY, OPEN, FINISHED, CLOSED;
291326
}
327+
328+
/**
329+
* Wrapper for a concrete {@link ScanCursor} forwarding {@link #doScan(long, ScanOptions)}.
330+
*
331+
* @param <T>
332+
* @since 2.5
333+
*/
334+
private static class ScanCursorWrapper<T> extends ScanCursor<T> {
335+
336+
private final ScanCursor<T> delegate;
337+
338+
public ScanCursorWrapper(ScanCursor<T> delegate, long limit) {
339+
super(delegate, limit);
340+
this.delegate = delegate;
341+
}
342+
343+
@Override
344+
protected ScanIteration<T> doScan(long cursorId, ScanOptions options) {
345+
return delegate.doScan(cursorId, options);
346+
}
347+
}
292348
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.util;
17+
18+
import java.util.Iterator;
19+
20+
/**
21+
* Extension to {@link Iterator} that can be {@link #limit(long) limited} to a maximum number of items.
22+
*
23+
* @author Mark Paluch
24+
* @since 2.5
25+
*/
26+
public interface BoundedIterator<T> extends Iterator<T> {
27+
28+
/**
29+
* Limit the maximum number of elements to return. The limit is only applied to the returned instance and not applied
30+
* to {@code this} iterator.
31+
*
32+
* @param count the maximum number of elements of iterator to return. Must be greater or equal to zero.
33+
* @return a new instance of {@link BoundedIterator} with {@code count} applied.
34+
*/
35+
BoundedIterator<T> limit(long count);
36+
}

src/test/java/org/springframework/data/redis/core/ScanCursorUnitTests.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import static org.assertj.core.api.Assertions.*;
1919

20-
import java.io.IOException;
2120
import java.util.ArrayList;
2221
import java.util.Arrays;
2322
import java.util.Collections;
@@ -32,7 +31,10 @@
3231
import org.springframework.dao.InvalidDataAccessApiUsageException;
3332

3433
/**
34+
* Unit tests for {@link ScanCursor}.
35+
*
3536
* @author Christoph Strobl
37+
* @author Mark Paluch
3638
*/
3739
class ScanCursorUnitTests {
3840

@@ -198,6 +200,43 @@ void hasNextShouldStopCorrectlyWhenWholeScanIterationDoesNotReturnResultsAndStat
198200
assertThat(cursor.getCursorId()).isEqualTo(0L);
199201
}
200202

203+
@Test // GH-1575
204+
void limitShouldApplyLimitation() {
205+
206+
LinkedList<ScanIteration<String>> values = new LinkedList<>();
207+
values.add(createIteration(1, "spring"));
208+
values.add(createIteration(2, "data"));
209+
values.add(createIteration(3, "redis"));
210+
values.add(createIteration(0));
211+
Cursor<String> cursor = initCursor(values).limit(2);
212+
213+
List<String> result = new ArrayList<>();
214+
while (cursor.hasNext()) {
215+
result.add(cursor.next());
216+
}
217+
218+
assertThat(result).hasSize(2).contains("spring", "data");
219+
}
220+
221+
@Test // GH-1575
222+
void limitShouldNotLimitOriginalCursor() {
223+
224+
LinkedList<ScanIteration<String>> values = new LinkedList<>();
225+
values.add(createIteration(1, "spring"));
226+
values.add(createIteration(2, "data"));
227+
values.add(createIteration(3, "redis"));
228+
values.add(createIteration(0));
229+
Cursor<String> cursor = initCursor(values);
230+
cursor.limit(1);
231+
232+
List<String> result = new ArrayList<>();
233+
while (cursor.hasNext()) {
234+
result.add(cursor.next());
235+
}
236+
237+
assertThat(result).hasSize(3);
238+
}
239+
201240
private CapturingCursorDummy initCursor(Queue<ScanIteration<String>> values) {
202241
CapturingCursorDummy cursor = new CapturingCursorDummy(values);
203242
cursor.open();

0 commit comments

Comments
 (0)