From ab35ba71a40e3fb50c59b27d1ea1fc86830636e0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 30 Jun 2021 13:57:58 +0200 Subject: [PATCH 1/2] Prepare issue branch. --- pom.xml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 8657b1992a..e4678c9e60 100644 --- a/pom.xml +++ b/pom.xml @@ -1,11 +1,13 @@ - + 4.0.0 org.springframework.data spring-data-redis - 2.6.0-SNAPSHOT + 2.6.0-2044-SNAPSHOT Spring Data Redis From aab51284991cba1b62cb5eadd8cc022374c6252b Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 30 Jun 2021 14:46:21 +0200 Subject: [PATCH 2/2] Consider exclusive Range for Jedis Stream range/pending commands. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now consider `Bound.exclusive(…)` for XRANGE, XREVRANGE and XPENDING commands through Jedis. Closes #2044 --- .../connection/jedis/StreamConverters.java | 17 +++--- .../AbstractConnectionIntegrationTests.java | 8 +-- ...faultStreamOperationsIntegrationTests.java | 57 +++++++++++++++---- 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index 565d0fd0db..f95661edc7 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -63,21 +63,20 @@ static byte[][] entryIdsToBytes(List recordIds) { } static String getLowerValue(Range range) { - - if (range.getLowerBound().equals(Range.Bound.unbounded())) { - return "-"; - } - - return range.getLowerBound().getValue().orElse("-"); + return getValue(range.getLowerBound(), "-"); } static String getUpperValue(Range range) { + return getValue(range.getUpperBound(), "+"); + } + + private static String getValue(Range.Bound bound, String fallbackValue) { - if (range.getUpperBound().equals(Range.Bound.unbounded())) { - return "+"; + if (bound.equals(Range.Bound.unbounded())) { + return fallbackValue; } - return range.getUpperBound().getValue().orElse("+"); + return bound.getValue().map(it -> bound.isInclusive() ? it : "(" + it).orElse(fallbackValue); } static List mapToList(Map map) { diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index 0aeb887d0a..5bf50e66bb 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -3490,7 +3490,7 @@ public void xPendingShouldLoadPendingMessages() { actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"), StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.open("-", "+"), 10L)); + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -3535,7 +3535,7 @@ public void xPendingShouldLoadPendingMessagesForConsumer() { StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", - org.springframework.data.domain.Range.open("-", "+"), 10L)); + org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -3558,7 +3558,7 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() { StreamOffset.create(KEY_1, ReadOffset.lastConsumed()))); actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2", - org.springframework.data.domain.Range.open("-", "+"), 10L)); + org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(4); @@ -3574,7 +3574,7 @@ void xPendingShouldLoadEmptyPendingMessages() { actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2))); actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group")); - actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.open("-", "+"), 10L)); + actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L)); List results = getResults(); assertThat(results).hasSize(3); diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java index ccd2c45b66..bf568fe6c8 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java @@ -34,18 +34,10 @@ import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.PendingMessages; -import org.springframework.data.redis.connection.stream.PendingMessagesSummary; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.RecordId; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.connection.stream.StreamReadOptions; -import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.test.condition.EnabledOnCommand; import org.springframework.data.redis.test.condition.EnabledOnRedisDriver; +import org.springframework.data.redis.test.condition.EnabledOnRedisVersion; import org.springframework.data.redis.test.condition.RedisDetector; import org.springframework.data.redis.test.extension.RedisCluster; import org.springframework.data.redis.test.extension.RedisStanalone; @@ -198,6 +190,28 @@ void rangeShouldReportMessages() { assertThat(message.getId()).isEqualTo(messageId1); } + @ParameterizedRedisTest // GH-2044 + @EnabledOnRedisVersion("6.2") + void exclusiveRangeShouldReportMessages() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.range(key, + Range.from(Bound.exclusive(messageId1.getValue())).to(Bound.inclusive(messageId2.getValue()))); + + assertThat(messages).hasSize(1).extracting(Record::getId).contains(messageId2); + + messages = streamOps.range(key, + Range.from(Bound.inclusive(messageId1.getValue())).to(Bound.exclusive(messageId2.getValue()))); + + assertThat(messages).hasSize(1).extracting(Record::getId).contains(messageId1); + } + @ParameterizedRedisTest // DATAREDIS-864 void reverseRangeShouldReportMessages() { @@ -213,6 +227,29 @@ void reverseRangeShouldReportMessages() { assertThat(messages).hasSize(2).extracting("id").containsSequence(messageId2, messageId1); } + @ParameterizedRedisTest // GH-2044 + @EnabledOnRedisVersion("6.2") + void exclusiveReverseRangeShouldReportMessages() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId3 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.reverseRange(key, + Range.from(Bound.exclusive(messageId1.getValue())).to(Bound.inclusive(messageId3.getValue()))); + + assertThat(messages).hasSize(2).extracting(Record::getId).containsSequence(messageId3, messageId2); + + messages = streamOps.reverseRange(key, + Range.from(Bound.inclusive(messageId1.getValue())).to(Bound.exclusive(messageId3.getValue()))); + + assertThat(messages).hasSize(2).extracting(Record::getId).containsSequence(messageId2, messageId1); + } + @ParameterizedRedisTest // DATAREDIS-864 void reverseRangeShouldConvertSimpleMessages() {