Skip to content

Commit 7e1746c

Browse files
committed
1 parent 9929053 commit 7e1746c

File tree

7 files changed

+117
-73
lines changed

7 files changed

+117
-73
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.nio.charset.StandardCharsets;
23-
import java.time.Duration;
2423
import java.util.Arrays;
2524
import java.util.Collections;
2625
import java.util.Map;
@@ -59,6 +58,8 @@
5958
*
6059
* @author Mark Paluch
6160
* @author Christoph Strobl
61+
* @author Marcin Zielinski
62+
* @author John Blum
6263
* @since 2.2
6364
*/
6465
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -144,20 +145,12 @@ public Mono<RecordId> add(Record<K, ?> record) {
144145
}
145146

146147
@Override
147-
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
148-
RecordId... recordIds) {
148+
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
149149

150-
return createFlux(connection -> connection.xClaim(rawKey(key), group, newOwner, minIdleTime, recordIds)
150+
return createFlux(connection -> connection.xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions)
151151
.map(this::deserializeRecord));
152152
}
153153

154-
@Override
155-
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
156-
157-
return createFlux(
158-
connection -> connection.xClaim(rawKey(key), group, newOwner, xClaimOptions).map(this::deserializeRecord));
159-
}
160-
161154
@Override
162155
public Mono<Long> delete(K key, RecordId... recordIds) {
163156

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

+7-21
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package org.springframework.data.redis.core;
1717

1818
import java.nio.charset.StandardCharsets;
19-
import java.time.Duration;
2019
import java.util.ArrayList;
2120
import java.util.Arrays;
2221
import java.util.Collections;
@@ -43,6 +42,7 @@
4342
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4443
import org.springframework.data.redis.hash.HashMapper;
4544
import org.springframework.data.redis.serializer.RedisSerializer;
45+
import org.springframework.data.redis.support.collections.CollectionUtils;
4646
import org.springframework.lang.Nullable;
4747
import org.springframework.util.Assert;
4848
import org.springframework.util.ClassUtils;
@@ -52,6 +52,8 @@
5252
*
5353
* @author Mark Paluch
5454
* @author Christoph Strobl
55+
* @author Marcin Zielinski
56+
* @author John Blum
5557
* @since 2.2
5658
*/
5759
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -135,32 +137,16 @@ public RecordId add(Record<K, ?> record) {
135137
}
136138

137139
@Override
138-
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
139-
RecordId... recordIds) {
140-
byte[] rawKey = rawKey(key);
140+
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
141141

142-
return execute(new RecordDeserializingRedisCallback() {
142+
return CollectionUtils.nullSafeList(execute(new RecordDeserializingRedisCallback() {
143143

144144
@Nullable
145145
@Override
146146
List<ByteRecord> inRedis(RedisConnection connection) {
147-
return connection.streamCommands().xClaim(rawKey, group, newOwner, minIdleTime, recordIds);
147+
return connection.streamCommands().xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions);
148148
}
149-
});
150-
}
151-
152-
@Override
153-
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
154-
byte[] rawKey = rawKey(key);
155-
156-
return execute(new RecordDeserializingRedisCallback() {
157-
158-
@Nullable
159-
@Override
160-
List<ByteRecord> inRedis(RedisConnection connection) {
161-
return connection.streamCommands().xClaim(rawKey, group, newOwner, xClaimOptions);
162-
}
163-
});
149+
}));
164150
}
165151

166152
@Override

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

