Skip to content

Add overloads for XAddOptions in StreamOperations #2936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand All @@ -58,6 +59,7 @@
* @author Tugdual Grall
* @author Dengliming
* @author Mark John Moreno
* @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamCommands {
Expand Down Expand Up @@ -394,11 +396,40 @@ default Mono<RecordId> xAdd(ByteBufferRecord record) {
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
}

/**
* Add stream record with the specified options.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return {@link Mono} the {@link RecordId id}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {

Assert.notNull(record, "Record must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");

AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
.approximateTrimming(xAddOptions.isApproximateTrimming())
.makeNoStream(xAddOptions.isNoMkStream());

if (xAddOptions.hasMaxlen()) {
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
}

if (xAddOptions.hasMinId()) {
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
}

return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
}
Copy link
Contributor Author

@jinkshower jinkshower Jul 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason why I wrote somewhat verbose method in here is

unlike JedisStreamCommands or LettuceStreamCommands
which both have a method accepting (MapRecord<byte[], byte[], byte[]> record, XAddOptions options)

xAdd(Publisher commands) in LettuceReactiveStreamCommands
expects that Publisher already has variables of XAddOptions.
ex) maxlen, isApproxiateTrimming...

so I placed the decapsulation of XAddOptions here.

I thought of changing AddStreamRecord to have variable priavte final XAddOptions options
so that LettuceReactiveStreamCommands can delegate the decapsulation to other class
but I thought this exceeds the scope of this issue.


/**
* Add stream record with given {@literal body} to {@literal key}.
*
* @param commands must not be {@literal null}.
* @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
* @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed typo

* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
*/
Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand Down Expand Up @@ -60,6 +61,7 @@
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
Expand Down Expand Up @@ -146,6 +148,18 @@ public Mono<RecordId> add(Record<K, ?> record) {
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
}

@Override
public Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions) {

Assert.notNull(record.getStream(), "Key must not be null");
Assert.notNull(record.getValue(), "Body must not be null");
Assert.notNull(xAddOptions, "XAddOptions must not be null");

MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);

return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
}

@Override
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
Expand Down Expand Up @@ -54,6 +55,7 @@
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
Expand Down Expand Up @@ -136,6 +138,21 @@ public RecordId add(Record<K, ?> record) {
return execute(connection -> connection.xAdd(binaryRecord));
}

@Nullable
@Override
@SuppressWarnings("unchecked")
public RecordId add(Record<K , ?> record, XAddOptions options) {

Assert.notNull(record, "Record must not be null");
Assert.notNull(options, "XAddOptions must not be null");

MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);

ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());

return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
}

@Override
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
Expand Down Expand Up @@ -54,6 +55,7 @@
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
Expand Down Expand Up @@ -94,6 +96,63 @@ default Mono<Long> acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getRequiredStream(), group, record.getId());
}

/**
* Append one or more records to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param bodyPublisher record body {@link Publisher}.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the record Ids.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Flux<RecordId> add (K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher,
XAddOptions xAddOptions) {
return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
}

/**
* Append a record to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param content record content as Map.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
}

/**
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
*
* @param record the record to append.
* @param xAddOptions parameters for the {@literal XADD} call.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
return add((Record) record, xAddOptions);
}

/**
* Append the record, backed by the given value, to the stream with the specified options.
* The value will be hashed and serialized.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return the {@link Mono} emitting the {@link RecordId}.
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
Mono<RecordId> add(Record<K, ?> record, XAddOptions xAddOptions);

/**
* Append one or more records to the stream {@code key}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
Expand Down Expand Up @@ -53,6 +54,7 @@
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
* @author jinkshower
* @since 2.2
*/
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
Expand Down Expand Up @@ -95,6 +97,53 @@ default Long acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getRequiredStream(), group, record.getId());
}

/**
* Append a record to the stream {@code key} with the specified options.
*
* @param key the stream key.
* @param content record content as Map.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
default RecordId add(K key, Map<? extends HK, ? extends HV> content, XAddOptions xAddOptions) {
return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
}

/**
* Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
*
* @param record the record to append.
* @param xAddOptions additional parameters for the {@literal XADD} call.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record, XAddOptions xAddOptions) {
return add((Record) record, xAddOptions);
}

/**
* Append the record, backed by the given value, to the stream with the specified options.
* The value will be hashed and serialized.
*
* @param record must not be {@literal null}.
* @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
* @return the record Id. {@literal null} when used in pipeline / transaction.
* @see MapRecord
* @see ObjectRecord
* @see <a href="https://redis.io/commands/xadd">Redis Documentation: XADD</a>
* @since 3.3
*/
@SuppressWarnings("unchecked")
@Nullable
RecordId add(Record<K, ?> record, XAddOptions xAddOptions);

/**
* Append a record to the stream {@code key}.
*
Expand Down
Loading