Skip to content

Commit c63e2e2

Browse files
committed
Polishing.
Add support for MINID trimming. Add tests. Extract Jedis XAddParams conversion to StreamConverters. Reorder methods. Simplify StreamRecord creation. See #2247
1 parent 9b8bca6 commit c63e2e2

File tree

9 files changed

+216
-76
lines changed

9 files changed

+216
-76
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -199,19 +199,20 @@ default Mono<Long> xAck(ByteBuffer key, String group, RecordId... recordIds) {
199199
class AddStreamRecord extends KeyCommand {
200200

201201
private final ByteBufferRecord record;
202-
private final @Nullable Long maxlen;
203202
private final boolean nomkstream;
203+
private final @Nullable Long maxlen;
204204
private final boolean approximateTrimming;
205-
205+
private final @Nullable RecordId minId;
206206

207207
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream,
208-
boolean approximateTrimming) {
208+
boolean approximateTrimming, @Nullable RecordId minId) {
209209

210210
super(record.getStream());
211211
this.record = record;
212212
this.maxlen = maxlen;
213213
this.nomkstream = nomkstream;
214214
this.approximateTrimming = approximateTrimming;
215+
this.minId = minId;
215216
}
216217

217218
/**
@@ -224,7 +225,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
224225

225226
Assert.notNull(record, "Record must not be null!");
226227

227-
return new AddStreamRecord(record, null, false, false);
228+
return new AddStreamRecord(record, null, false, false, null);
228229
}
229230

230231
/**
@@ -237,7 +238,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
237238

238239
Assert.notNull(body, "Body must not be null!");
239240

240-
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false);
241+
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false, null);
241242
}
242243

243244
/**
@@ -247,53 +248,57 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
247248
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
248249
*/
249250
public AddStreamRecord to(ByteBuffer key) {
250-
return new AddStreamRecord(record.withStreamKey(key), maxlen, false, false);
251+
return new AddStreamRecord(record.withStreamKey(key), maxlen, nomkstream, approximateTrimming, minId);
251252
}
252253

253254
/**
254-
* Limit the size of the stream to the given maximum number of elements.
255+
* Disable creation of stream if it does not already exist.
255256
*
256257
* @return new instance of {@link AddStreamRecord}.
258+
* @since 2.6
257259
*/
258-
public AddStreamRecord maxlen(long maxlen) {
259-
return new AddStreamRecord(record, maxlen, false, false);
260+
public AddStreamRecord makeNoStream() {
261+
return new AddStreamRecord(record, maxlen, true, approximateTrimming, minId);
260262
}
261263

262264
/**
263265
* Disable creation of stream if it does not already exist.
264266
*
267+
* @param makeNoStream {@code true} to not create a stream if it does not already exist.
265268
* @return new instance of {@link AddStreamRecord}.
266269
* @since 2.6
267270
*/
268-
public AddStreamRecord makeNoStream() {
269-
return new AddStreamRecord(record, maxlen, true, false);
271+
public AddStreamRecord makeNoStream(boolean makeNoStream) {
272+
return new AddStreamRecord(record, maxlen, makeNoStream, approximateTrimming, minId);
270273
}
271274

272275
/**
273-
* Disable creation of stream if it does not already exist.
276+
* Limit the size of the stream to the given maximum number of elements.
274277
*
275-
* @param makeNoStream {@code true} to not create a stream if it does not already exist.
276278
* @return new instance of {@link AddStreamRecord}.
277-
* @since 2.6
278279
*/
279-
public AddStreamRecord makeNoStream(boolean makeNoStream) {
280-
return new AddStreamRecord(record, maxlen, makeNoStream, false);
280+
public AddStreamRecord maxlen(long maxlen) {
281+
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
281282
}
282283

283284
/**
284-
* Apply efficient trimming for capped streams using the {@code ~} flag.
285+
* Apply {@code MINID} trimming strategy, that evicts entries with IDs lower than the one specified.
285286
*
287+
* @param minId the minimum record Id to retain.
286288
* @return new instance of {@link AddStreamRecord}.
289+
* @since 2.7
287290
*/
288-
public AddStreamRecord approximateTrimming(boolean approximateTrimming) {
289-
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming);
291+
public AddStreamRecord minId(RecordId minId) {
292+
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
290293
}
291294

292295
/**
293-
* @return {@literal true} if {@literal approximateTrimming} is set.
296+
* Apply efficient trimming for capped streams using the {@code ~} flag.
297+
*
298+
* @return new instance of {@link AddStreamRecord}.
294299
*/
295-
public boolean isApproximateTrimming() {
296-
return approximateTrimming;
300+
public AddStreamRecord approximateTrimming(boolean approximateTrimming) {
301+
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
297302
}
298303

299304
/**
@@ -307,6 +312,14 @@ public ByteBufferRecord getRecord() {
307312
return record;
308313
}
309314

315+
/**
316+
* @return {@literal true} if {@literal NOMKSTREAM} is set.
317+
* @since 2.6
318+
*/
319+
public boolean isNoMkStream() {
320+
return nomkstream;
321+
}
322+
310323
/**
311324
* Limit the size of the stream to the given maximum number of elements.
312325
*
@@ -327,11 +340,28 @@ public boolean hasMaxlen() {
327340
}
328341

329342
/**
330-
* @return {@literal true} if {@literal NOMKSTREAM} is set.
331-
* @since 2.6
343+
* @return {@literal true} if {@literal approximateTrimming} is set.
344+
* @since 2.7
332345
*/
333-
public boolean isNoMkStream() {
334-
return nomkstream;
346+
public boolean isApproximateTrimming() {
347+
return approximateTrimming;
348+
}
349+
350+
/**
351+
* @return the minimum record Id to retain during trimming.
352+
* @since 2.7
353+
*/
354+
@Nullable
355+
public RecordId getMinId() {
356+
return minId;
357+
}
358+
359+
/**
360+
* @return {@literal true} if {@literal MINID} is set.
361+
* @since 2.7
362+
*/
363+
public boolean hasMinId() {
364+
return minId != null;
335365
}
336366
}
337367

@@ -1223,7 +1253,7 @@ public static GroupCommand deleteConsumer(Consumer consumer) {
12231253
}
12241254

12251255
public GroupCommand makeStream(boolean mkStream) {
1226-
return new GroupCommand(getKey(), action, groupName, consumerName, offset,mkStream);
1256+
return new GroupCommand(getKey(), action, groupName, consumerName, offset, mkStream);
12271257
}
12281258

12291259
public GroupCommand at(ReadOffset offset) {
@@ -1291,8 +1321,8 @@ default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset r
12911321
* @since 2.3
12921322
*/
12931323
default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset readOffset, boolean mkStream) {
1294-
return xGroup(Mono.just(GroupCommand.createGroup(groupName).forStream(key).at(readOffset).makeStream(mkStream))).next()
1295-
.map(CommandResponse::getOutput);
1324+
return xGroup(Mono.just(GroupCommand.createGroup(groupName).forStream(key).at(readOffset).makeStream(mkStream)))
1325+
.next().map(CommandResponse::getOutput);
12961326
}
12971327

12981328
/**

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,19 @@ default RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
121121
*/
122122
class XAddOptions {
123123

124-
private static final XAddOptions NONE = new XAddOptions(null, false, false);
124+
private static final XAddOptions NONE = new XAddOptions(null, false, false, null);
125125

126126
private final @Nullable Long maxlen;
127127
private final boolean nomkstream;
128128
private final boolean approximateTrimming;
129+
private final @Nullable RecordId minId;
129130

130-
private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming) {
131+
private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming,
132+
@Nullable RecordId minId) {
131133
this.maxlen = maxlen;
132134
this.nomkstream = nomkstream;
133135
this.approximateTrimming = approximateTrimming;
136+
this.minId = minId;
134137
}
135138

136139
/**
@@ -141,33 +144,44 @@ public static XAddOptions none() {
141144
}
142145

143146
/**
144-
* Limit the size of the stream to the given maximum number of elements.
147+
* Disable creation of stream if it does not already exist.
145148
*
146149
* @return new instance of {@link XAddOptions}.
150+
* @since 2.6
147151
*/
148-
public static XAddOptions maxlen(long maxlen) {
149-
return new XAddOptions(maxlen, false, false);
152+
public static XAddOptions makeNoStream() {
153+
return new XAddOptions(null, true, false, null);
150154
}
151155

152156
/**
153157
* Disable creation of stream if it does not already exist.
154158
*
159+
* @param makeNoStream {@code true} to not create a stream if it does not already exist.
155160
* @return new instance of {@link XAddOptions}.
156161
* @since 2.6
157162
*/
158-
public static XAddOptions makeNoStream() {
159-
return new XAddOptions(null, true, false);
163+
public static XAddOptions makeNoStream(boolean makeNoStream) {
164+
return new XAddOptions(null, makeNoStream, false, null);
160165
}
161166

162167
/**
163-
* Disable creation of stream if it does not already exist.
168+
* Limit the size of the stream to the given maximum number of elements.
164169
*
165-
* @param makeNoStream {@code true} to not create a stream if it does not already exist.
166170
* @return new instance of {@link XAddOptions}.
167-
* @since 2.6
168171
*/
169-
public static XAddOptions makeNoStream(boolean makeNoStream) {
170-
return new XAddOptions(null, makeNoStream, false);
172+
public static XAddOptions maxlen(long maxlen) {
173+
return new XAddOptions(maxlen, false, false, null);
174+
}
175+
176+
/**
177+
* Apply {@code MINID} trimming strategy, that evicts entries with IDs lower than the one specified.
178+
*
179+
* @param minId the minimum record Id to retain.
180+
* @return new instance of {@link XAddOptions}.
181+
* @since 2.7
182+
*/
183+
public XAddOptions minId(RecordId minId) {
184+
return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId);
171185
}
172186

173187
/**
@@ -176,14 +190,15 @@ public static XAddOptions makeNoStream(boolean makeNoStream) {
176190
* @return new instance of {@link XAddOptions}.
177191
*/
178192
public XAddOptions approximateTrimming(boolean approximateTrimming) {
179-
return new XAddOptions(null, nomkstream, approximateTrimming);
193+
return new XAddOptions(maxlen, nomkstream, approximateTrimming, minId);
180194
}
181195

182196
/**
183-
* @return {@literal true} if {@literal approximateTrimming} is set.
197+
* @return {@literal true} if {@literal NOMKSTREAM} is set.
198+
* @since 2.6
184199
*/
185-
public boolean isApproximateTrimming() {
186-
return approximateTrimming;
200+
public boolean isNoMkStream() {
201+
return nomkstream;
187202
}
188203

189204
/**
@@ -204,32 +219,56 @@ public boolean hasMaxlen() {
204219
}
205220

206221
/**
207-
* @return {@literal true} if {@literal NOMKSTREAM} is set.
208-
* @since 2.6
222+
* @return {@literal true} if {@literal approximateTrimming} is set.
209223
*/
210-
public boolean isNoMkStream() {
211-
return nomkstream;
224+
public boolean isApproximateTrimming() {
225+
return approximateTrimming;
226+
}
227+
228+
/**
229+
* @return the minimum record Id to retain during trimming.
230+
* @since 2.7
231+
*/
232+
@Nullable
233+
public RecordId getMinId() {
234+
return minId;
235+
}
236+
237+
/**
238+
* @return {@literal true} if {@literal MINID} is set.
239+
* @since 2.7
240+
*/
241+
public boolean hasMinId() {
242+
return minId != null;
212243
}
213244

214245
@Override
215246
public boolean equals(Object o) {
216247
if (this == o) {
217248
return true;
218249
}
219-
if (o == null || getClass() != o.getClass()) {
250+
if (!(o instanceof XAddOptions)) {
220251
return false;
221252
}
222253
XAddOptions that = (XAddOptions) o;
223-
if (this.nomkstream != that.nomkstream) return false;
224-
if (this.approximateTrimming != that.approximateTrimming) return false;
225-
return ObjectUtils.nullSafeEquals(this.maxlen, that.maxlen);
254+
if (nomkstream != that.nomkstream) {
255+
return false;
256+
}
257+
if (approximateTrimming != that.approximateTrimming) {
258+
return false;
259+
}
260+
if (!ObjectUtils.nullSafeEquals(maxlen, that.maxlen)) {
261+
return false;
262+
}
263+
return ObjectUtils.nullSafeEquals(minId, that.minId);
226264
}
227265

228266
@Override
229267
public int hashCode() {
230-
int result = ObjectUtils.nullSafeHashCode(this.maxlen);
231-
result = 31 * result + ObjectUtils.nullSafeHashCode(this.nomkstream);
232-
result = 31 * result + ObjectUtils.nullSafeHashCode(this.approximateTrimming);
268+
int result = ObjectUtils.nullSafeHashCode(maxlen);
269+
result = 31 * result + (nomkstream ? 1 : 0);
270+
result = 31 * result + (approximateTrimming ? 1 : 0);
271+
result = 31 * result + ObjectUtils.nullSafeHashCode(minId);
233272
return result;
234273
}
235274
}

src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.springframework.data.redis.connection.jedis.StreamConverters.*;
1919

2020
import redis.clients.jedis.BuilderFactory;
21+
import redis.clients.jedis.params.XAddParams;
2122

2223
import java.util.Arrays;
2324
import java.util.Collections;
@@ -72,15 +73,11 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions optio
7273
Assert.notNull(record, "Record must not be null!");
7374
Assert.notNull(record.getStream(), "Stream must not be null!");
7475

75-
byte[] id = JedisConverters.toBytes(record.getId().getValue());
76-
long maxLength = Long.MAX_VALUE;
77-
if (options.hasMaxlen()) {
78-
maxLength = options.getMaxlen();
79-
}
76+
XAddParams params = StreamConverters.toXAddParams(record, options);
8077

8178
try {
82-
return RecordId.of(JedisConverters
83-
.toString(connection.getCluster().xadd(record.getStream(), id, record.getValue(), maxLength, false)));
79+
return RecordId
80+
.of(JedisConverters.toString(connection.getCluster().xadd(record.getStream(), record.getValue(), params)));
8481
} catch (Exception ex) {
8582
throw convertJedisAccessException(ex);
8683
}

src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,10 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions optio
7171
Assert.notNull(record, "Record must not be null!");
7272
Assert.notNull(record.getStream(), "Stream must not be null!");
7373

74-
XAddParams xAddParams = new XAddParams();
75-
xAddParams.id(record.getId().getValue());
76-
if (options.hasMaxlen()) {
77-
xAddParams.maxLen(options.getMaxlen());
78-
}
79-
if (options.isNoMkStream()) {
80-
xAddParams.noMkStream();
81-
}
82-
if (options.isApproximateTrimming()) {
83-
xAddParams.approximateTrimming();
84-
}
74+
XAddParams params = StreamConverters.toXAddParams(record, options);
8575

8676
return connection.invoke()
87-
.from(BinaryJedis::xadd, MultiKeyPipelineBase::xadd, record.getStream(), record.getValue(), xAddParams)
77+
.from(BinaryJedis::xadd, MultiKeyPipelineBase::xadd, record.getStream(), record.getValue(), params)
8878
.get(it -> RecordId.of(JedisConverters.toString(it)));
8979
}
9080

0 commit comments

Comments
 (0)