@@ -199,19 +199,20 @@ default Mono<Long> xAck(ByteBuffer key, String group, RecordId... recordIds) {
199
199
class AddStreamRecord extends KeyCommand {
200
200
201
201
private final ByteBufferRecord record ;
202
- private final @ Nullable Long maxlen ;
203
202
private final boolean nomkstream ;
203
+ private final @ Nullable Long maxlen ;
204
204
private final boolean approximateTrimming ;
205
-
205
+ private final @ Nullable RecordId minId ;
206
206
207
207
private AddStreamRecord (ByteBufferRecord record , @ Nullable Long maxlen , boolean nomkstream ,
208
- boolean approximateTrimming ) {
208
+ boolean approximateTrimming , @ Nullable RecordId minId ) {
209
209
210
210
super (record .getStream ());
211
211
this .record = record ;
212
212
this .maxlen = maxlen ;
213
213
this .nomkstream = nomkstream ;
214
214
this .approximateTrimming = approximateTrimming ;
215
+ this .minId = minId ;
215
216
}
216
217
217
218
/**
@@ -224,7 +225,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
224
225
225
226
Assert .notNull (record , "Record must not be null!" );
226
227
227
- return new AddStreamRecord (record , null , false , false );
228
+ return new AddStreamRecord (record , null , false , false , null );
228
229
}
229
230
230
231
/**
@@ -237,7 +238,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
237
238
238
239
Assert .notNull (body , "Body must not be null!" );
239
240
240
- return new AddStreamRecord (StreamRecords .rawBuffer (body ), null , false , false );
241
+ return new AddStreamRecord (StreamRecords .rawBuffer (body ), null , false , false , null );
241
242
}
242
243
243
244
/**
@@ -247,53 +248,57 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
247
248
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
248
249
*/
249
250
public AddStreamRecord to (ByteBuffer key ) {
250
- return new AddStreamRecord (record .withStreamKey (key ), maxlen , false , false );
251
+ return new AddStreamRecord (record .withStreamKey (key ), maxlen , nomkstream , approximateTrimming , minId );
251
252
}
252
253
253
254
/**
254
- * Limit the size of the stream to the given maximum number of elements .
255
+ * Disable creation of stream if it does not already exist .
255
256
*
256
257
* @return new instance of {@link AddStreamRecord}.
258
+ * @since 2.6
257
259
*/
258
- public AddStreamRecord maxlen ( long maxlen ) {
259
- return new AddStreamRecord (record , maxlen , false , false );
260
+ public AddStreamRecord makeNoStream ( ) {
261
+ return new AddStreamRecord (record , maxlen , true , approximateTrimming , minId );
260
262
}
261
263
262
264
/**
263
265
* Disable creation of stream if it does not already exist.
264
266
*
267
+ * @param makeNoStream {@code true} to not create a stream if it does not already exist.
265
268
* @return new instance of {@link AddStreamRecord}.
266
269
* @since 2.6
267
270
*/
268
- public AddStreamRecord makeNoStream () {
269
- return new AddStreamRecord (record , maxlen , true , false );
271
+ public AddStreamRecord makeNoStream (boolean makeNoStream ) {
272
+ return new AddStreamRecord (record , maxlen , makeNoStream , approximateTrimming , minId );
270
273
}
271
274
272
275
/**
273
- * Disable creation of stream if it does not already exist .
276
+ * Limit the size of the stream to the given maximum number of elements .
274
277
*
275
- * @param makeNoStream {@code true} to not create a stream if it does not already exist.
276
278
* @return new instance of {@link AddStreamRecord}.
277
- * @since 2.6
278
279
*/
279
- public AddStreamRecord makeNoStream ( boolean makeNoStream ) {
280
- return new AddStreamRecord (record , maxlen , makeNoStream , false );
280
+ public AddStreamRecord maxlen ( long maxlen ) {
281
+ return new AddStreamRecord (record , maxlen , nomkstream , approximateTrimming , minId );
281
282
}
282
283
283
284
/**
284
- * Apply efficient trimming for capped streams using the {@code ~} flag .
285
+ * Apply {@code MINID} trimming strategy, that evicts entries with IDs lower than the one specified .
285
286
*
287
+ * @param the minimum record Id to retain.
286
288
* @return new instance of {@link AddStreamRecord}.
289
+ * @since 2.7
287
290
*/
288
- public AddStreamRecord approximateTrimming ( boolean approximateTrimming ) {
289
- return new AddStreamRecord (record , maxlen , nomkstream , approximateTrimming );
291
+ public AddStreamRecord minId ( RecordId minId ) {
292
+ return new AddStreamRecord (record , maxlen , nomkstream , approximateTrimming , minId );
290
293
}
291
294
292
295
/**
293
- * @return {@literal true} if {@literal approximateTrimming} is set.
296
+ * Apply efficient trimming for capped streams using the {@code ~} flag.
297
+ *
298
+ * @return new instance of {@link AddStreamRecord}.
294
299
*/
295
- public boolean isApproximateTrimming ( ) {
296
- return approximateTrimming ;
300
+ public AddStreamRecord approximateTrimming ( boolean approximateTrimming ) {
301
+ return new AddStreamRecord ( record , maxlen , nomkstream , approximateTrimming , minId ) ;
297
302
}
298
303
299
304
/**
@@ -307,6 +312,14 @@ public ByteBufferRecord getRecord() {
307
312
return record ;
308
313
}
309
314
315
+ /**
316
+ * @return {@literal true} if {@literal NOMKSTREAM} is set.
317
+ * @since 2.6
318
+ */
319
+ public boolean isNoMkStream () {
320
+ return nomkstream ;
321
+ }
322
+
310
323
/**
311
324
* Limit the size of the stream to the given maximum number of elements.
312
325
*
@@ -327,11 +340,28 @@ public boolean hasMaxlen() {
327
340
}
328
341
329
342
/**
330
- * @return {@literal true} if {@literal NOMKSTREAM } is set.
331
- * @since 2.6
343
+ * @return {@literal true} if {@literal approximateTrimming } is set.
344
+ * @since 2.7
332
345
*/
333
- public boolean isNoMkStream () {
334
- return nomkstream ;
346
+ public boolean isApproximateTrimming () {
347
+ return approximateTrimming ;
348
+ }
349
+
350
+ /**
351
+ * @return the minimum record Id to retain during trimming.
352
+ * @since 2.7
353
+ */
354
+ @ Nullable
355
+ public RecordId getMinId () {
356
+ return minId ;
357
+ }
358
+
359
+ /**
360
+ * @return {@literal true} if {@literal MINID} is set.
361
+ * @since 2.7
362
+ */
363
+ public boolean hasMinId () {
364
+ return minId != null ;
335
365
}
336
366
}
337
367
@@ -1223,7 +1253,7 @@ public static GroupCommand deleteConsumer(Consumer consumer) {
1223
1253
}
1224
1254
1225
1255
public GroupCommand makeStream (boolean mkStream ) {
1226
- return new GroupCommand (getKey (), action , groupName , consumerName , offset ,mkStream );
1256
+ return new GroupCommand (getKey (), action , groupName , consumerName , offset , mkStream );
1227
1257
}
1228
1258
1229
1259
public GroupCommand at (ReadOffset offset ) {
@@ -1291,8 +1321,8 @@ default Mono<String> xGroupCreate(ByteBuffer key, String groupName, ReadOffset r
1291
1321
* @since 2.3
1292
1322
*/
1293
1323
default Mono <String > xGroupCreate (ByteBuffer key , String groupName , ReadOffset readOffset , boolean mkStream ) {
1294
- return xGroup (Mono .just (GroupCommand .createGroup (groupName ).forStream (key ).at (readOffset ).makeStream (mkStream ))). next ()
1295
- .map (CommandResponse ::getOutput );
1324
+ return xGroup (Mono .just (GroupCommand .createGroup (groupName ).forStream (key ).at (readOffset ).makeStream (mkStream )))
1325
+ .next (). map (CommandResponse ::getOutput );
1296
1326
}
1297
1327
1298
1328
/**
0 commit comments