+47-19
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,22 @@
2727
import org.springframework.data.domain.Range;
2828
import org.springframework.data.redis.connection.Limit;
2929
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
30-
import org.springframework.data.redis.connection.stream.*;
30+
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
31+
import org.springframework.data.redis.connection.stream.Consumer;
32+
import org.springframework.data.redis.connection.stream.MapRecord;
33+
import org.springframework.data.redis.connection.stream.ObjectRecord;
34+
import org.springframework.data.redis.connection.stream.PendingMessage;
35+
import org.springframework.data.redis.connection.stream.PendingMessages;
36+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
37+
import org.springframework.data.redis.connection.stream.ReadOffset;
3138
import org.springframework.data.redis.connection.stream.Record;
39+
import org.springframework.data.redis.connection.stream.RecordId;
3240
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
3341
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
3442
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
43+
import org.springframework.data.redis.connection.stream.StreamOffset;
44+
import org.springframework.data.redis.connection.stream.StreamReadOptions;
45+
import org.springframework.data.redis.connection.stream.StreamRecords;
3546
import org.springframework.data.redis.hash.HashMapper;
3647
import org.springframework.lang.Nullable;
3748
import org.springframework.util.Assert;
@@ -42,6 +53,8 @@
4253
* @author Mark Paluch
4354
* @author Christoph Strobl
4455
* @author Dengliming
56+
* @author Marcin Zielinski
57+
* @author John Blum
4558
* @since 2.2
4659
*/
4760
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -129,33 +142,48 @@ default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record) {
129142
Mono<RecordId> add(Record<K, ?> record);
130143

131144
/**
132-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
133-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
145+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
146+
* the command argument.
134147
*
135-
* @param key the stream key.
136-
* @param group name of the consumer group.
137-
* @param newOwner name of the consumer claiming the message.
138-
* @param minIdleTime idle time required for a message to be claimed.
139-
* @param recordIds record IDs to be claimed
148+
* The message is claimed only if its idle time (ms) is greater than the {@link Duration minimum idle time}
149+
* specified when calling {@literal XCLAIM}.
140150
*
141-
* @return the {@link Flux} of claimed MapRecords.
151+
* @param key {@link K key} to the steam.
152+
* @param consumerGroup {@link String name} of the consumer group.
153+
* @param newOwner {@link String name} of the consumer claiming the message.
154+
* @param minIdleTime {@link Duration minimum idle time} required for a message to be claimed.
155+
* @param recordIds {@link RecordId record IDs} to be claimed.
156+
* @return {@link Flux} of claimed {@link MapRecord MapRecords}.
142157
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
158+
* @see org.springframework.data.redis.connection.stream.MapRecord
159+
* @see org.springframework.data.redis.connection.stream.RecordId
160+
* @see #claim(Object, String, String, XClaimOptions)
161+
* @see reactor.core.publisher.Flux
143162
*/
144-
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);
163+
default Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, Duration minIdleTime,
164+
RecordId... recordIds) {
165+
166+
return claim(key, consumerGroup, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
167+
}
145168

146169
/**
147-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
148-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
149-
*
150-
* @param key the stream key.
151-
* @param group name of the consumer group.
152-
* @param newOwner name of the consumer claiming the message.
153-
* @param xClaimOptions additional parameters for the CLAIM call.
170+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
171+
* the command argument.
172+
173+
* The message is claimed only if its idle time (ms) is greater than the given {@link Duration minimum idle time}
174+
* specified when calling {@literal XCLAIM}.
154175
*
155-
* @return the {@link Flux} of claimed MapRecords.
176+
* @param key {@link K key} to the steam.
177+
* @param consumerGroup {@link String name} of the consumer group.
178+
* @param newOwner {@link String name} of the consumer claiming the message.
179+
* @param xClaimOptions additional parameters for the {@literal CLAIM} call.
180+
* @return a {@link Flux} of claimed {@link MapRecord MapRecords}.
156181
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
182+
* @see org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions
183+
* @see org.springframework.data.redis.connection.stream.MapRecord
184+
* @see reactor.core.publisher.Flux
157185
*/
158-
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);
186+
Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions);
159187

