From 7c43c6821df81abce3f9fc8126d4f1de1d8dd0a5 Mon Sep 17 00:00:00 2001 From: Mark John Moreno Date: Tue, 13 Jul 2021 22:40:29 +0800 Subject: [PATCH] Add NOMKSTREAM option to XADD command Closes: #2047 --- .../connection/ReactiveStreamCommands.java | 42 ++++++++++++++--- .../redis/connection/RedisStreamCommands.java | 45 ++++++++++++++++--- .../LettuceReactiveStreamCommands.java | 2 + .../lettuce/LettuceStreamCommands.java | 2 + .../lettuce/LettuceConnectionUnitTests.java | 17 +++++++ ...ttuceReactiveRedisConnectionUnitTests.java | 18 ++++++++ 6 files changed, 116 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index dc46f75c1b..5dc3e883e7 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -58,6 +58,7 @@ * @author Christoph Strobl * @author Tugdual Grall * @author Dengliming + * @author Mark John Moreno * @since 2.2 */ public interface ReactiveStreamCommands { @@ -199,12 +200,14 @@ class AddStreamRecord extends KeyCommand { private final ByteBufferRecord record; private final @Nullable Long maxlen; + private final boolean nomkstream; - private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen) { + private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream) { super(record.getStream()); this.record = record; this.maxlen = maxlen; + this.nomkstream = nomkstream; } /** @@ -217,7 +220,7 @@ public static AddStreamRecord of(ByteBufferRecord record) { Assert.notNull(record, "Record must not be null!"); - return new AddStreamRecord(record, null); + return new AddStreamRecord(record, null, false); } /** @@ -230,7 +233,7 @@ public static AddStreamRecord body(Map body) { Assert.notNull(body, "Body must not be null!"); - return new AddStreamRecord(StreamRecords.rawBuffer(body), null); + return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false); } /** @@ -240,7 +243,7 @@ public static AddStreamRecord body(Map body) { * @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied. */ public AddStreamRecord to(ByteBuffer key) { - return new AddStreamRecord(record.withStreamKey(key), maxlen); + return new AddStreamRecord(record.withStreamKey(key), maxlen, false); } /** @@ -249,7 +252,28 @@ public AddStreamRecord to(ByteBuffer key) { * @return new instance of {@link AddStreamRecord}. */ public AddStreamRecord maxlen(long maxlen) { - return new AddStreamRecord(record, maxlen); + return new AddStreamRecord(record, maxlen, false); + } + + /** + * Disable creation of stream if it does not already exist. + * + * @return new instance of {@link AddStreamRecord}. + * @since 2.6 + */ + public AddStreamRecord makeNoStream() { + return new AddStreamRecord(record, maxlen, true); + } + + /** + * Disable creation of stream if it does not already exist. + * + * @param makeNoStream {@code true} to not create a stream if it does not already exist. + * @return new instance of {@link AddStreamRecord}. + * @since 2.6 + */ + public AddStreamRecord makeNoStream(boolean makeNoStream) { + return new AddStreamRecord(record, maxlen, makeNoStream); } /** @@ -281,6 +305,14 @@ public Long getMaxlen() { public boolean hasMaxlen() { return maxlen != null && maxlen > 0; } + + /** + * @return {@literal true} if {@literal NOMKSTREAM} is set. + * @since 2.6 + */ + public boolean isNoMkStream() { + return nomkstream; + } } /** diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index f46cc27324..7f50913c65 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -41,6 +41,7 @@ * @author Christoph Strobl * @author Tugdual Grall * @author Dengliming + * @author Mark John Moreno * @see Redis Documentation - Streams * @since 2.2 */ @@ -116,16 +117,19 @@ default RecordId xAdd(MapRecord record) { * Additional options applicable for {@literal XADD} command. * * @author Christoph Strobl + * @author Mark John Moreno * @since 2.3 */ class XAddOptions { - private static final XAddOptions NONE = new XAddOptions(null); + private static final XAddOptions NONE = new XAddOptions(null, false); private final @Nullable Long maxlen; + private final boolean nomkstream; - private XAddOptions(@Nullable Long maxlen) { + private XAddOptions(@Nullable Long maxlen, boolean nomkstream) { this.maxlen = maxlen; + this.nomkstream = nomkstream; } /** @@ -141,7 +145,28 @@ public static XAddOptions none() { * @return new instance of {@link XAddOptions}. */ public static XAddOptions maxlen(long maxlen) { - return new XAddOptions(maxlen); + return new XAddOptions(maxlen, false); + } + + /** + * Disable creation of stream if it does not already exist. + * + * @return new instance of {@link XAddOptions}. + * @since 2.6 + */ + public static XAddOptions makeNoStream() { + return new XAddOptions(null, true); + } + + /** + * Disable creation of stream if it does not already exist. + * + * @param makeNoStream {@code true} to not create a stream if it does not already exist. + * @return new instance of {@link XAddOptions}. + * @since 2.6 + */ + public static XAddOptions makeNoStream(boolean makeNoStream) { + return new XAddOptions(null, makeNoStream); } /** @@ -161,6 +186,14 @@ public boolean hasMaxlen() { return maxlen != null && maxlen > 0; } + /** + * @return {@literal true} if {@literal NOMKSTREAM} is set. + * @since 2.6 + */ + public boolean isNoMkStream() { + return nomkstream; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -169,14 +202,16 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - XAddOptions that = (XAddOptions) o; + if (this.nomkstream != that.nomkstream) return false; return ObjectUtils.nullSafeEquals(this.maxlen, that.maxlen); } @Override public int hashCode() { - return ObjectUtils.nullSafeHashCode(this.maxlen); + int result = ObjectUtils.nullSafeHashCode(this.maxlen); + result = 31 * result + ObjectUtils.nullSafeHashCode(this.nomkstream); + return result; } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 8797d81d21..7fe6d03974 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -55,6 +55,7 @@ * @author Mark Paluch * @author Tugdual Grall * @author Dengliming + * @author Mark John Moreno * @since 2.2 */ class LettuceReactiveStreamCommands implements ReactiveStreamCommands { @@ -110,6 +111,7 @@ public Flux> xAdd(Publisher new CommandResponse<>(command, RecordId.of(value))); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index 36f6564191..6f137850bd 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -49,6 +49,7 @@ * @author Tugdual Grall * @author Dejan Jankov * @author Dengliming + * @author Mark John Moreno * @since 2.2 */ class LettuceStreamCommands implements RedisStreamCommands { @@ -90,6 +91,7 @@ public RecordId xAdd(MapRecord record, XAddOptions optio if (options.hasMaxlen()) { args.maxlen(options.getMaxlen()); } + args.nomkstream(options.isNoMkStream()); return connection.invoke().from(RedisStreamAsyncCommands::xadd, record.getStream(), args, record.getValue()) .get(RecordId::of); diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java index e7882df5b9..efc644fc5c 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java @@ -237,6 +237,23 @@ void executeShouldPassThruCustomCommands() { verify(asyncCommandsMock).dispatch(eq(command.getType()), eq(command.getOutput()), any(CommandArgs.class)); } + + @Test // GH-2047 + void xaddShouldHonorNoMkStream() { + + MapRecord record = MapRecord.create("key".getBytes(), Collections.emptyMap()); + + connection.streamCommands().xAdd(record, XAddOptions.makeNoStream()); + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + if (connection.isPipelined()) { + verify(asyncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap()); + } else { + verify(syncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap()); + } + + assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true); + } + } public static class LettucePipelineConnectionUnitTests extends BasicUnitTests { diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java index 27a48ebed2..8ed68c1b41 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java @@ -241,4 +241,22 @@ void xaddShouldHonorMaxlen() { assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L); } + + @Test // GH-2047 + void xaddShouldHonorNoMkStream() { + + LettuceReactiveRedisConnection connection = new LettuceReactiveRedisConnection(connectionProvider); + + ArgumentCaptor args = ArgumentCaptor.forClass(XAddArgs.class); + when(reactiveCommands.xadd(any(ByteBuffer.class), args.capture(), anyMap())).thenReturn(Mono.just("1-1")); + + MapRecord record = MapRecord.create(ByteBuffer.wrap("key".getBytes()), + Collections.emptyMap()); + + connection.streamCommands().xAdd(Mono.just(AddStreamRecord.of(ByteBufferRecord.of(record)).makeNoStream())) + .subscribe(); + + assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true); + } + }