diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java index 5dc3e883e7..665a9f4d5b 100644 --- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java @@ -201,13 +201,17 @@ class AddStreamRecord extends KeyCommand { private final ByteBufferRecord record; private final @Nullable Long maxlen; private final boolean nomkstream; + private final boolean approximateTrimming; - private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream) { + + private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream, + boolean approximateTrimming) { super(record.getStream()); this.record = record; this.maxlen = maxlen; this.nomkstream = nomkstream; + this.approximateTrimming = approximateTrimming; } /** @@ -220,7 +224,7 @@ public static AddStreamRecord of(ByteBufferRecord record) { Assert.notNull(record, "Record must not be null!"); - return new AddStreamRecord(record, null, false); + return new AddStreamRecord(record, null, false, false); } /** @@ -233,7 +237,7 @@ public static AddStreamRecord body(Map body) { Assert.notNull(body, "Body must not be null!"); - return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false); + return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false); } /** @@ -243,7 +247,7 @@ public static AddStreamRecord body(Map body) { * @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied. */ public AddStreamRecord to(ByteBuffer key) { - return new AddStreamRecord(record.withStreamKey(key), maxlen, false); + return new AddStreamRecord(record.withStreamKey(key), maxlen, false, false); } /** @@ -252,7 +256,7 @@ public AddStreamRecord to(ByteBuffer key) { * @return new instance of {@link AddStreamRecord}. */ public AddStreamRecord maxlen(long maxlen) { - return new AddStreamRecord(record, maxlen, false); + return new AddStreamRecord(record, maxlen, false, false); } /** @@ -262,7 +266,7 @@ public AddStreamRecord maxlen(long maxlen) { * @since 2.6 */ public AddStreamRecord makeNoStream() { - return new AddStreamRecord(record, maxlen, true); + return new AddStreamRecord(record, maxlen, true, false); } /** @@ -273,7 +277,23 @@ public AddStreamRecord makeNoStream() { * @since 2.6 */ public AddStreamRecord makeNoStream(boolean makeNoStream) { - return new AddStreamRecord(record, maxlen, makeNoStream); + return new AddStreamRecord(record, maxlen, makeNoStream, false); + } + + /** + * Apply efficient trimming for capped streams using the {@code ~} flag. + * + * @return new instance of {@link AddStreamRecord}. + */ + public AddStreamRecord approximateTrimming(boolean approximateTrimming) { + return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming); + } + + /** + * @return {@literal true} if {@literal approximateTrimming} is set. + */ + public boolean isApproximateTrimming() { + return approximateTrimming; } /** diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 2efb2d9b54..0411d11384 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -116,18 +116,21 @@ default RecordId xAdd(MapRecord record) { * * @author Christoph Strobl * @author Mark John Moreno + * @author Liming Deng * @since 2.3 */ class XAddOptions { - private static final XAddOptions NONE = new XAddOptions(null, false); + private static final XAddOptions NONE = new XAddOptions(null, false, false); private final @Nullable Long maxlen; private final boolean nomkstream; + private final boolean approximateTrimming; - private XAddOptions(@Nullable Long maxlen, boolean nomkstream) { + private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming) { this.maxlen = maxlen; this.nomkstream = nomkstream; + this.approximateTrimming = approximateTrimming; } /** @@ -143,7 +146,7 @@ public static XAddOptions none() { * @return new instance of {@link XAddOptions}. */ public static XAddOptions maxlen(long maxlen) { - return new XAddOptions(maxlen, false); + return new XAddOptions(maxlen, false, false); } /** @@ -153,7 +156,7 @@ public static XAddOptions maxlen(long maxlen) { * @since 2.6 */ public static XAddOptions makeNoStream() { - return new XAddOptions(null, true); + return new XAddOptions(null, true, false); } /** @@ -164,7 +167,23 @@ public static XAddOptions makeNoStream() { * @since 2.6 */ public static XAddOptions makeNoStream(boolean makeNoStream) { - return new XAddOptions(null, makeNoStream); + return new XAddOptions(null, makeNoStream, false); + } + + /** + * Apply efficient trimming for capped streams using the {@code ~} flag. + * + * @return new instance of {@link XAddOptions}. + */ + public XAddOptions approximateTrimming(boolean approximateTrimming) { + return new XAddOptions(null, nomkstream, approximateTrimming); + } + + /** + * @return {@literal true} if {@literal approximateTrimming} is set. + */ + public boolean isApproximateTrimming() { + return approximateTrimming; } /** @@ -202,6 +221,7 @@ public boolean equals(Object o) { } XAddOptions that = (XAddOptions) o; if (this.nomkstream != that.nomkstream) return false; + if (this.approximateTrimming != that.approximateTrimming) return false; return ObjectUtils.nullSafeEquals(this.maxlen, that.maxlen); } @@ -209,6 +229,7 @@ public boolean equals(Object o) { public int hashCode() { int result = ObjectUtils.nullSafeHashCode(this.maxlen); result = 31 * result + ObjectUtils.nullSafeHashCode(this.nomkstream); + result = 31 * result + ObjectUtils.nullSafeHashCode(this.approximateTrimming); return result; } } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java index ef9293a744..920da7af54 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -87,6 +87,9 @@ public RecordId xAdd(MapRecord record, XAddOptions optio if (options.isNoMkStream()) { xAddParams.noMkStream(); } + if (options.isApproximateTrimming()) { + xAddParams.approximateTrimming(); + } return connection.invoke() .from(BinaryJedis::xadd, MultiKeyPipelineBase::xadd, record.getStream(), record.getValue(), xAddParams) diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 7fe6d03974..250ae86e5c 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -112,6 +112,7 @@ public Flux> xAdd(Publisher new CommandResponse<>(command, RecordId.of(value))); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index 6f137850bd..4ebd47c35b 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -92,6 +92,7 @@ public RecordId xAdd(MapRecord record, XAddOptions optio args.maxlen(options.getMaxlen()); } args.nomkstream(options.isNoMkStream()); + args.approximateTrimming(options.isApproximateTrimming()); return connection.invoke().from(RedisStreamAsyncCommands::xadd, record.getStream(), args, record.getValue()) .get(RecordId::of);