Skip to content

Commit a3271f6

Browse files
denglimingmp911de
authored andcommitted
Support for approximate trimming in XAddOptions and AddStreamRecord.
Closes #2247
1 parent e63d5df commit a3271f6

File tree

5 files changed

+58
-12
lines changed

5 files changed

+58
-12
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,17 @@ class AddStreamRecord extends KeyCommand {
201201
private final ByteBufferRecord record;
202202
private final @Nullable Long maxlen;
203203
private final boolean nomkstream;
204+
private final boolean approximateTrimming;
204205

205-
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream) {
206+
207+
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream,
208+
boolean approximateTrimming) {
206209

207210
super(record.getStream());
208211
this.record = record;
209212
this.maxlen = maxlen;
210213
this.nomkstream = nomkstream;
214+
this.approximateTrimming = approximateTrimming;
211215
}
212216

213217
/**
@@ -220,7 +224,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
220224

221225
Assert.notNull(record, "Record must not be null!");
222226

223-
return new AddStreamRecord(record, null, false);
227+
return new AddStreamRecord(record, null, false, false);
224228
}
225229

226230
/**
@@ -233,7 +237,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
233237

234238
Assert.notNull(body, "Body must not be null!");
235239

236-
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false);
240+
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false);
237241
}
238242

239243
/**
@@ -243,7 +247,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
243247
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
244248
*/
245249
public AddStreamRecord to(ByteBuffer key) {
246-
return new AddStreamRecord(record.withStreamKey(key), maxlen, false);
250+
return new AddStreamRecord(record.withStreamKey(key), maxlen, false, false);
247251
}
248252

249253
/**
@@ -252,7 +256,7 @@ public AddStreamRecord to(ByteBuffer key) {
252256
* @return new instance of {@link AddStreamRecord}.
253257
*/
254258
public AddStreamRecord maxlen(long maxlen) {
255-
return new AddStreamRecord(record, maxlen, false);
259+
return new AddStreamRecord(record, maxlen, false, false);
256260
}
257261

258262
/**
@@ -262,7 +266,7 @@ public AddStreamRecord maxlen(long maxlen) {
262266
* @since 2.6
263267
*/
264268
public AddStreamRecord makeNoStream() {
265-
return new AddStreamRecord(record, maxlen, true);
269+
return new AddStreamRecord(record, maxlen, true, false);
266270
}
267271

268272
/**
@@ -273,7 +277,23 @@ public AddStreamRecord makeNoStream() {
273277
* @since 2.6
274278
*/
275279
public AddStreamRecord makeNoStream(boolean makeNoStream) {
276-
return new AddStreamRecord(record, maxlen, makeNoStream);
280+
return new AddStreamRecord(record, maxlen, makeNoStream, false);
281+
}
282+
283+
/**
284+
* Apply efficient trimming for capped streams using the {@code ~} flag.
285+
*
286+
* @return new instance of {@link AddStreamRecord}.
287+
*/
288+
public AddStreamRecord approximateTrimming(boolean approximateTrimming) {
289+
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming);
290+
}
291+
292+
/**
293+
* @return {@literal true} if {@literal approximateTrimming} is set.
294+
*/
295+
public boolean isApproximateTrimming() {
296+
return approximateTrimming;
277297
}
278298

