Skip to content

Commit f3f0c11

Browse files
morenomjcmp911de
authored andcommitted
Add NOMKSTREAM option to XADD command.
Closes #2047 Original pull request: #2118.
1 parent 39d3b7c commit f3f0c11

File tree

6 files changed

+116
-10
lines changed

6 files changed

+116
-10
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+37-5
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
* @author Christoph Strobl
5959
* @author Tugdual Grall
6060
* @author Dengliming
61+
* @author Mark John Moreno
6162
* @since 2.2
6263
*/
6364
public interface ReactiveStreamCommands {
@@ -199,12 +200,14 @@ class AddStreamRecord extends KeyCommand {
199200

200201
private final ByteBufferRecord record;
201202
private final @Nullable Long maxlen;
203+
private final boolean nomkstream;
202204

203-
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen) {
205+
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream) {
204206

205207
super(record.getStream());
206208
this.record = record;
207209
this.maxlen = maxlen;
210+
this.nomkstream = nomkstream;
208211
}
209212

210213
/**
@@ -217,7 +220,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
217220

218221
Assert.notNull(record, "Record must not be null!");
219222

220-
return new AddStreamRecord(record, null);
223+
return new AddStreamRecord(record, null, false);
221224
}
222225

223226
/**
@@ -230,7 +233,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
230233

231234
Assert.notNull(body, "Body must not be null!");
232235

233-
return new AddStreamRecord(StreamRecords.rawBuffer(body), null);
236+
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false);
234237
}
235238

236239
/**
@@ -240,7 +243,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
240243
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
241244
*/
242245
public AddStreamRecord to(ByteBuffer key) {
243-
return new AddStreamRecord(record.withStreamKey(key), maxlen);
246+
return new AddStreamRecord(record.withStreamKey(key), maxlen, false);
244247
}
245248

246249
/**
@@ -249,7 +252,28 @@ public AddStreamRecord to(ByteBuffer key) {
249252
* @return new instance of {@link AddStreamRecord}.
250253
*/
251254
public AddStreamRecord maxlen(long maxlen) {
252-
return new AddStreamRecord(record, maxlen);
255+
return new AddStreamRecord(record, maxlen, false);
256+
}
257+
258+
/**
259+
* Disable creation of stream if it does not already exist.
260+
*
261+
* @return new instance of {@link AddStreamRecord}.
262+
* @since 2.6
263+
*/
264+
public AddStreamRecord makeNoStream() {
265+
return new AddStreamRecord(record, maxlen, true);
266+
}
267+
268+
/**
269+
* Disable creation of stream if it does not already exist.
270+
*
271+
* @param makeNoStream {@code true} to not create a stream if it does not already exist.
272+
* @return new instance of {@link AddStreamRecord}.
273+
* @since 2.6
274+
*/
275+
public AddStreamRecord makeNoStream(boolean makeNoStream) {
276+
return new AddStreamRecord(record, maxlen, makeNoStream);
253277
}
254278

255279
/**
@@ -281,6 +305,14 @@ public Long getMaxlen() {
281305
public boolean hasMaxlen() {
282306
return maxlen != null && maxlen > 0;
283307
}
308+
309+
/**
310+
* @return {@literal true} if {@literal NOMKSTREAM} is set.
311+
* @since 2.6
312+
*/
313+
public boolean isNoMkStream() {
314+
return nomkstream;
315+
}
284316
}
285317

