From f320d66b5b7d9c43a12259b64ffb5fdc3d4c628a Mon Sep 17 00:00:00 2001 From: jinkshower Date: Sun, 7 Jul 2024 21:13:59 +0900 Subject: [PATCH] Add overloads for XAddOptions in StreamOperations. Closes #2915 --- .../connection/ReactiveStreamCommands.java | 33 +++- .../core/DefaultReactiveStreamOperations.java | 14 ++ .../redis/core/DefaultStreamOperations.java | 17 ++ .../redis/core/ReactiveStreamOperations.java | 59 +++++++ .../data/redis/core/StreamOperations.java | 49 ++++++ ...ctiveStreamOperationsIntegrationTests.java | 166 ++++++++++++++++++ ...faultStreamOperationsIntegrationTests.java | 151 ++++++++++++++++ 7 files changed, 488 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 460a8cc3fe..8ba39b08e0 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -32,6 +32,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; @@ -58,6 +59,7 @@ * @author Tugdual Grall * @author Dengliming * @author Mark John Moreno + * @author jinkshower * @since 2.2 */ public interface ReactiveStreamCommands { @@ -394,11 +396,40 @@ default Mono xAdd(ByteBufferRecord record) { return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput); } + /** + * Add stream record with the specified options. + * + * @param record must not be {@literal null}. + * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}. + * @return {@link Mono} the {@link RecordId id}. + * @see Redis Documentation: XADD + * @since 3.3 + */ + default Mono xAdd(ByteBufferRecord record, XAddOptions xAddOptions) { + + Assert.notNull(record, "Record must not be null"); + Assert.notNull(xAddOptions, "XAddOptions must not be null"); + + AddStreamRecord addStreamRecord = AddStreamRecord.of(record) + .approximateTrimming(xAddOptions.isApproximateTrimming()) + .makeNoStream(xAddOptions.isNoMkStream()); + + if (xAddOptions.hasMaxlen()) { + addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen()); + } + + if (xAddOptions.hasMinId()) { + addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId()); + } + + return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput); + } + /** * Add stream record with given {@literal body} to {@literal key}. * * @param commands must not be {@literal null}. - * @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands. + * @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands. * @see Redis Documentation: XADD */ Flux> xAdd(Publisher commands); diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java index 432cf77283..a3d7b93668 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java @@ -33,6 +33,7 @@ import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.ReactiveStreamCommands; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.convert.Converters; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; @@ -60,6 +61,7 @@ * @author Christoph Strobl * @author Marcin Zielinski * @author John Blum + * @author jinkshower * @since 2.2 */ class DefaultReactiveStreamOperations implements ReactiveStreamOperations { @@ -146,6 +148,18 @@ public Mono add(Record record) { return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input))); } + @Override + public Mono add(Record record, XAddOptions xAddOptions) { + + Assert.notNull(record.getStream(), "Key must not be null"); + Assert.notNull(record.getValue(), "Body must not be null"); + Assert.notNull(xAddOptions, "XAddOptions must not be null"); + + MapRecord input = StreamObjectMapper.toMapRecord(this, record); + + return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions)); + } + @Override public Flux> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) { diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java index 52eaa318ae..a53b18d030 100644 --- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java @@ -26,6 +26,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; @@ -54,6 +55,7 @@ * @author Christoph Strobl * @author Marcin Zielinski * @author John Blum + * @author jinkshower * @since 2.2 */ class DefaultStreamOperations extends AbstractOperations implements StreamOperations { @@ -136,6 +138,21 @@ public RecordId add(Record record) { return execute(connection -> connection.xAdd(binaryRecord)); } + @Nullable + @Override + @SuppressWarnings("unchecked") + public RecordId add(Record record, XAddOptions options) { + + Assert.notNull(record, "Record must not be null"); + Assert.notNull(options, "XAddOptions must not be null"); + + MapRecord input = StreamObjectMapper.toMapRecord(this, record); + + ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer()); + + return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options)); + } + @Override public List> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) { diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java index fc3ca29651..e4b905030e 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java @@ -26,6 +26,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.stream.ByteBufferRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -54,6 +55,7 @@ * @author Dengliming * @author Marcin Zielinski * @author John Blum + * @author jinkshower * @since 2.2 */ public interface ReactiveStreamOperations extends HashMapperProvider { @@ -94,6 +96,63 @@ default Mono acknowledge(String group, Record record) { return acknowledge(record.getRequiredStream(), group, record.getId()); } + /** + * Append one or more records to the stream {@code key} with the specified options. + * + * @param key the stream key. + * @param bodyPublisher record body {@link Publisher}. + * @param xAddOptions parameters for the {@literal XADD} call. + * @return the record Ids. + * @see Redis Documentation: XADD + * @since 3.3 + */ + default Flux add (K key, Publisher> bodyPublisher, + XAddOptions xAddOptions) { + return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions)); + } + + /** + * Append a record to the stream {@code key} with the specified options. + * + * @param key the stream key. + * @param content record content as Map. + * @param xAddOptions parameters for the {@literal XADD} call. + * @return the {@link Mono} emitting the {@link RecordId}. + * @see Redis Documentation: XADD + * @since 3.3 + */ + default Mono add(K key, Map content, XAddOptions xAddOptions) { + return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions); + } + + /** + * Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options. + * + * @param record the record to append. + * @param xAddOptions parameters for the {@literal XADD} call. + * @return the {@link Mono} emitting the {@link RecordId}. + * @see Redis Documentation: XADD + * @since 3.3 + */ + @SuppressWarnings("unchecked") + default Mono add(MapRecord record, XAddOptions xAddOptions) { + return add((Record) record, xAddOptions); + } + + /** + * Append the record, backed by the given value, to the stream with the specified options. + * The value will be hashed and serialized. + * + * @param record must not be {@literal null}. + * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}. + * @return the {@link Mono} emitting the {@link RecordId}. + * @see MapRecord + * @see ObjectRecord + * @see Redis Documentation: XADD + * @since 3.3 + */ + Mono add(Record record, XAddOptions xAddOptions); + /** * Append one or more records to the stream {@code key}. * diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java index 4636c5bcf6..c38a524ff0 100644 --- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java +++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java @@ -25,6 +25,7 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -53,6 +54,7 @@ * @author Dengliming * @author Marcin Zielinski * @author John Blum + * @author jinkshower * @since 2.2 */ public interface StreamOperations extends HashMapperProvider { @@ -95,6 +97,53 @@ default Long acknowledge(String group, Record record) { return acknowledge(record.getRequiredStream(), group, record.getId()); } + /** + * Append a record to the stream {@code key} with the specified options. + * + * @param key the stream key. + * @param content record content as Map. + * @param xAddOptions additional parameters for the {@literal XADD} call. + * @return the record Id. {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XADD + * @since 3.3 + */ + @SuppressWarnings("unchecked") + @Nullable + default RecordId add(K key, Map content, XAddOptions xAddOptions) { + return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions); + } + + /** + * Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options. + * + * @param record the record to append. + * @param xAddOptions additional parameters for the {@literal XADD} call. + * @return the record Id. {@literal null} when used in pipeline / transaction. + * @see Redis Documentation: XADD + * @since 3.3 + */ + @SuppressWarnings("unchecked") + @Nullable + default RecordId add(MapRecord record, XAddOptions xAddOptions) { + return add((Record) record, xAddOptions); + } + + /** + * Append the record, backed by the given value, to the stream with the specified options. + * The value will be hashed and serialized. + * + * @param record must not be {@literal null}. + * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}. + * @return the record Id. {@literal null} when used in pipeline / transaction. + * @see MapRecord + * @see ObjectRecord + * @see Redis Documentation: XADD + * @since 3.3 + */ + @SuppressWarnings("unchecked") + @Nullable + RecordId add(Record record, XAddOptions xAddOptions); + /** * Append a record to the stream {@code key}. * diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java index c6040d795c..27cf52f0f7 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java @@ -35,6 +35,7 @@ import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; @@ -61,6 +62,7 @@ * @author Mark Paluch * @author Christoph Strobl * @author Marcin Zielinski + * @author jinkshower */ @MethodSource("testParams") @SuppressWarnings("unchecked") @@ -192,6 +194,170 @@ void addShouldAddReadSimpleMessageWithRawSerializer() { .verifyComplete(); } + @ParameterizedRedisTest // GH-2915 + void addMaxLenShouldLimitMessagesSize() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + HV newValue = valueFactory.instance(); + XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false); + + RecordId messageId = streamOperations.add(key, Collections.singletonMap(hashKey, newValue), options).block(); + + streamOperations.range(key, Range.unbounded()).as(StepVerifier::create) + .consumeNextWith(actual -> { + + assertThat(actual.getId()).isEqualTo(messageId); + assertThat(actual.getStream()).isEqualTo(key); + assertThat(actual).hasSize(1); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(actual.getValue()).containsEntry(hashKey, newValue); + } + + }) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2915 + void addMaxLenShouldLimitSimpleMessagesSize() { + + assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer) + && !(serializer instanceof GenericJackson2JsonRedisSerializer) + && !(serializer instanceof JdkSerializationRedisSerializer) && !(serializer instanceof OxmSerializer)); + + K key = keyFactory.instance(); + HV value = valueFactory.instance(); + + streamOperations.add(StreamRecords.objectBacked(value).withStreamKey(key)).block(); + + HV newValue = valueFactory.instance(); + XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false); + + RecordId messageId = streamOperations.add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block(); + + streamOperations.range((Class) value.getClass(), key, Range.unbounded()).as(StepVerifier::create) + .consumeNextWith(actual -> { + + assertThat(actual.getId()).isEqualTo(messageId); + assertThat(actual.getStream()).isEqualTo(key); + assertThat(actual.getValue()).isEqualTo(newValue); + + }) + .expectNextCount(0) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2915 + void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() { + + assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer) + && !(serializer instanceof GenericJackson2JsonRedisSerializer)); + + SerializationPair keySerializer = redisTemplate.getSerializationContext().getKeySerializationPair(); + + RedisSerializationContext serializationContext = RedisSerializationContext + . newSerializationContext(StringRedisSerializer.UTF_8).key(keySerializer) + .hashValue(SerializationPair.raw()).hashKey(SerializationPair.raw()).build(); + + ReactiveRedisTemplate raw = new ReactiveRedisTemplate<>(redisTemplate.getConnectionFactory(), + serializationContext); + + K key = keyFactory.instance(); + Person value = new PersonObjectFactory().instance(); + + raw.opsForStream().add(StreamRecords.objectBacked(value).withStreamKey(key)).block(); + + Person newValue = new PersonObjectFactory().instance(); + XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false); + + RecordId messageId = raw.opsForStream().add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block(); + + raw.opsForStream().range((Class) value.getClass(), key, Range.unbounded()).as(StepVerifier::create) + .consumeNextWith(it -> { + + assertThat(it.getId()).isEqualTo(messageId); + assertThat(it.getStream()).isEqualTo(key); + assertThat(it.getValue()).isEqualTo(newValue); + + }) + .expectNextCount(0) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2915 + void addMinIdShouldEvictLowerIdMessages() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + RecordId messageId1 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + XAddOptions options = XAddOptions.none().minId(messageId1); + + RecordId messageId2 = streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + + streamOperations.range(key, Range.unbounded()).as(StepVerifier::create) + .consumeNextWith(actual -> { + assertThat(actual.getId()).isEqualTo(messageId1); + assertThat(actual.getStream()).isEqualTo(key); + }) + .consumeNextWith(actual -> { + assertThat(actual.getId()).isEqualTo(messageId2); + assertThat(actual.getStream()).isEqualTo(key); + }) + .expectNextCount(0) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2915 + void addMakeNoStreamShouldNotCreateStreamWhenNoStreamExists() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + XAddOptions options = XAddOptions.makeNoStream(); + + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + + streamOperations.size(key).as(StepVerifier::create) + .expectNext(0L) + .verifyComplete(); + + streamOperations.range(key, Range.unbounded()).as(StepVerifier::create) + .expectNextCount(0L) + .verifyComplete(); + } + + @ParameterizedRedisTest // GH-2915 + void addMakeNoStreamShouldCreateStreamWhenStreamExists() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = valueFactory.instance(); + + streamOperations.add(key, Collections.singletonMap(hashKey, value)).block(); + + XAddOptions options = XAddOptions.makeNoStream(); + + streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block(); + + streamOperations.size(key).as(StepVerifier::create) + .expectNext(2L) + .verifyComplete(); + + streamOperations.range(key, Range.unbounded()).as(StepVerifier::create) + .expectNextCount(2L) + .verifyComplete(); + } + @ParameterizedRedisTest // DATAREDIS-864 void rangeShouldReportMessages() { diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java index eb2cb33079..06ab8c5982 100644 --- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java @@ -32,6 +32,7 @@ import org.springframework.data.redis.Person; import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension; @@ -51,6 +52,7 @@ * @author Mark Paluch * @author Christoph Strobl * @author Marcin Zielinski + * @author jinkshower */ @MethodSource("testParams") @EnabledOnCommand("XADD") @@ -149,6 +151,155 @@ void addShouldAddReadSimpleMessage() { assertThat(message.getValue()).isEqualTo(value); } + @ParameterizedRedisTest // GH-2915 + void addMaxLenShouldLimitMessagesSize() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + + HV newValue = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, newValue), options); + + List> messages = streamOps.range(key, Range.unbounded()); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message.getValue()).containsEntry(hashKey, newValue); + } + } + + @ParameterizedRedisTest // GH-2915 + void addMaxLenShouldLimitSimpleMessagesSize() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + + HV newValue = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false); + + RecordId messageId = streamOps.add(StreamRecords.objectBacked(newValue).withStreamKey(key), options); + + List> messages = streamOps.range((Class) value.getClass(), key, Range.unbounded()); + + assertThat(messages).hasSize(1); + + ObjectRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + assertThat(message.getValue()).isEqualTo(newValue); + } + + @ParameterizedRedisTest // GH-2915 + void addMinIdShouldEvictLowerIdMessages() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + XAddOptions options = XAddOptions.none().minId(messageId1); + + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value), options); + + List> messages = streamOps.range(key, Range.unbounded()); + + assertThat(messages).hasSize(2); + + MapRecord message1 = messages.get(0); + + assertThat(message1.getId()).isEqualTo(messageId1); + assertThat(message1.getStream()).isEqualTo(key); + + MapRecord message2 = messages.get(1); + + assertThat(message2.getId()).isEqualTo(messageId2); + assertThat(message2.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message1.getValue()).containsEntry(hashKey, value); + assertThat(message2.getValue()).containsEntry(hashKey, value); + } + } + + @ParameterizedRedisTest // GH-2915 + void addMinIdShouldEvictLowerIdSimpleMessages() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + RecordId messageId1 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + + XAddOptions options = XAddOptions.none().minId(messageId1); + + RecordId messageId2 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + + List> messages = streamOps.range((Class) value.getClass(), key, Range.unbounded()); + + assertThat(messages).hasSize(2); + + ObjectRecord message1 = messages.get(0); + + assertThat(message1.getId()).isEqualTo(messageId1); + assertThat(message1.getStream()).isEqualTo(key); + assertThat(message1.getValue()).isEqualTo(value); + + ObjectRecord message2 = messages.get(1); + + assertThat(message2.getId()).isEqualTo(messageId2); + assertThat(message2.getStream()).isEqualTo(key); + assertThat(message2.getValue()).isEqualTo(value); + } + + @ParameterizedRedisTest // GH-2915 + void addMakeNoStreamShouldNotCreateStreamWhenNoStreamExists() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + XAddOptions options = XAddOptions.makeNoStream(); + + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + + assertThat(streamOps.size(key)).isZero(); + assertThat(streamOps.range(key, Range.unbounded())).isEmpty(); + } + + @ParameterizedRedisTest // GH-2915 + void addMakeNoStreamShouldCreateStreamWhenStreamExists() { + + K key = keyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key)); + + XAddOptions options = XAddOptions.makeNoStream(); + + streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options); + + assertThat(streamOps.size(key)).isEqualTo(2); + assertThat(streamOps.range(key, Range.unbounded())).hasSize(2); + } + @ParameterizedRedisTest // DATAREDIS-864 void simpleMessageReadWriteSymmetry() {