Skip to content

Commit d631e5c

Browse files
committed
Fix XAddOptions maxlen handling and XPendingOptions validation
Closes spring-projects#2982
1 parent fb0f0bc commit d631e5c

File tree

7 files changed

+160
-12
lines changed

7 files changed

+160
-12
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ public Long getMaxlen() {
337337
* @since 2.3
338338
*/
339339
public boolean hasMaxlen() {
340-
return maxlen != null && maxlen > 0;
340+
return maxlen != null;
341341
}
342342

343343
/**
@@ -685,7 +685,7 @@ default Mono<PendingMessagesSummary> xPending(ByteBuffer key, String groupName)
685685
Assert.notNull(key, "Key must not be null");
686686
Assert.notNull(groupName, "GroupName must not be null");
687687

688-
return xPendingSummary(Mono.just(new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null))).next()
688+
return xPendingSummary(Mono.just(PendingRecordsCommand.pending(key, groupName))).next()
689689
.map(CommandResponse::getOutput);
690690
}
691691

@@ -726,7 +726,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer) {
726726
*/
727727
@Nullable
728728
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName) {
729-
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, Range.unbounded(), null))).next()
729+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName))).next()
730730
.map(CommandResponse::getOutput);
731731
}
732732

@@ -743,7 +743,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
743743
* @since 2.3
744744
*/
745745
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count) {
746-
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, null, range, count))).next()
746+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count))).next()
747747
.map(CommandResponse::getOutput);
748748
}
749749

@@ -779,8 +779,8 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
779779
*/
780780
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
781781
Long count) {
782-
return xPending(Mono.just(new PendingRecordsCommand(key, groupName, consumerName, range, count))).next()
783-
.map(CommandResponse::getOutput);
782+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)))
783+
.next().map(CommandResponse::getOutput);
784784
}
785785