286318
/**

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

+40-5
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* @author Christoph Strobl
4242
* @author Tugdual Grall
4343
* @author Dengliming
44+
* @author Mark John Moreno
4445
* @see <a href="https://redis.io/topics/streams-intro">Redis Documentation - Streams</a>
4546
* @since 2.2
4647
*/
@@ -116,16 +117,19 @@ default RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
116117
* Additional options applicable for {@literal XADD} command.
117118
*
118119
* @author Christoph Strobl
120+
* @author Mark John Moreno
119121
* @since 2.3
120122
*/
121123
class XAddOptions {
122124

123-
private static final XAddOptions NONE = new XAddOptions(null);
125+
private static final XAddOptions NONE = new XAddOptions(null, false);
124126

125127
private final @Nullable Long maxlen;
128+
private final boolean nomkstream;
126129

127-
private XAddOptions(@Nullable Long maxlen) {
130+
private XAddOptions(@Nullable Long maxlen, boolean nomkstream) {
128131
this.maxlen = maxlen;
132+
this.nomkstream = nomkstream;
129133
}
130134

131135
/**
@@ -141,7 +145,28 @@ public static XAddOptions none() {
141145
* @return new instance of {@link XAddOptions}.
142146
*/
143147
public static XAddOptions maxlen(long maxlen) {
144-
return new XAddOptions(maxlen);
148+
return new XAddOptions(maxlen, false);
149+
}
150+
151+
/**
152+
* Disable creation of stream if it does not already exist.
153+
*
154+
* @return new instance of {@link XAddOptions}.
155+
* @since 2.6
156+
*/
157+
public static XAddOptions makeNoStream() {
158+
return new XAddOptions(null, true);
159+
}
160+
161+
/**
162+
* Disable creation of stream if it does not already exist.
163+
*
164+
* @param makeNoStream {@code true} to not create a stream if it does not already exist.
165+
* @return new instance of {@link XAddOptions}.
166+
* @since 2.6
167+
*/
168+
public static XAddOptions makeNoStream(boolean makeNoStream) {
169+
return new XAddOptions(null, makeNoStream);
145170
}
146171

147172
/**
@@ -161,6 +186,14 @@ public boolean hasMaxlen() {
161186
return maxlen != null && maxlen > 0;
162187
}
163188

189+
/**
190+
* @return {@literal true} if {@literal NOMKSTREAM} is set.
191+
* @since 2.6
192+
*/
193+
public boolean isNoMkStream() {
194+
return nomkstream;
195+
}
196+
164197
@Override
165198
public boolean equals(Object o) {
166199
if (this == o) {
@@ -169,14 +202,16 @@ public boolean equals(Object o) {
169202
if (o == null || getClass() != o.getClass()) {
170203
return false;
171204
}
172-
173205
XAddOptions that = (XAddOptions) o;
206+
if (this.nomkstream != that.nomkstream) return false;
174207
return ObjectUtils.nullSafeEquals(this.maxlen, that.maxlen);
175208
}
176209

177210
@Override
178211
public int hashCode() {
179-
return ObjectUtils.nullSafeHashCode(this.maxlen);
212+
int result = ObjectUtils.nullSafeHashCode(this.maxlen);
213+
result = 31 * result + ObjectUtils.nullSafeHashCode(this.nomkstream);
214+
return result;
180215
}
181216
}
182217

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* @author Mark Paluch
5656
* @author Tugdual Grall
5757
* @author Dengliming
58+
* @author Mark John Moreno
5859
* @since 2.2
5960
*/
6061
class LettuceReactiveStreamCommands implements ReactiveStreamCommands {
@@ -110,6 +111,7 @@ public Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStream
110111
if (command.hasMaxlen()) {
111112
args.maxlen(command.getMaxlen());
112113
}
114+
args.nomkstream(command.isNoMkStream());
113115

114116
return cmd.xadd(command.getKey(), args, command.getBody())
115117
.map(value -> new CommandResponse<>(command, RecordId.of(value)));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* @author Tugdual Grall
5050
* @author Dejan Jankov
5151
* @author Dengliming
52+
* @author Mark John Moreno
5253
* @since 2.2
5354
*/
5455
class LettuceStreamCommands implements RedisStreamCommands {
@@ -90,6 +91,7 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions optio
9091
if (options.hasMaxlen()) {
9192
args.maxlen(options.getMaxlen());
9293
}
94+
args.nomkstream(options.isNoMkStream());
9395

9496
return connection.invoke().from(RedisStreamAsyncCommands::xadd, record.getStream(), args, record.getValue())
9597
.get(RecordId::of);

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java

+17
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,23 @@ void executeShouldPassThruCustomCommands() {
237237

238238
verify(asyncCommandsMock).dispatch(eq(command.getType()), eq(command.getOutput()), any(CommandArgs.class));
239239
}
240+
241+
@Test // GH-2047
242+
void xaddShouldHonorNoMkStream() {
243+
244+
MapRecord<byte[], byte[], byte[]> record = MapRecord.create("key".getBytes(), Collections.emptyMap());
245+
246+
connection.streamCommands().xAdd(record, XAddOptions.makeNoStream());
247+
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
248+
if (connection.isPipelined()) {
249+
verify(asyncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
250+
} else {
251+
verify(syncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
252+
}
253+
254+
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
255+
}
256+
240257
}
241258

242259
public static class LettucePipelineConnectionUnitTests extends BasicUnitTests {

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java

+18
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,22 @@ void xaddShouldHonorMaxlen() {
241241

242242
assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L);
243243
}
244+
245+
@Test // GH-2047
246+
void xaddShouldHonorNoMkStream() {
247+
248+
LettuceReactiveRedisConnection connection = new LettuceReactiveRedisConnection(connectionProvider);
249+
250+
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
251+
when(reactiveCommands.xadd(any(ByteBuffer.class), args.capture(), anyMap())).thenReturn(Mono.just("1-1"));
252+
253+
MapRecord<ByteBuffer, ByteBuffer, ByteBuffer> record = MapRecord.create(ByteBuffer.wrap("key".getBytes()),
254+
Collections.emptyMap());
255+
256+
connection.streamCommands().xAdd(Mono.just(AddStreamRecord.of(ByteBufferRecord.of(record)).makeNoStream()))
257+
.subscribe();
258+
259+
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
260+
}
261+
244262
}

0 commit comments

Comments
 (0)