Skip to content

Commit 2a51702

Browse files
author
Mark John Moreno
committed
Add NOMKSTREAM option to XADD command
Closes: spring-projects#2047
1 parent f47343d commit 2a51702

File tree

6 files changed

+139
-8
lines changed

6 files changed

+139
-8
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
* @author Christoph Strobl
5959
* @author Tugdual Grall
6060
* @author Dengliming
61+
* @author Mark John Moreno
6162
* @since 2.2
6263
*/
6364
public interface ReactiveStreamCommands {
@@ -199,12 +200,14 @@ class AddStreamRecord extends KeyCommand {
199200

200201
private final ByteBufferRecord record;
201202
private final @Nullable Long maxlen;
203+
private final boolean nomkstream;
202204

203-
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen) {
205+
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream) {
204206

205207
super(record.getStream());
206208
this.record = record;
207209
this.maxlen = maxlen;
210+
this.nomkstream = nomkstream;
208211
}
209212

210213
/**
@@ -217,7 +220,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
217220

218221
Assert.notNull(record, "Record must not be null!");
219222

220-
return new AddStreamRecord(record, null);
223+
return new AddStreamRecord(record, null, false);
221224
}
222225

223226
/**
@@ -230,7 +233,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
230233

231234
Assert.notNull(body, "Body must not be null!");
232235

233-
return new AddStreamRecord(StreamRecords.rawBuffer(body), null);
236+
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false);
234237
}
235238

236239
/**
@@ -240,7 +243,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
240243
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
241244
*/
242245
public AddStreamRecord to(ByteBuffer key) {
243-
return new AddStreamRecord(record.withStreamKey(key), maxlen);
246+
return new AddStreamRecord(record.withStreamKey(key), maxlen, false);
244247
}
245248

246249
/**
@@ -249,7 +252,26 @@ public AddStreamRecord to(ByteBuffer key) {
249252
* @return new instance of {@link AddStreamRecord}.
250253
*/
251254
public AddStreamRecord maxlen(long maxlen) {
252-
return new AddStreamRecord(record, maxlen);
255+
return new AddStreamRecord(record, maxlen, false);
256+
}
257+
258+
/**
259+
* Disable creation of stream if it does not already exist.
260+
*
261+
* @return new instance of {@link AddStreamRecord}.
262+
*/
263+
public AddStreamRecord makeNoStream() {
264+
return new AddStreamRecord(record, maxlen, true);
265+
}
266+
267+
/**
268+
* Disable creation of stream if it does not already exist.
269+
*
270+
* @param maxlen size of the stream to the given maximum number of elements.
271+
* @return new instance of {@link AddStreamRecord}.
272+
*/
273+
public AddStreamRecord makeNoStream(long maxlen) {
274+
return new AddStreamRecord(record, maxlen, true);
253275
}
254276

255277
/**
@@ -281,6 +303,13 @@ public Long getMaxlen() {
281303
public boolean hasMaxlen() {
282304
return maxlen != null && maxlen > 0;
283305
}
306+
307+
/**
308+
* @return {@literal true} if {@literal NOMKSTREAM} is set.
309+
*/
310+
public boolean isNoMkStream() {
311+
return nomkstream;
312+
}
284313
}
285314

