Skip to content

Commit 03de6db

Browse files
zielarz25jxblum
authored andcommitted
Add support for XCLAIM in StreamOperations
Closes spring-projects#2465
1 parent 16068cb commit 03de6db

File tree

6 files changed

+165
-11
lines changed

6 files changed

+165
-11
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

+17
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.nio.charset.StandardCharsets;
23+
import java.time.Duration;
2324
import java.util.Arrays;
2425
import java.util.Collections;
2526
import java.util.Map;
@@ -32,6 +33,7 @@
3233
import org.springframework.data.domain.Range;
3334
import org.springframework.data.redis.connection.ReactiveStreamCommands;
3435
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
36+
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3537
import org.springframework.data.redis.connection.convert.Converters;
3638
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
3739
import org.springframework.data.redis.connection.stream.Consumer;
@@ -153,6 +155,21 @@ public Mono<RecordId> add(Record<K, ?> record) {
153155
* (non-Javadoc)
154156
* @see org.springframework.data.redis.core.ReactiveStreamOperations#delete(java.lang.Object, java.lang.String[])
155157
*/
158+
@Override
159+
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
160+
RecordId... recordIds) {
161+
162+
return createFlux(connection -> connection.xClaim(rawKey(key), group, newOwner, minIdleTime, recordIds)
163+
.map(this::deserializeRecord));
164+
}
165+
166+
@Override
167+
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
168+
169+
return createFlux(
170+
connection -> connection.xClaim(rawKey(key), group, newOwner, xClaimOptions).map(this::deserializeRecord));
171+
}
172+
156173
@Override
157174
public Mono<Long> delete(K key, RecordId... recordIds) {
158175

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

+31
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.redis.core;
1717

1818
import java.nio.charset.StandardCharsets;
19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
@@ -26,6 +27,7 @@
2627
import org.springframework.data.domain.Range;
2728
import org.springframework.data.redis.connection.RedisConnection;
2829
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
30+
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
2931
import org.springframework.data.redis.connection.stream.ByteRecord;
3032
import org.springframework.data.redis.connection.stream.Consumer;
3133
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -144,6 +146,35 @@ public RecordId add(Record<K, ?> record) {
144146
* (non-Javadoc)
145147
* @see org.springframework.data.redis.core.StreamOperations#delete(java.lang.Object, java.lang.String[])
146148
*/
149+
@Override
150+
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
151+
RecordId... recordIds) {
152+
byte[] rawKey = rawKey(key);
153+
154+
return execute(new RecordDeserializingRedisCallback() {
155+
156+
@Nullable
157+
@Override
158+
List<ByteRecord> inRedis(RedisConnection connection) {
159+
return connection.streamCommands().xClaim(rawKey, group, newOwner, minIdleTime, recordIds);
160+
}
161+
});
162+
}
163+
164+
@Override
165+
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
166+
byte[] rawKey = rawKey(key);
167+
168+
return execute(new RecordDeserializingRedisCallback() {
169+
170+
@Nullable
171+
@Override
172+
List<ByteRecord> inRedis(RedisConnection connection) {
173+
return connection.streamCommands().xClaim(rawKey, group, newOwner, xClaimOptions);
174+
}
175+
});
176+
}
177+
147178
@Override
148179
public Long delete(K key, RecordId... recordIds) {
149180

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

+31
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

21+
import java.time.Duration;
2122
import java.util.Arrays;
2223
import java.util.Map;
2324

2425
import org.reactivestreams.Publisher;
2526
import org.springframework.data.domain.Range;
27+
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
2628
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
2729
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
2830
import org.springframework.data.redis.connection.stream.Consumer;
@@ -136,6 +138,35 @@ default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record) {
136138
*/
137139
Mono<RecordId> add(Record<K, ?> record);
138140

141+
/**
142+
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
143+
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
144+
*
145+
* @param key the stream key.
146+
* @param group name of the consumer group.
147+
* @param newOwner name of the consumer claiming the message.
148+
* @param minIdleTime idle time required for a message to be claimed.
149+
* @param recordIds record IDs to be claimed
150+
*
151+
* @return the {@link Flux} of claimed MapRecords.
152+
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
153+
*/
154+
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);
155+
156+
/**
157+
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
158+
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
159+
*
160+
* @param key the stream key.
161+
* @param group name of the consumer group.
162+
* @param newOwner name of the consumer claiming the message.
163+
* @param xClaimOptions additional parameters for the CLAIM call.
164+
*
165+
* @return the {@link Flux} of claimed MapRecords.
166+
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
167+
*/
168+
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);
169+
139170
/**
140171
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from
141172
* the number of IDs passed in case certain IDs do not exist.

src/main/java/org/springframework/data/redis/core/StreamOperations.java

+31
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import reactor.core.publisher.Mono;
1919

20+
import java.time.Duration;
2021
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.Map;
@@ -31,6 +32,7 @@
3132
import org.springframework.data.redis.connection.stream.PendingMessages;
3233
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3334
import org.springframework.data.redis.connection.stream.ReadOffset;
35+
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3436
import org.springframework.data.redis.connection.stream.Record;
3537
import org.springframework.data.redis.connection.stream.RecordId;
3638
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
@@ -130,6 +132,35 @@ default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record) {
130132
@Nullable
131133
RecordId add(Record<K, ?> record);
132134

135+
/**
136+
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
137+
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
138+
*
139+
* @param key the stream key.
140+
* @param group name of the consumer group.
141+
* @param newOwner name of the consumer claiming the message.
142+
* @param minIdleTime idle time required for a message to be claimed.
143+
* @param recordIds record IDs to be claimed
144+
*
145+
* @return list of claimed MapRecords.
146+
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
147+
*/
148+
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);
149+
150+
/**
151+
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
152+
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
153+
*
154+
* @param key the stream key.
155+
* @param group name of the consumer group.
156+
* @param newOwner name of the consumer claiming the message.
157+
* @param xClaimOptions additional parameters for the CLAIM call.
158+
*
159+
* @return list of claimed MapRecords.
160+
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
161+
*/
162+
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);
163+
133164
/**
134165
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from
135166
* the number of IDs passed in case certain IDs do not exist.

src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java

+27
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import reactor.test.StepVerifier;
2222

23+
import java.time.Duration;
2324
import java.util.Collection;
2425
import java.util.Collections;
26+
import java.util.Map;
2527

2628
import org.junit.jupiter.api.BeforeEach;
2729

@@ -358,4 +360,29 @@ void pendingShouldReadMessageDetails() {
358360
}).verifyComplete();
359361

360362
}
363+
364+
@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
365+
void claimShouldReadMessageDetails() {
366+
367+
K key = keyFactory.instance();
368+
HK hashKey = hashKeyFactory.instance();
369+
HV value = valueFactory.instance();
370+
371+
Map<HK, HV> content = Collections.singletonMap(hashKey, value);
372+
RecordId messageId = streamOperations.add(key, content).block();
373+
374+
streamOperations.createGroup(key, ReadOffset.from("0-0"), "my-group").then().as(StepVerifier::create)
375+
.verifyComplete();
376+
377+
streamOperations.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed()))
378+
.then().as(StepVerifier::create).verifyComplete();
379+
380+
streamOperations.claim(key, "my-group", "name", Duration.ZERO, messageId).as(StepVerifier::create)
381+
.assertNext(claimed -> {
382+
assertThat(claimed.getStream()).isEqualTo(key);
383+
assertThat(claimed.getValue()).isEqualTo(content);
384+
assertThat(claimed.getId()).isEqualTo(messageId);
385+
}).verifyComplete();
386+
387+
}
361388
}

src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java

+28-11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.assertj.core.api.Assumptions.*;
2020

21+
import java.time.Duration;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
2324
import java.util.Collections;
@@ -34,16 +35,7 @@
3435
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
3536
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
3637
import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension;
37-
import org.springframework.data.redis.connection.stream.Consumer;
38-
import org.springframework.data.redis.connection.stream.MapRecord;
39-
import org.springframework.data.redis.connection.stream.ObjectRecord;
40-
import org.springframework.data.redis.connection.stream.PendingMessages;
41-
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
42-
import org.springframework.data.redis.connection.stream.ReadOffset;
43-
import org.springframework.data.redis.connection.stream.RecordId;
44-
import org.springframework.data.redis.connection.stream.StreamOffset;
45-
import org.springframework.data.redis.connection.stream.StreamReadOptions;
46-
import org.springframework.data.redis.connection.stream.StreamRecords;
38+
import org.springframework.data.redis.connection.stream.*;
4739
import org.springframework.data.redis.test.condition.EnabledOnCommand;
4840
import org.springframework.data.redis.test.condition.EnabledOnRedisDriver;
4941
import org.springframework.data.redis.test.condition.EnabledOnRedisVersion;
@@ -72,7 +64,7 @@ public class DefaultStreamOperationsIntegrationTests<K, HK, HV> {
7264
private final StreamOperations<K, HK, HV> streamOps;
7365

7466
public DefaultStreamOperationsIntegrationTests(RedisTemplate<K, ?> redisTemplate, ObjectFactory<K> keyFactory,
75-
ObjectFactory<?> objectFactory) {
67+
ObjectFactory<?> objectFactory) {
7668

7769
this.redisTemplate = redisTemplate;
7870
this.connectionFactory = redisTemplate.getRequiredConnectionFactory();
@@ -420,4 +412,29 @@ void pendingShouldReadMessageDetails() {
420412
assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
421413
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
422414
}
415+
416+
@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
417+
void claimShouldReadMessageDetails() {
418+
419+
K key = keyFactory.instance();
420+
HK hashKey = hashKeyFactory.instance();
421+
HV value = hashValueFactory.instance();
422+
423+
RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value));
424+
streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group");
425+
streamOps.read(Consumer.from("my-group", "name"), StreamOffset.create(key, ReadOffset.lastConsumed()));
426+
427+
List<MapRecord<K, HK, HV>> messages = streamOps.claim(key, "my-group", "new-owner", Duration.ZERO, messageId);
428+
429+
assertThat(messages).hasSize(1);
430+
431+
MapRecord<K, HK, HV> message = messages.get(0);
432+
433+
assertThat(message.getId()).isEqualTo(messageId);
434+
assertThat(message.getStream()).isEqualTo(key);
435+
436+
if (!(key instanceof byte[] || value instanceof byte[])) {
437+
assertThat(message.getValue()).containsEntry(hashKey, value);
438+
}
439+
}
423440
}

0 commit comments

Comments
 (0)