Skip to content

Commit b7f26fa

Browse files
jinkshowermp911de
authored andcommitted
Fix XAddOptions maxlen handling and XPendingOptions validation.
Closes #2982 Original pull request: #2985
1 parent 15f541b commit b7f26fa

File tree

5 files changed

+104
-12
lines changed

5 files changed

+104
-12
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ public Long getMaxlen() {
337337
* @since 2.3
338338
*/
339339
public boolean hasMaxlen() {
340-
return maxlen != null && maxlen > 0;
340+
return maxlen != null;
341341
}
342342

343343
/**
@@ -685,7 +685,7 @@ default Mono<PendingMessagesSummary> xPending(ByteBuffer key, String groupName)
685685
Assert.notNull(key, "Key must not be null");
686686
Assert.notNull(groupName, "GroupName must not be null");
687687

688-
return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
688+
return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next()
689689
.map(CommandResponse::getOutput);
690690
}
691691

@@ -726,7 +726,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer) {
726726
*/
727727
@Nullable
728728
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
729-
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
729+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next()
730730
.map(CommandResponse::getOutput);
731731
}
732732

@@ -743,7 +743,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
743743
* @since 2.3
744744
*/
745745
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
746-
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
746+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next()
747747
.map(CommandResponse::getOutput);
748748
}
749749

@@ -779,8 +779,8 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
779779
*/
780780
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
781781
Long count) {
782-
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
783-
.map(CommandResponse::getOutput);
782+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)))
783+
.next().map(CommandResponse::getOutput);
784784
}
785785

786786
/**
@@ -832,9 +832,15 @@ static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
832832
/**
833833
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
834834
*
835+
* @param range must not be {@literal null}.
836+
* @param count the max number of messages to return. Must not be negative.
835837
* @return new instance of {@link XPendingOptions}.
836838
*/
837-
public PendingRecordsCommand range(Range<String> range, Long count) {
839+
public PendingRecordsCommand range(Range<?> range, Long count) {
840+
841+
Assert.notNull(range, "Range must not be null");
842+
Assert.isTrue(count > -1, "Count must not be negative");
843+
838844
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
839845
}
840846

@@ -886,7 +892,7 @@ public boolean hasConsumer() {
886892
* @return {@literal true} count is set.
887893
*/
888894
public boolean isLimited() {
889-
return count != null && count > -1;
895+
return count != null;
890896
}
891897
}
892898

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public Long getMaxlen() {
214214
* @return {@literal true} if {@literal MAXLEN} is set.
215215
*/
216216
public boolean hasMaxlen() {
217-
return maxlen != null && maxlen > 0;
217+
return maxlen != null;
218218
}
219219

220220
/**
@@ -788,19 +788,28 @@ public static XPendingOptions unbounded() {
788788
/**
789789
* Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
790790
*
791-
* @param count the max number of messages to return. Must not be {@literal null}.
791+
* @param count the max number of messages to return. Must not be negative.
792792
* @return new instance of {@link XPendingOptions}.
793793
*/
794794
public static XPendingOptions unbounded(Long count) {
795+
796+
Assert.isTrue(count > -1, "Count must not be negative");
797+
795798
return new XPendingOptions(null, Range.unbounded(), count);
796799
}
797800

798801
/**
799802
* Create new {@link XPendingOptions} with given {@link Range} and limit.
800803
*
804+
* @param range must not be {@literal null}.
805+
* @param count the max number of messages to return. Must not be negative.
801806
* @return new instance of {@link XPendingOptions}.
802807
*/
803808
public static XPendingOptions range(Range<?> range, Long count) {
809+
810+
Assert.notNull(range, "Range must not be null");
811+
Assert.isTrue(count > -1, "Count must not be negative");
812+
804813
return new XPendingOptions(null, range, count);
805814
}
806815

@@ -848,7 +857,7 @@ public boolean hasConsumer() {
848857
* @return {@literal true} count is set.
849858
*/
850859
public boolean isLimited() {
851-
return count != null && count > -1;
860+
return count != null;
852861
}
853862
}
854863

src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public StreamReadOptions block(Duration timeout) {
9696
*/
9797
public StreamReadOptions count(long count) {
9898

99-
Assert.isTrue(count > 0, "Count must be greater or equal to zero");
99+
Assert.isTrue(count > 0, "Count must be greater than zero");
100100

101101
return new StreamReadOptions(block, count, noack);
102102
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.springframework.data.redis.connection;
2+
3+
import static org.assertj.core.api.Assertions.*;
4+
5+
import java.nio.ByteBuffer;
6+
7+
import org.junit.jupiter.api.Test;
8+
9+
import org.springframework.data.domain.Range;
10+
import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand;
11+
12+
/**
13+
* Unit tests for {@link ReactiveStreamCommands}.
14+
*
15+
* @author jinkshower
16+
*/
17+
class ReactiveStreamCommandsUnitTests {
18+
19+
@Test // GH-2982
20+
void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() {
21+
22+
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
23+
String groupName = "my-group";
24+
25+
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
26+
27+
assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
28+
}
29+
30+
@Test // GH-2982
31+
void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {
32+
33+
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
34+
String groupName = "my-group";
35+
36+
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
37+
Range<?> range = Range.closed("0", "10");
38+
39+
assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.springframework.data.redis.connection;
2+
3+
import static org.assertj.core.api.Assertions.*;
4+
5+
import org.junit.jupiter.api.Test;
6+
7+
import org.springframework.data.domain.Range;
8+
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
9+
10+
/**
11+
* Unit tests for {@link RedisStreamCommands}.
12+
*
13+
* @author jinkshower
14+
*/
15+
class RedisStreamCommandsUnitTests {
16+
17+
@Test // GH-2982
18+
void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() {
19+
20+
assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class);
21+
}
22+
23+
@Test // GH-2982
24+
void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() {
25+
26+
assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
27+
}
28+
29+
@Test // GH-2982
30+
void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {
31+
32+
Range<?> range = Range.closed("0", "10");
33+
34+
assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
35+
}
36+
}

0 commit comments

Comments
 (0)