279299
/**

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,21 @@ default RecordId xAdd(MapRecord<byte[], byte[], byte[]> record) {
116116
*
117117
* @author Christoph Strobl
118118
* @author Mark John Moreno
119+
* @author Liming Deng
119120
* @since 2.3
120121
*/
121122
class XAddOptions {
122123

123-
private static final XAddOptions NONE = new XAddOptions(null, false);
124+
private static final XAddOptions NONE = new XAddOptions(null, false, false);
124125

125126
private final @Nullable Long maxlen;
126127
private final boolean nomkstream;
128+
private final boolean approximateTrimming;
127129

128-
private XAddOptions(@Nullable Long maxlen, boolean nomkstream) {
130+
private XAddOptions(@Nullable Long maxlen, boolean nomkstream, boolean approximateTrimming) {
129131
this.maxlen = maxlen;
130132
this.nomkstream = nomkstream;
133+
this.approximateTrimming = approximateTrimming;
131134
}
132135

133136
/**
@@ -143,7 +146,7 @@ public static XAddOptions none() {
143146
* @return new instance of {@link XAddOptions}.
144147
*/
145148
public static XAddOptions maxlen(long maxlen) {
146-
return new XAddOptions(maxlen, false);
149+
return new XAddOptions(maxlen, false, false);
147150
}
148151

149152
/**
@@ -153,7 +156,7 @@ public static XAddOptions maxlen(long maxlen) {
153156
* @since 2.6
154157
*/
155158
public static XAddOptions makeNoStream() {
156-
return new XAddOptions(null, true);
159+
return new XAddOptions(null, true, false);
157160
}
158161

159162
/**
@@ -164,7 +167,23 @@ public static XAddOptions makeNoStream() {
164167
* @since 2.6
165168
*/
166169
public static XAddOptions makeNoStream(boolean makeNoStream) {
167-
return new XAddOptions(null, makeNoStream);
170+
return new XAddOptions(null, makeNoStream, false);
171+
}
172+
173+
/**
174+
* Apply efficient trimming for capped streams using the {@code ~} flag.
175+
*
176+
* @return new instance of {@link XAddOptions}.
177+
*/
178+
public XAddOptions approximateTrimming(boolean approximateTrimming) {
179+
return new XAddOptions(null, nomkstream, approximateTrimming);
180+
}
181+
182+
/**
183+
* @return {@literal true} if {@literal approximateTrimming} is set.
184+
*/
185+
public boolean isApproximateTrimming() {
186+
return approximateTrimming;
168187
}
169188

170189
/**
@@ -202,13 +221,15 @@ public boolean equals(Object o) {
202221
}
203222
XAddOptions that = (XAddOptions) o;
204223
if (this.nomkstream != that.nomkstream) return false;
224+
if (this.approximateTrimming != that.approximateTrimming) return false;
205225
return ObjectUtils.nullSafeEquals(this.maxlen, that.maxlen);
206226
}
207227

208228
@Override
209229
public int hashCode() {
210230
int result = ObjectUtils.nullSafeHashCode(this.maxlen);
211231
result = 31 * result + ObjectUtils.nullSafeHashCode(this.nomkstream);
232+
result = 31 * result + ObjectUtils.nullSafeHashCode(this.approximateTrimming);
212233
return result;
213234
}
214235
}

src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions optio
8787
if (options.isNoMkStream()) {
8888
xAddParams.noMkStream();
8989
}
90+
if (options.isApproximateTrimming()) {
91+
xAddParams.approximateTrimming();
92+
}
9093

9194
return connection.invoke()
9295
.from(BinaryJedis::xadd, MultiKeyPipelineBase::xadd, record.getStream(), record.getValue(), xAddParams)

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStream
112112
args.maxlen(command.getMaxlen());
113113
}
114114
args.nomkstream(command.isNoMkStream());
115+
args.approximateTrimming(command.isApproximateTrimming());
115116

116117
return cmd.xadd(command.getKey(), args, command.getBody())
117118
.map(value -> new CommandResponse<>(command, RecordId.of(value)));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public RecordId xAdd(MapRecord<byte[], byte[], byte[]> record, XAddOptions optio
9292
args.maxlen(options.getMaxlen());
9393
}
9494
args.nomkstream(options.isNoMkStream());
95+
args.approximateTrimming(options.isApproximateTrimming());
9596

9697
return connection.invoke().from(RedisStreamAsyncCommands::xadd, record.getStream(), args, record.getValue())
9798
.get(RecordId::of);

0 commit comments

Comments
 (0)