786786
/**
@@ -832,9 +832,15 @@ static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
832832
/**
833833
* Create new {@link PendingRecordsCommand} with given {@link Range} and limit.
834834
*
835+
* @param range must not be {@literal null}.
836+
* @param count the max number of messages to return. Must not be negative.
835837
* @return new instance of {@link XPendingOptions}.
836838
*/
837-
public PendingRecordsCommand range(Range<String> range, Long count) {
839+
public PendingRecordsCommand range(Range<?> range, Long count) {
840+
841+
Assert.notNull(range, "Range must not be null");
842+
Assert.isTrue(count > -1, "Count must not be negative");
843+
838844
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
839845
}
840846

@@ -886,7 +892,7 @@ public boolean hasConsumer() {
886892
* @return {@literal true} count is set.
887893
*/
888894
public boolean isLimited() {
889-
return count != null && count > -1;
895+
return count != null;
890896
}
891897
}
892898

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public Long getMaxlen() {
214214
* @return {@literal true} if {@literal MAXLEN} is set.
215215
*/
216216
public boolean hasMaxlen() {
217-
return maxlen != null && maxlen > 0;
217+
return maxlen != null;
218218
}
219219

220220
/**
@@ -788,19 +788,28 @@ public static XPendingOptions unbounded() {
788788
/**
789789
* Create new {@link XPendingOptions} with an unbounded {@link Range} ({@literal - +}).
790790
*
791-
* @param count the max number of messages to return. Must not be {@literal null}.
791+
* @param count the max number of messages to return. Must not be negative.
792792
* @return new instance of {@link XPendingOptions}.
793793
*/
794794
public static XPendingOptions unbounded(Long count) {
795+
796+
Assert.isTrue(count > -1, "Count must not be negative");
797+
795798
return new XPendingOptions(null, Range.unbounded(), count);
796799
}
797800

798801
/**
799802
* Create new {@link XPendingOptions} with given {@link Range} and limit.
800803
*
804+
* @param range must not be {@literal null}.
805+
* @param count the max number of messages to return. Must not be negative.
801806
* @return new instance of {@link XPendingOptions}.
802807
*/
803808
public static XPendingOptions range(Range<?> range, Long count) {
809+
810+
Assert.notNull(range, "Range must not be null");
811+
Assert.isTrue(count > -1, "Count must not be negative");
812+
804813
return new XPendingOptions(null, range, count);
805814
}
806815

@@ -848,7 +857,7 @@ public boolean hasConsumer() {
848857
* @return {@literal true} count is set.
849858
*/
850859
public boolean isLimited() {
851-
return count != null && count > -1;
860+
return count != null;
852861
}
853862
}
854863

src/main/java/org/springframework/data/redis/connection/stream/StreamReadOptions.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public StreamReadOptions block(Duration timeout) {
9696
*/
9797
public StreamReadOptions count(long count) {
9898

99-
Assert.isTrue(count > 0, "Count must be greater or equal to zero");
99+
Assert.isTrue(count > 0, "Count must be greater than zero");
100100

101101
return new StreamReadOptions(block, count, noack);
102102
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.springframework.data.redis.connection;
2+
3+
import static org.assertj.core.api.Assertions.*;
4+
5+
import java.nio.ByteBuffer;
6+
7+
import org.junit.jupiter.api.Test;
8+
9+
import org.springframework.data.domain.Range;
10+
import org.springframework.data.redis.connection.ReactiveStreamCommands.PendingRecordsCommand;
11+
12+
/**
13+
* Unit tests for {@link ReactiveStreamCommands}.
14+
*
15+
* @author jinkshower
16+
*/
17+
class ReactiveStreamCommandsUnitTests {
18+
19+
@Test // GH-2982
20+
void pendingRecordsCommandRangeShouldThrowExceptionWhenRangeIsNull() {
21+
22+
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
23+
String groupName = "my-group";
24+
25+
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
26+
27+
assertThatThrownBy(() -> command.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
28+
}
29+
30+
@Test // GH-2982
31+
void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {
32+
33+
ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
34+
String groupName = "my-group";
35+
36+
PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
37+
Range<?> range = Range.closed("0", "10");
38+
39+
assertThatThrownBy(() -> command.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.springframework.data.redis.connection;
2+
3+
import static org.assertj.core.api.Assertions.*;
4+
5+
import org.junit.jupiter.api.Test;
6+
7+
import org.springframework.data.domain.Range;
8+
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
9+
10+
/**
11+
* Unit tests for {@link RedisStreamCommands}.
12+
*
13+
* @author jinkshower
14+
*/
15+
class RedisStreamCommandsUnitTests {
16+
17+
@Test // GH-2982
18+
void xPendingOptionsUnboundedShouldThrowExceptionWhenCountIsNegative() {
19+
20+
assertThatThrownBy(() -> XPendingOptions.unbounded(-1L)).isInstanceOf(IllegalArgumentException.class);
21+
}
22+
23+
@Test // GH-2982
24+
void xPendingOptionsRangeShouldThrowExceptionWhenRangeIsNull() {
25+
26+
assertThatThrownBy(() -> XPendingOptions.range(null, 10L)).isInstanceOf(IllegalArgumentException.class);
27+
}
28+
29+
@Test // GH-2982
30+
void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {
31+
32+
Range<?> range = Range.closed("0", "10");
33+
34+
assertThatThrownBy(() -> XPendingOptions.range(range, -1L)).isInstanceOf(IllegalArgumentException.class);
35+
}
36+
}

src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java

+30
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,21 @@ void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() {
289289
.verifyComplete();
290290
}
291291

292+
@ParameterizedRedisTest // GH-2982
293+
void addNegativeMaxlenShouldThrowException() {
294+
295+
K key = keyFactory.instance();
296+
HK hashKey = hashKeyFactory.instance();
297+
HV value = valueFactory.instance();
298+
299+
XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false);
300+
301+
streamOperations.add(key, Collections.singletonMap(hashKey, value), options).as(StepVerifier::create)
302+
.expectError(IllegalArgumentException.class).verify();
303+
304+
streamOperations.range(key, Range.unbounded()).as(StepVerifier::create).expectNextCount(0L).verifyComplete();
305+
}
306+
292307
@ParameterizedRedisTest // GH-2915
293308
void addMinIdShouldEvictLowerIdMessages() {
294309

@@ -528,6 +543,21 @@ void pendingShouldReadMessageDetails() {
528543

529544
}
530545

546+
@ParameterizedRedisTest // GH-2982
547+
void pendingNegativeCountShouldThrowException() {
548+
549+
K key = keyFactory.instance();
550+
HK hashKey = hashKeyFactory.instance();
551+
HV value = valueFactory.instance();
552+
553+
streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
554+
555+
streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").block();
556+
557+
streamOperations.pending(key, "my-group", Range.unbounded(), -1L).as(StepVerifier::create)
558+
.expectError(IllegalArgumentException.class).verify();
559+
}
560+
531561
@ParameterizedRedisTest // GH-2465
532562
void claimShouldReadMessageDetails() {
533563

src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java

+26
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,19 @@ void addMaxLenShouldLimitSimpleMessagesSize() {
206206
assertThat(message.getValue()).isEqualTo(newValue);
207207
}
208208

209+
@ParameterizedRedisTest // GH-2982
210+
void addNegativeMaxlenShouldThrowException() {
211+
212+
K key = keyFactory.instance();
213+
HV value = hashValueFactory.instance();
214+
215+
XAddOptions options = XAddOptions.maxlen(-1).approximateTrimming(false);
216+
217+
assertThatThrownBy(() -> streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options));
218+
219+
assertThat(streamOps.range(key, Range.unbounded())).isEmpty();
220+
}
221+
209222
@ParameterizedRedisTest // GH-2915
210223
void addMinIdShouldEvictLowerIdMessages() {
211224

@@ -565,6 +578,19 @@ void pendingShouldReadMessageDetails() {
565578
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
566579
}
567580

581+
@ParameterizedRedisTest // GH-2982
582+
void pendingNegativeCountShouldThrowException() {
583+
K key = keyFactory.instance();
584+
HK hashKey = hashKeyFactory.instance();
585+
HV value = hashValueFactory.instance();
586+
587+
streamOps.add(key, Collections.singletonMap(hashKey, value));
588+
streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");
589+
590+
assertThatThrownBy(() -> streamOps.pending(key, "my-group", Range.unbounded(), -1L))
591+
.isInstanceOf(IllegalArgumentException.class);
592+
}
593+
568594
@ParameterizedRedisTest // GH-2465
569595
void claimShouldReadMessageDetails() {
570596

0 commit comments

Comments
 (0)