Skip to content

Add limit option to ScanCursor #2014

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.5.0-SNAPSHOT</version>
<version>2.5.0-GH-1575-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
ScanParams params = JedisConverters.toScanParams(options);
redis.clients.jedis.ScanResult<byte[]> result = connection.getCluster().sscan(key,
JedisConverters.toBytes(cursorId), params);
return new ScanIteration<>(Long.valueOf(result.getCursor()), result.getResult());
return new ScanIteration<>(Long.parseLong(result.getCursor()), result.getResult());
}
}.open();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {

ScanParams params = JedisConverters.toScanParams(options);
redis.clients.jedis.ScanResult<String> result = connection.getJedis().scan(Long.toString(cursorId), params);
return new ScanIteration<>(Long.valueOf(result.getCursor()),
return new ScanIteration<>(Long.parseLong(result.getCursor()),
JedisConverters.stringListToByteList().convert(result.getResult()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package org.springframework.data.redis.core;

import java.io.IOException;

import org.springframework.core.convert.converter.Converter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -81,7 +79,7 @@ public void remove() {
* @see java.io.Closeable#close()
*/
@Override
public void close() throws IOException {
public void close() {
delegate.close();
}

Expand Down Expand Up @@ -121,5 +119,4 @@ public Cursor<T> open() {
public long getPosition() {
return delegate.getPosition();
}

}
28 changes: 20 additions & 8 deletions src/main/java/org/springframework/data/redis/core/Cursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,25 @@
*/
package org.springframework.data.redis.core;

import java.io.Closeable;
import java.util.Iterator;
import org.springframework.data.util.CloseableIterator;

/**
* Cursor abstraction to scan over the keyspace or elements within a data structure using a variant of a {@code SCAN}
* command.
* <p />
* Using a Java 8 {@link #stream() java.util.stream.Stream} allows to apply additional
* {@link java.util.stream.Stream#filter(java.util.function.Predicate) filters} and {@link java.util.stream.Stream#limit(long) limits} to
* the underlying {@link Cursor}.
* <p />
* Make sure to {@link CloseableIterator#close() close} the cursor when done as this allows implementations to clean up
* any resources they need to keep open to iterate over elements (eg. by using a try-with-resource statement).
*
* @author Christoph Strobl
* @author Mark Paluch
* @param <T>
* @since 1.4
*/
public interface Cursor<T> extends Iterator<T>, Closeable {
public interface Cursor<T> extends CloseableIterator<T> {

/**
* Get the reference cursor. <br>
Expand All @@ -34,20 +44,22 @@ public interface Cursor<T> extends Iterator<T>, Closeable {
long getCursorId();

/**
* @return Returns true if cursor closed.
* @return {@code true} if cursor closed.
*/
boolean isClosed();

/**
* Opens cursor and returns itself.
* Opens cursor and returns itself. This method is intended to be called by components constructing a {@link Cursor}
* and should not be called externally.
*
* @return
* @return the opened cursor.
* @deprecated to be removed from the interface in the next major version.
*/
@Deprecated
Cursor<T> open();

/**
* @return Returns the current position of the cursor.
* @return the current position of the cursor.
*/
long getPosition();

}
39 changes: 14 additions & 25 deletions src/main/java/org/springframework/data/redis/core/ScanCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
*/
package org.springframework.data.redis.core;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -38,10 +38,10 @@
*/
public abstract class ScanCursor<T> implements Cursor<T> {

private @Nullable CursorState state;
private CursorState state;
private long cursorId;
private @Nullable Iterator<T> delegate;
private @Nullable final ScanOptions scanOptions;
private Iterator<T> delegate;
private final ScanOptions scanOptions;
private long position;

/**
Expand All @@ -54,7 +54,7 @@ public ScanCursor() {
/**
* Crates new {@link ScanCursor} with {@code id=0}.
*
* @param options
* @param options the scan options to apply.
*/
public ScanCursor(ScanOptions options) {
this(0, options);
Expand All @@ -63,7 +63,7 @@ public ScanCursor(ScanOptions options) {
/**
* Crates new {@link ScanCursor} with {@link ScanOptions#NONE}
*
* @param cursorId
* @param cursorId the cursor Id.
*/
public ScanCursor(long cursorId) {
this(cursorId, ScanOptions.NONE);
Expand All @@ -72,15 +72,15 @@ public ScanCursor(long cursorId) {
/**
* Crates new {@link ScanCursor}
*
* @param cursorId
* @param options Defaulted to {@link ScanOptions#NONE} if nulled.
* @param cursorId the cursor Id.
* @param options Defaulted to {@link ScanOptions#NONE} if {@code null}.
*/
public ScanCursor(long cursorId, ScanOptions options) {
public ScanCursor(long cursorId, @Nullable ScanOptions options) {

this.scanOptions = options != null ? options : ScanOptions.NONE;
this.cursorId = cursorId;
this.state = CursorState.READY;
this.delegate = Collections.<T> emptyList().iterator();
this.delegate = Collections.emptyIterator();
}

private void scan(long cursorId) {
Expand Down Expand Up @@ -125,14 +125,7 @@ protected void doOpen(long cursorId) {

private void processScanResult(ScanIteration<T> result) {

if (result == null) {

resetDelegate();
state = CursorState.FINISHED;
return;
}

cursorId = Long.valueOf(result.getCursorId());
cursorId = result.getCursorId();

if (isFinished(cursorId)) {
state = CursorState.FINISHED;
Expand All @@ -157,7 +150,7 @@ protected boolean isFinished(long cursorId) {
}

private void resetDelegate() {
delegate = Collections.<T> emptyList().iterator();
delegate = Collections.emptyIterator();
}

/*
Expand Down Expand Up @@ -186,11 +179,7 @@ public boolean hasNext() {
return true;
}

if (cursorId > 0) {
return true;
}

return false;
return cursorId > 0;
}

private void assertCursorIsOpen() {
Expand Down Expand Up @@ -243,7 +232,7 @@ public void remove() {
* @see java.io.Closeable#close()
*/
@Override
public final void close() throws IOException {
public final void close() {

try {
doClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@

import static org.assertj.core.api.Assertions.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

import org.springframework.dao.InvalidDataAccessApiUsageException;

/**
* Unit tests for {@link ScanCursor}.
*
* @author Christoph Strobl
* @author Mark Paluch
*/
class ScanCursorUnitTests {

Expand Down Expand Up @@ -198,6 +200,39 @@ void hasNextShouldStopCorrectlyWhenWholeScanIterationDoesNotReturnResultsAndStat
assertThat(cursor.getCursorId()).isEqualTo(0L);
}

@Test // GH-1575
void streamLimitShouldApplyLimitation() {

LinkedList<ScanIteration<String>> values = new LinkedList<>();
values.add(createIteration(1, "spring"));
values.add(createIteration(2, "data"));
values.add(createIteration(3, "redis"));
values.add(createIteration(0));

Cursor<String> cursor = initCursor(values);

assertThat(cursor.stream().limit(2).collect(Collectors.toList())).hasSize(2).contains("spring", "data");
}

@Test // GH-1575
void streamingCursorShouldForwardClose() {

LinkedList<ScanIteration<String>> values = new LinkedList<>();
values.add(createIteration(1, "spring"));
values.add(createIteration(2, "data"));
values.add(createIteration(3, "redis"));
values.add(createIteration(0));
Cursor<String> cursor = initCursor(values);

assertThat(cursor.isClosed()).isFalse();

Stream<String> stream = cursor.stream();
stream.collect(Collectors.toList());
stream.close();

assertThat(cursor.isClosed()).isTrue();
}

private CapturingCursorDummy initCursor(Queue<ScanIteration<String>> values) {
CapturingCursorDummy cursor = new CapturingCursorDummy(values);
cursor.open();
Expand All @@ -208,11 +243,9 @@ private ScanIteration<String> createIteration(long cursorId, String... values) {
return new ScanIteration<>(cursorId, values.length > 0 ? Arrays.asList(values) : Collections.<String> emptyList());
}

private class CapturingCursorDummy extends ScanCursor<String> {

private Queue<ScanIteration<String>> values;
private static class CapturingCursorDummy extends ScanCursor<String> {

private Stack<Long> cursors;
private final Queue<ScanIteration<String>> values;

CapturingCursorDummy(Queue<ScanIteration<String>> values) {
this.values = values;
Expand All @@ -221,11 +254,12 @@ private class CapturingCursorDummy extends ScanCursor<String> {
@Override
protected ScanIteration<String> doScan(long cursorId, ScanOptions options) {

if (cursors == null) {
cursors = new Stack<>();
ScanIteration<String> iteration = this.values.poll();

if (iteration == null) {
iteration = new ScanIteration<>(0, Collections.emptyList());
}
this.cursors.push(cursorId);
return this.values.poll();
return iteration;
}
}
}