286315
/**

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

+33-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* @author Christoph Strobl
4242
* @author Tugdual Grall
4343
* @author Dengliming
44+
* @author Mark John Moreno
4445
* @see <a href="https://redis.io/topics/streams-intro">Redis Documentation - Streams</a>
4546
* @since 2.2
4647
*/
@@ -116,16 +117,19 @@ default RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
116117
* Additional options applicable for {@literal XADD} command.
117118
*
118119
* @author Christoph Strobl
120+
* @author Mark John Moreno
119121
* @since 2.3
120122
*/
121123
class XAddOptions {
122124

123-
private static final XAddOptions NONE = new XAddOptions(null);
125+
private static final XAddOptions NONE = new XAddOptions(null, false);
124126

125127
private final @Nullable Long maxlen;
128+
private final boolean nomkstream;
126129

127-
private XAddOptions(@Nullable Long maxlen) {
130+
private XAddOptions(@Nullable Long maxlen, boolean nomkstream) {
128131
this.maxlen = maxlen;
132+
this.nomkstream = nomkstream;
129133
}
130134

131135
/**
@@ -141,7 +145,26 @@ public static XAddOptions none() {
141145
* @return new instance of {@link XAddOptions}.
142146
*/
143147
public static XAddOptions maxlen(long maxlen) {
144-
return new XAddOptions(maxlen);
148+
return new XAddOptions(maxlen, false);
149+
}
150+
151+
/**
152+
* Disable creation of stream if it does not already exist.
153+
*
154+
* @return new instance of {@link XAddOptions}.
155+
*/
156+
public static XAddOptions makeNoStream() {
157+
return new XAddOptions(null, true);
158+
}
159+
160+
/**
161+
* Disable creation of stream if it does not already exist.
162+
*
163+
* @param maxlen size of the stream to the given maximum number of elements.
164+
* @return new instance of {@link XAddOptions}.
165+
*/
166+
public static XAddOptions makeNoStream(long maxlen) {
167+
return new XAddOptions(maxlen, true);
145168
}
146169

147170
/**
@@ -161,6 +184,13 @@ public boolean hasMaxlen() {
161184
return maxlen != null && maxlen > 0;
162185
}
163186

187+
/**
188+
* @return {@literal true} if {@literal NOMKSTREAM} is set.
189+
*/
190+
public boolean isNoMkStream() {
191+
return nomkstream;
192+
}
193+
164194
@Override
165195
public boolean equals(Object o) {
166196
if (this == o) {

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

+2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* @author Mark Paluch
5656
* @author Tugdual Grall
5757
* @author Dengliming
58+
* @author Mark John Moreno
5859
* @since 2.2
5960
*/
6061
class LettuceReactiveStreamCommands implements ReactiveStreamCommands {
@@ -110,6 +111,7 @@ public Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStream
110111
if (command.hasMaxlen()) {
111112
args.maxlen(command.getMaxlen());
112113
}
114+
args.nomkstream(command.isNoMkStream());
113115

114116
return cmd.xadd(command.getKey(), args, command.getBody())
115117
.map(value -> new CommandResponse<>(command, RecordId.of(value)));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* @author Tugdual Grall
5050
* @author Dejan Jankov
5151
* @author Dengliming
52+
* @author Mark John Moreno
5253
* @since 2.2
5354
*/
5455
class LettuceStreamCommands implements RedisStreamCommands {
@@ -90,6 +91,7 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions optio
9091
if (options.hasMaxlen()) {
9192
args.maxlen(options.getMaxlen());
9293
}
94+
args.nomkstream(options.isNoMkStream());
9395

9496
return connection.invoke().from(RedisStreamAsyncCommands::xadd, record.getStream(), args, record.getValue())
9597
.get(RecordId::of);

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java

+33
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,39 @@ void executeShouldPassThruCustomCommands() {
237237

238238
verify(asyncCommandsMock).dispatch(eq(command.getType()), eq(command.getOutput()), any(CommandArgs.class));
239239
}
240+
241+
@Test // GH-2047
242+
void xaddShouldHonorNoMkStream() {
243+
244+
MapRecord<byte[], byte[], byte[]> record = MapRecord.create("key".getBytes(), Collections.emptyMap());
245+
246+
connection.streamCommands().xAdd(record, XAddOptions.makeNoStream());
247+
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
248+
if (connection.isPipelined()) {
249+
verify(asyncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
250+
} else {
251+
verify(syncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
252+
}
253+
254+
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
255+
}
256+
257+
@Test // GH-2047
258+
void xaddShouldHonorNoMkStreamWithMaxLen() {
259+
260+
MapRecord<byte[], byte[], byte[]> record = MapRecord.create("key".getBytes(), Collections.emptyMap());
261+
262+
connection.streamCommands().xAdd(record, XAddOptions.makeNoStream(100));
263+
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
264+
if (connection.isPipelined()) {
265+
verify(asyncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
266+
} else {
267+
verify(syncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
268+
}
269+
270+
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
271+
assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L);
272+
}
240273
}
241274

242275
public static class LettucePipelineConnectionUnitTests extends BasicUnitTests {

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnectionUnitTests.java

+35
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,39 @@ void xaddShouldHonorMaxlen() {
241241

242242
assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L);
243243
}
244+
245+
@Test // GH-2047
246+
void xaddShouldHonorNoMkStream() {
247+
248+
LettuceReactiveRedisConnection connection = new LettuceReactiveRedisConnection(connectionProvider);
249+
250+
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
251+
when(reactiveCommands.xadd(any(ByteBuffer.class), args.capture(), anyMap())).thenReturn(Mono.just("1-1"));
252+
253+
MapRecord<ByteBuffer, ByteBuffer, ByteBuffer> record = MapRecord.create(ByteBuffer.wrap("key".getBytes()),
254+
Collections.emptyMap());
255+
256+
connection.streamCommands().xAdd(Mono.just(AddStreamRecord.of(ByteBufferRecord.of(record)).makeNoStream()))
257+
.subscribe();
258+
259+
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
260+
}
261+
262+
@Test // GH-2047
263+
void xaddShouldHonorNoMkStreamWithMaxLen() {
264+
265+
LettuceReactiveRedisConnection connection = new LettuceReactiveRedisConnection(connectionProvider);
266+
267+
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
268+
when(reactiveCommands.xadd(any(ByteBuffer.class), args.capture(), anyMap())).thenReturn(Mono.just("1-1"));
269+
270+
MapRecord<ByteBuffer, ByteBuffer, ByteBuffer> record = MapRecord.create(ByteBuffer.wrap("key".getBytes()),
271+
Collections.emptyMap());
272+
273+
connection.streamCommands().xAdd(Mono.just(AddStreamRecord.of(ByteBufferRecord.of(record)).makeNoStream(100)))
274+
.subscribe();
275+
276+
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
277+
assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L);
278+
}
244279
}

0 commit comments

Comments
 (0)