Skip to content

Commit aab5128

Browse files
committed
Consider exclusive Range for Jedis Stream range/pending commands.
We now consider `Bound.exclusive(…)` for XRANGE, XREVRANGE and XPENDING commands through Jedis. Closes #2044
1 parent ab35ba7 commit aab5128

File tree

3 files changed

+59
-23
lines changed

3 files changed

+59
-23
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,20 @@ static byte[][] entryIdsToBytes(List<RecordId> recordIds) {
6363
}
6464

6565
static String getLowerValue(Range<String> range) {
66-
67-
if (range.getLowerBound().equals(Range.Bound.unbounded())) {
68-
return "-";
69-
}
70-
71-
return range.getLowerBound().getValue().orElse("-");
66+
return getValue(range.getLowerBound(), "-");
7267
}
7368

7469
static String getUpperValue(Range<String> range) {
70+
return getValue(range.getUpperBound(), "+");
71+
}
72+
73+
private static String getValue(Range.Bound<String> bound, String fallbackValue) {
7574

76-
if (range.getUpperBound().equals(Range.Bound.unbounded())) {
77-
return "+";
75+
if (bound.equals(Range.Bound.unbounded())) {
76+
return fallbackValue;
7877
}
7978

80-
return range.getUpperBound().getValue().orElse("+");
79+
return bound.getValue().map(it -> bound.isInclusive() ? it : "(" + it).orElse(fallbackValue);
8180
}
8281

8382
static List<Object> mapToList(Map<String, Object> map) {

src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -3490,7 +3490,7 @@ public void xPendingShouldLoadPendingMessages() {
34903490
actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
34913491
StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
34923492

3493-
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.open("-", "+"), 10L));
3493+
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L));
34943494

34953495
List<Object> results = getResults();
34963496
assertThat(results).hasSize(4);
@@ -3535,7 +3535,7 @@ public void xPendingShouldLoadPendingMessagesForConsumer() {
35353535
StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
35363536

35373537
actual.add(connection.xPending(KEY_1, "my-group", "my-consumer",
3538-
org.springframework.data.domain.Range.open("-", "+"), 10L));
3538+
org.springframework.data.domain.Range.unbounded(), 10L));
35393539

35403540
List<Object> results = getResults();
35413541
assertThat(results).hasSize(4);
@@ -3558,7 +3558,7 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() {
35583558
StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
35593559

35603560
actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2",
3561-
org.springframework.data.domain.Range.open("-", "+"), 10L));
3561+
org.springframework.data.domain.Range.unbounded(), 10L));
35623562

35633563
List<Object> results = getResults();
35643564
assertThat(results).hasSize(4);
@@ -3574,7 +3574,7 @@ void xPendingShouldLoadEmptyPendingMessages() {
35743574
actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
35753575
actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
35763576

3577-
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.open("-", "+"), 10L));
3577+
actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L));
35783578

35793579
List<Object> results = getResults();
35803580
assertThat(results).hasSize(3);

src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java

+47-10
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,10 @@
3434
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
3535
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
3636
import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension;
37-
import org.springframework.data.redis.connection.stream.Consumer;
38-
import org.springframework.data.redis.connection.stream.MapRecord;
39-
import org.springframework.data.redis.connection.stream.ObjectRecord;
40-
import org.springframework.data.redis.connection.stream.PendingMessages;
41-
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
42-
import org.springframework.data.redis.connection.stream.ReadOffset;
43-
import org.springframework.data.redis.connection.stream.RecordId;
44-
import org.springframework.data.redis.connection.stream.StreamOffset;
45-
import org.springframework.data.redis.connection.stream.StreamReadOptions;
46-
import org.springframework.data.redis.connection.stream.StreamRecords;
37+
import org.springframework.data.redis.connection.stream.*;
4738
import org.springframework.data.redis.test.condition.EnabledOnCommand;
4839
import org.springframework.data.redis.test.condition.EnabledOnRedisDriver;
40+
import org.springframework.data.redis.test.condition.EnabledOnRedisVersion;
4941
import org.springframework.data.redis.test.condition.RedisDetector;
5042
import org.springframework.data.redis.test.extension.RedisCluster;
5143
import org.springframework.data.redis.test.extension.RedisStanalone;
@@ -198,6 +190,28 @@ void rangeShouldReportMessages() {
198190
assertThat(message.getId()).isEqualTo(messageId1);
199191
}
200192

193+
@ParameterizedRedisTest // GH-2044
194+
@EnabledOnRedisVersion("6.2")
195+
void exclusiveRangeShouldReportMessages() {
196+
197+
K key = keyFactory.instance();
198+
HK hashKey = hashKeyFactory.instance();
199+
HV value = hashValueFactory.instance();
200+
201+
RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value));
202+
RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value));
203+
204+
List<MapRecord<K, HK, HV>> messages = streamOps.range(key,
205+
Range.from(Bound.exclusive(messageId1.getValue())).to(Bound.inclusive(messageId2.getValue())));
206+
207+
assertThat(messages).hasSize(1).extracting(Record::getId).contains(messageId2);
208+
209+
messages = streamOps.range(key,
210+
Range.from(Bound.inclusive(messageId1.getValue())).to(Bound.exclusive(messageId2.getValue())));
211+
212+
assertThat(messages).hasSize(1).extracting(Record::getId).contains(messageId1);
213+
}
214+
201215
@ParameterizedRedisTest // DATAREDIS-864
202216
void reverseRangeShouldReportMessages() {
203217

@@ -213,6 +227,29 @@ void reverseRangeShouldReportMessages() {
213227
assertThat(messages).hasSize(2).extracting("id").containsSequence(messageId2, messageId1);
214228
}
215229

230+
@ParameterizedRedisTest // GH-2044
231+
@EnabledOnRedisVersion("6.2")
232+
void exclusiveReverseRangeShouldReportMessages() {
233+
234+
K key = keyFactory.instance();
235+
HK hashKey = hashKeyFactory.instance();
236+
HV value = hashValueFactory.instance();
237+
238+
RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value));
239+
RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value));
240+
RecordId messageId3 = streamOps.add(key, Collections.singletonMap(hashKey, value));
241+
242+
List<MapRecord<K, HK, HV>> messages = streamOps.reverseRange(key,
243+
Range.from(Bound.exclusive(messageId1.getValue())).to(Bound.inclusive(messageId3.getValue())));
244+
245+
assertThat(messages).hasSize(2).extracting(Record::getId).containsSequence(messageId3, messageId2);
246+
247+
messages = streamOps.reverseRange(key,
248+
Range.from(Bound.inclusive(messageId1.getValue())).to(Bound.exclusive(messageId3.getValue())));
249+
250+
assertThat(messages).hasSize(2).extracting(Record::getId).containsSequence(messageId2, messageId1);
251+
}
252+
216253
@ParameterizedRedisTest // DATAREDIS-864
217254
void reverseRangeShouldConvertSimpleMessages() {
218255

0 commit comments

Comments
 (0)