diff --git a/pom.xml b/pom.xml index 74369f5ab8..e9df31301c 100644 --- a/pom.xml +++ b/pom.xml @@ -1,11 +1,13 @@ - + 4.0.0 org.springframework.data spring-data-redis - 2.5.0-SNAPSHOT + 2.5.0-GH-1575-SNAPSHOT Spring Data Redis diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterSetCommands.java index 09b0719cb5..72bcb1e746 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterSetCommands.java @@ -452,7 +452,7 @@ protected ScanIteration doScan(long cursorId, ScanOptions options) { ScanParams params = JedisConverters.toScanParams(options); redis.clients.jedis.ScanResult result = connection.getCluster().sscan(key, JedisConverters.toBytes(cursorId), params); - return new ScanIteration<>(Long.valueOf(result.getCursor()), result.getResult()); + return new ScanIteration<>(Long.parseLong(result.getCursor()), result.getResult()); } }.open(); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java index 0c5374d54e..969727ce94 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java @@ -169,7 +169,7 @@ protected ScanIteration doScan(long cursorId, ScanOptions options) { ScanParams params = JedisConverters.toScanParams(options); redis.clients.jedis.ScanResult result = connection.getJedis().scan(Long.toString(cursorId), params); - return new ScanIteration<>(Long.valueOf(result.getCursor()), + return new ScanIteration<>(Long.parseLong(result.getCursor()), JedisConverters.stringListToByteList().convert(result.getResult())); } diff --git a/src/main/java/org/springframework/data/redis/core/ConvertingCursor.java b/src/main/java/org/springframework/data/redis/core/ConvertingCursor.java index e375626874..8b44a85ee2 100644 --- a/src/main/java/org/springframework/data/redis/core/ConvertingCursor.java +++ b/src/main/java/org/springframework/data/redis/core/ConvertingCursor.java @@ -15,8 +15,6 @@ */ package org.springframework.data.redis.core; -import java.io.IOException; - import org.springframework.core.convert.converter.Converter; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -81,7 +79,7 @@ public void remove() { * @see java.io.Closeable#close() */ @Override - public void close() throws IOException { + public void close() { delegate.close(); } @@ -121,5 +119,4 @@ public Cursor open() { public long getPosition() { return delegate.getPosition(); } - } diff --git a/src/main/java/org/springframework/data/redis/core/Cursor.java b/src/main/java/org/springframework/data/redis/core/Cursor.java index bd0ac13d6a..c75a7448db 100644 --- a/src/main/java/org/springframework/data/redis/core/Cursor.java +++ b/src/main/java/org/springframework/data/redis/core/Cursor.java @@ -15,15 +15,25 @@ */ package org.springframework.data.redis.core; -import java.io.Closeable; -import java.util.Iterator; +import org.springframework.data.util.CloseableIterator; /** + * Cursor abstraction to scan over the keyspace or elements within a data structure using a variant of a {@code SCAN} + * command. + *

+ * Using a Java 8 {@link #stream() java.util.stream.Stream} allows to apply additional + * {@link java.util.stream.Stream#filter(java.util.function.Predicate) filters} and {@link java.util.stream.Stream#limit(long) limits} to + * the underlying {@link Cursor}. + *

+ * Make sure to {@link CloseableIterator#close() close} the cursor when done as this allows implementations to clean up + * any resources they need to keep open to iterate over elements (eg. by using a try-with-resource statement). + * * @author Christoph Strobl + * @author Mark Paluch * @param * @since 1.4 */ -public interface Cursor extends Iterator, Closeable { +public interface Cursor extends CloseableIterator { /** * Get the reference cursor.
@@ -34,20 +44,22 @@ public interface Cursor extends Iterator, Closeable { long getCursorId(); /** - * @return Returns true if cursor closed. + * @return {@code true} if cursor closed. */ boolean isClosed(); /** - * Opens cursor and returns itself. + * Opens cursor and returns itself. This method is intended to be called by components constructing a {@link Cursor} + * and should not be called externally. * - * @return + * @return the opened cursor. + * @deprecated to be removed from the interface in the next major version. */ + @Deprecated Cursor open(); /** - * @return Returns the current position of the cursor. + * @return the current position of the cursor. */ long getPosition(); - } diff --git a/src/main/java/org/springframework/data/redis/core/ScanCursor.java b/src/main/java/org/springframework/data/redis/core/ScanCursor.java index 5e49de95a3..414945c4c9 100644 --- a/src/main/java/org/springframework/data/redis/core/ScanCursor.java +++ b/src/main/java/org/springframework/data/redis/core/ScanCursor.java @@ -15,13 +15,13 @@ */ package org.springframework.data.redis.core; -import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; /** @@ -38,10 +38,10 @@ */ public abstract class ScanCursor implements Cursor { - private @Nullable CursorState state; + private CursorState state; private long cursorId; - private @Nullable Iterator delegate; - private @Nullable final ScanOptions scanOptions; + private Iterator delegate; + private final ScanOptions scanOptions; private long position; /** @@ -54,7 +54,7 @@ public ScanCursor() { /** * Crates new {@link ScanCursor} with {@code id=0}. * - * @param options + * @param options the scan options to apply. */ public ScanCursor(ScanOptions options) { this(0, options); @@ -63,7 +63,7 @@ public ScanCursor(ScanOptions options) { /** * Crates new {@link ScanCursor} with {@link ScanOptions#NONE} * - * @param cursorId + * @param cursorId the cursor Id. */ public ScanCursor(long cursorId) { this(cursorId, ScanOptions.NONE); @@ -72,15 +72,15 @@ public ScanCursor(long cursorId) { /** * Crates new {@link ScanCursor} * - * @param cursorId - * @param options Defaulted to {@link ScanOptions#NONE} if nulled. + * @param cursorId the cursor Id. + * @param options Defaulted to {@link ScanOptions#NONE} if {@code null}. */ - public ScanCursor(long cursorId, ScanOptions options) { + public ScanCursor(long cursorId, @Nullable ScanOptions options) { this.scanOptions = options != null ? options : ScanOptions.NONE; this.cursorId = cursorId; this.state = CursorState.READY; - this.delegate = Collections. emptyList().iterator(); + this.delegate = Collections.emptyIterator(); } private void scan(long cursorId) { @@ -125,14 +125,7 @@ protected void doOpen(long cursorId) { private void processScanResult(ScanIteration result) { - if (result == null) { - - resetDelegate(); - state = CursorState.FINISHED; - return; - } - - cursorId = Long.valueOf(result.getCursorId()); + cursorId = result.getCursorId(); if (isFinished(cursorId)) { state = CursorState.FINISHED; @@ -157,7 +150,7 @@ protected boolean isFinished(long cursorId) { } private void resetDelegate() { - delegate = Collections. emptyList().iterator(); + delegate = Collections.emptyIterator(); } /* @@ -186,11 +179,7 @@ public boolean hasNext() { return true; } - if (cursorId > 0) { - return true; - } - - return false; + return cursorId > 0; } private void assertCursorIsOpen() { @@ -243,7 +232,7 @@ public void remove() { * @see java.io.Closeable#close() */ @Override - public final void close() throws IOException { + public final void close() { try { doClose(); diff --git a/src/test/java/org/springframework/data/redis/core/ScanCursorUnitTests.java b/src/test/java/org/springframework/data/redis/core/ScanCursorUnitTests.java index d722d3539d..cc876a4e1f 100644 --- a/src/test/java/org/springframework/data/redis/core/ScanCursorUnitTests.java +++ b/src/test/java/org/springframework/data/redis/core/ScanCursorUnitTests.java @@ -17,7 +17,6 @@ import static org.assertj.core.api.Assertions.*; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -25,14 +24,17 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.Stack; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; - import org.springframework.dao.InvalidDataAccessApiUsageException; /** + * Unit tests for {@link ScanCursor}. + * * @author Christoph Strobl + * @author Mark Paluch */ class ScanCursorUnitTests { @@ -198,6 +200,39 @@ void hasNextShouldStopCorrectlyWhenWholeScanIterationDoesNotReturnResultsAndStat assertThat(cursor.getCursorId()).isEqualTo(0L); } + @Test // GH-1575 + void streamLimitShouldApplyLimitation() { + + LinkedList> values = new LinkedList<>(); + values.add(createIteration(1, "spring")); + values.add(createIteration(2, "data")); + values.add(createIteration(3, "redis")); + values.add(createIteration(0)); + + Cursor cursor = initCursor(values); + + assertThat(cursor.stream().limit(2).collect(Collectors.toList())).hasSize(2).contains("spring", "data"); + } + + @Test // GH-1575 + void streamingCursorShouldForwardClose() { + + LinkedList> values = new LinkedList<>(); + values.add(createIteration(1, "spring")); + values.add(createIteration(2, "data")); + values.add(createIteration(3, "redis")); + values.add(createIteration(0)); + Cursor cursor = initCursor(values); + + assertThat(cursor.isClosed()).isFalse(); + + Stream stream = cursor.stream(); + stream.collect(Collectors.toList()); + stream.close(); + + assertThat(cursor.isClosed()).isTrue(); + } + private CapturingCursorDummy initCursor(Queue> values) { CapturingCursorDummy cursor = new CapturingCursorDummy(values); cursor.open(); @@ -208,11 +243,9 @@ private ScanIteration createIteration(long cursorId, String... values) { return new ScanIteration<>(cursorId, values.length > 0 ? Arrays.asList(values) : Collections. emptyList()); } - private class CapturingCursorDummy extends ScanCursor { - - private Queue> values; + private static class CapturingCursorDummy extends ScanCursor { - private Stack cursors; + private final Queue> values; CapturingCursorDummy(Queue> values) { this.values = values; @@ -221,11 +254,12 @@ private class CapturingCursorDummy extends ScanCursor { @Override protected ScanIteration doScan(long cursorId, ScanOptions options) { - if (cursors == null) { - cursors = new Stack<>(); + ScanIteration iteration = this.values.poll(); + + if (iteration == null) { + iteration = new ScanIteration<>(0, Collections.emptyList()); } - this.cursors.push(cursorId); - return this.values.poll(); + return iteration; } } }