160188
/**
161189
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from

src/main/java/org/springframework/data/redis/core/StreamOperations.java

+44-18
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,22 @@
2525
import org.springframework.data.domain.Range;
2626
import org.springframework.data.redis.connection.Limit;
2727
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
28-
import org.springframework.data.redis.connection.stream.*;
28+
import org.springframework.data.redis.connection.stream.ByteRecord;
29+
import org.springframework.data.redis.connection.stream.Consumer;
30+
import org.springframework.data.redis.connection.stream.MapRecord;
31+
import org.springframework.data.redis.connection.stream.ObjectRecord;
32+
import org.springframework.data.redis.connection.stream.PendingMessage;
33+
import org.springframework.data.redis.connection.stream.PendingMessages;
34+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
35+
import org.springframework.data.redis.connection.stream.ReadOffset;
2936
import org.springframework.data.redis.connection.stream.Record;
37+
import org.springframework.data.redis.connection.stream.RecordId;
3038
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumers;
3139
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroups;
3240
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
41+
import org.springframework.data.redis.connection.stream.StreamOffset;
42+
import org.springframework.data.redis.connection.stream.StreamReadOptions;
43+
import org.springframework.data.redis.connection.stream.StreamRecords;
3344
import org.springframework.data.redis.hash.HashMapper;
3445
import org.springframework.lang.Nullable;
3546
import org.springframework.util.Assert;
@@ -40,6 +51,8 @@
4051
* @author Mark Paluch
4152
* @author Christoph Strobl
4253
* @author Dengliming
54+
* @author Marcin Zielinski
55+
* @author John Blum
4356
* @since 2.2
4457
*/
4558
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -122,33 +135,46 @@ default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record) {
122135
RecordId add(Record<K, ?> record);
123136

124137
/**
125-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
126-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
138+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
139+
* the command argument.
127140
*
128-
* @param key the stream key.
129-
* @param group name of the consumer group.
130-
* @param newOwner name of the consumer claiming the message.
131-
* @param minIdleTime idle time required for a message to be claimed.
132-
* @param recordIds record IDs to be claimed
141+
* The message is claimed only if its idle time (ms) is greater than the given {@link Duration minimum idle time}
142+
* specified when calling {@literal XCLAIM}.
133143
*
134-
* @return list of claimed MapRecords.
144+
* @param key {@link K key} to the steam.
145+
* @param consumerGroup {@link String name} of the consumer group.
146+
* @param newOwner {@link String name} of the consumer claiming the message.
147+
* @param minIdleTime {@link Duration minimum idle time} required for a message to be claimed.
148+
* @param recordIds {@link RecordId record IDs} to be claimed.
149+
* @return {@link List} of claimed {@link MapRecord MapRecords}.
135150
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
151+
* @see org.springframework.data.redis.connection.stream.MapRecord
152+
* @see org.springframework.data.redis.connection.stream.RecordId
153+
* @see #claim(Object, String, String, XClaimOptions)
136154
*/
137-
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);
155+
default List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, Duration minIdleTime,
156+
RecordId... recordIds) {
157+
158+
return claim(key, consumerGroup, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
159+
}
138160

139161
/**
140-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
141-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
162+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
163+
* the command argument.
142164
*
143-
* @param key the stream key.
144-
* @param group name of the consumer group.
145-
* @param newOwner name of the consumer claiming the message.
146-
* @param xClaimOptions additional parameters for the CLAIM call.
165+
* The message is claimed only if its idle time (ms) is greater than the given {@link Duration minimum idle time}
166+
* specified when calling {@literal XCLAIM}.
147167
*
148-
* @return list of claimed MapRecords.
168+
* @param key {@link K key} to the steam.
169+
* @param consumerGroup {@link String name} of the consumer group.
170+
* @param newOwner {@link String name} of the consumer claiming the message.
171+
* @param xClaimOptions additional parameters for the {@literal CLAIM} call.
172+
* @return {@link List} of claimed {@link MapRecord MapRecords}.
149173
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
174+
* @see org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions
175+
* @see org.springframework.data.redis.connection.stream.MapRecord
150176
*/
151-
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);
177+
List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions);
152178

153179
/**
154180
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from

src/main/java/org/springframework/data/redis/support/collections/CollectionUtils.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818
import java.util.ArrayList;
1919
import java.util.Arrays;
2020
import java.util.Collection;
21+
import java.util.Collections;
2122
import java.util.List;
2223

2324
import org.springframework.dao.DataAccessException;
2425
import org.springframework.data.redis.core.RedisOperations;
2526
import org.springframework.data.redis.core.SessionCallback;
27+
import org.springframework.lang.NonNull;
28+
import org.springframework.lang.Nullable;
2629

2730
/**
2831
* Utility class used mainly for type conversion by the default collection implementations. Meant for internal use.
2932
*
3033
* @author Costin Leau
34+
* @author John Blum
3135
*/
32-
abstract class CollectionUtils {
36+
public abstract class CollectionUtils {
3337

3438
@SuppressWarnings("unchecked")
3539
static <E> Collection<E> reverse(Collection<? extends E> c) {
@@ -70,4 +74,9 @@ public Object execute(RedisOperations operations) throws DataAccessException {
7074
}
7175
});
7276
}
77+
78+
@NonNull
79+
public static <T> List<T> nullSafeList(@Nullable List<T> list) {
80+
return list != null ? list : Collections.emptyList();
81+
}
7382
}

src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@
5959
* Integration tests for {@link DefaultReactiveStreamOperations}.
6060
*
6161
* @author Mark Paluch
62-
* @auhtor Christoph Strobl
62+
* @author Christoph Strobl
63+
* @author Marcin Zielinski
6364
*/
6465
@MethodSource("testParams")
6566
@SuppressWarnings("unchecked")
@@ -361,7 +362,7 @@ void pendingShouldReadMessageDetails() {
361362

362363
}
363364

364-
@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
365+
@ParameterizedRedisTest // GH-2465
365366
void claimShouldReadMessageDetails() {
366367

367368
K key = keyFactory.instance();

src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
53+
* @author Marcin Zielinski
5354
*/
5455
@MethodSource("testParams")
5556
@EnabledOnCommand("XADD")
@@ -413,7 +414,7 @@ void pendingShouldReadMessageDetails() {
413414
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
414415
}
415416

416-
@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
417+
@ParameterizedRedisTest // GH-2465
417418
void claimShouldReadMessageDetails() {
418419

419420
K key = keyFactory.instance();

0 commit comments

Comments
 (0)