Skip to content

Commit 085aca9

Browse files
mp911dechristophstrobl
authored andcommitted
Consider exclusive Range for Jedis Stream range/pending commands.
We now consider Bound.exclusive(…) for XRANGE, XREVRANGE and XPENDING commands through Jedis. Closes: #2044 Original Pull Request: #2108
1 parent bdd6cee commit 085aca9

File tree

3 files changed

+59
-23
lines changed

3 files changed

+59
-23
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,20 @@ static byte[][] entryIdsToBytes(List<RecordId> recordIds) {
6363
}
6464

6565
static String getLowerValue(Range<String> range) {
66-
67-
if (range.getLowerBound().equals(Range.Bound.unbounded())) {
68-
return "-";
69-
}
70-
71-
return range.getLowerBound().getValue().orElse("-");
66+
return getValue(range.getLowerBound(), "-");
7267
}
7368

7469
static String getUpperValue(Range<String> range) {
70+
return getValue(range.getUpperBound(), "+");
71+
}
72+
73+
private static String getValue(Range.Bound<String> bound, String fallbackValue) {
7574

76-
if (range.getUpperBound().equals(Range.Bound.unbounded())) {
77-
return "+";
75+
if (bound.equals(Range.Bound.unbounded())) {
76+
return fallbackValue;
7877
}
7978

80-
return range.getUpperBound().getValue().orElse("+");
79+
return bound.getValue().map(it -> bound.isInclusive() ? it : "(" + it).orElse(fallbackValue);
8180
}
8281

8382
static List<Object> mapToList(Map<String, Object> map) {

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -3620,7 +3620,7 @@ public void xPendingShouldLoadPendingMessages() {
36203620
actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
36213621
StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
36223622

3623-
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.open("-", "+"), 10L));
3623+
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L));
36243624

36253625
List<Object> results = getResults();
36263626
assertThat(results).hasSize(4);
@@ -3665,7 +3665,7 @@ public void xPendingShouldLoadPendingMessagesForConsumer() {
36653665
StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
36663666

36673667
actual.add(connection.xPending(KEY_1, "my-group", "my-consumer",
3668-
org.springframework.data.domain.Range.open("-", "+"), 10L));
3668+
org.springframework.data.domain.Range.unbounded(), 10L));
36693669

36703670
List<Object> results = getResults();
36713671
assertThat(results).hasSize(4);
@@ -3688,7 +3688,7 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() {
36883688
StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
36893689

36903690
actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2",
3691-
org.springframework.data.domain.Range.open("-", "+"), 10L));
3691+
org.springframework.data.domain.Range.unbounded(), 10L));
36923692

36933693
List<Object> results = getResults();
36943694
assertThat(results).hasSize(4);
@@ -3704,7 +3704,7 @@ void xPendingShouldLoadEmptyPendingMessages() {
37043704
actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
37053705
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
37063706

3707-
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.open("-", "+"), 10L));
3707+
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L));
37083708

37093709
List<Object> results = getResults();
37103710
assertThat(results).hasSize(3);

src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java

+47-10
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,10 @@
3434
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
3535
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
3636
import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension;
37-
import org.springframework.data.redis.connection.stream.Consumer;
38-
import org.springframework.data.redis.connection.stream.MapRecord;
39-
import org.springframework.data.redis.connection.stream.ObjectRecord;
40-
import org.springframework.data.redis.connection.stream.PendingMessages;
41-
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
42-
import org.springframework.data.redis.connection.stream.ReadOffset;
43-
import org.springframework.data.redis.connection.stream.RecordId;
44-
import org.springframework.data.redis.connection.stream.StreamOffset;
45-
import org.springframework.data.redis.connection.stream.StreamReadOptions;
46-
import org.springframework.data.redis.connection.stream.StreamRecords;
37+
import org.springframework.data.redis.connection.stream.*;
4738
import org.springframework.data.redis.test.condition.EnabledOnCommand;
4839
import org.springframework.data.redis.test.condition.EnabledOnRedisDriver;
40+
import org.springframework.data.redis.test.condition.EnabledOnRedisVersion;
4941
import org.springframework.data.redis.test.condition.RedisDetector;
5042
import org.springframework.data.redis.test.extension.RedisCluster;
5143
import org.springframework.data.redis.test.extension.RedisStanalone;
@@ -198,6 +190,28 @@ void rangeShouldReportMessages() {
198190
assertThat(message.getId()).isEqualTo(messageId1);
199191
}
200192

193+
@ParameterizedRedisTest // GH-2044
194+
@EnabledOnRedisVersion("6.2")
195+
void exclusiveRangeShouldReportMessages() {
196+
197+
K key = keyFactory.instance();
198+
HK hashKey = hashKeyFactory.instance();
199+
HV value = hashValueFactory.instance();
200+
201+
RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value));
202+
RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value));
203+
204+
List<MapRecord<K, HK, HV>> messages = streamOps.range(key,
205+
Range.from(Bound.exclusive(messageId1.getValue())).to(Bound.inclusive(messageId2.getValue())));
206+
207+
assertThat(messages).hasSize(1).extracting(Record::getId).contains(messageId2);
208+
209+
messages = streamOps.range(key,
210+
Range.from(Bound.inclusive(messageId1.getValue())).to(Bound.exclusive(messageId2.getValue())));
211+
212+
assertThat(messages).hasSize(1).extracting(Record::getId).contains(messageId1);
213+
}
214+
201215
@ParameterizedRedisTest // DATAREDIS-864
202216
void reverseRangeShouldReportMessages() {
203217

@@ -213,6 +227,29 @@ void reverseRangeShouldReportMessages() {
213227
assertThat(messages).hasSize(2).extracting("id").containsSequence(messageId2, messageId1);
214228
}
215229

230+
@ParameterizedRedisTest // GH-2044
231+
@EnabledOnRedisVersion("6.2")
232+
void exclusiveReverseRangeShouldReportMessages() {
233+
234+
K key = keyFactory.instance();
235+
HK hashKey = hashKeyFactory.instance();
236+
HV value = hashValueFactory.instance();
237+
238+
RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value));
239+
RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value));
240+
RecordId messageId3 = streamOps.add(key, Collections.singletonMap(hashKey, value));
241+
242+
List<MapRecord<K, HK, HV>> messages = streamOps.reverseRange(key,
243+
Range.from(Bound.exclusive(messageId1.getValue())).to(Bound.inclusive(messageId3.getValue())));
244+
245+
assertThat(messages).hasSize(2).extracting(Record::getId).containsSequence(messageId3, messageId2);
246+
247+
messages = streamOps.reverseRange(key,
248+
Range.from(Bound.inclusive(messageId1.getValue())).to(Bound.exclusive(messageId3.getValue())));
249+
250+
assertThat(messages).hasSize(2).extracting(Record::getId).containsSequence(messageId2, messageId1);
251+
}
252+
216253
@ParameterizedRedisTest // DATAREDIS-864
217254
void reverseRangeShouldConvertSimpleMessages() {
218255

0 commit comments

Comments
 (0)