Skip to content

Commit e94fdd4

Browse files
committed
Add support for xautoclaim
1 parent 891e75d commit e94fdd4

9 files changed

+347
-0
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java

+30
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.springframework.data.geo.Metric;
3030
import org.springframework.data.geo.Point;
3131
import org.springframework.data.redis.connection.stream.ByteRecord;
32+
import org.springframework.data.redis.connection.stream.ClaimedMessages;
33+
import org.springframework.data.redis.connection.stream.ClaimedMessagesIds;
3234
import org.springframework.data.redis.connection.stream.Consumer;
3335
import org.springframework.data.redis.connection.stream.MapRecord;
3436
import org.springframework.data.redis.connection.stream.PendingMessages;
@@ -503,6 +505,34 @@ default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClai
503505
return streamCommands().xClaim(key, group, newOwner, options);
504506
}
505507

508+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
509+
@Override
510+
@Deprecated
511+
default ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
512+
return streamCommands().xAutoclaimJustId(key, group, newOwner, minIdleTime, start);
513+
}
514+
515+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
516+
@Override
517+
@Deprecated
518+
default ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
519+
return streamCommands().xAutoclaimJustId(key, group, newOwner, minIdleTime, start, count);
520+
}
521+
522+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
523+
@Override
524+
@Deprecated
525+
default ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
526+
return streamCommands().xAutoclaim(key, group, newOwner, minIdleTime, start);
527+
}
528+
529+
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
530+
@Override
531+
@Deprecated
532+
default ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
533+
return streamCommands().xAutoclaim(key, group, newOwner, minIdleTime, start, count);
534+
}
535+
506536
/** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */
507537
@Override
508538
@Deprecated

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

+65
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,71 @@ public XClaimOptions ids(String... ids) {
521521
}
522522
}
523523

524+
/**
525+
* Transfer ownership of pending stream entries that match the specified criteria. Returns just an array of IDs
526+
* of messages successfully claimed, without returning the actual message. The retry counter is not incremented.
527+
*
528+
* @param key the {@literal key} the stream is stored at.
529+
* @param group the name of the {@literal consumer group}.
530+
* @param newOwner the name of the new {@literal consumer}.
531+
* @param minIdleTime must not be {@literal null}.
532+
* @param start must not be {@literal null}.
533+
* @return list of {@link RecordId ids} that changed user.
534+
* @see <a href="https://redis.io/commands/xautoclaim">Redis Documentation: XAUTOCLAIM</a>
535+
* @since 2.3
536+
*/
537+
@Nullable
538+
ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start);
539+
540+
/**
541+
* Transfer ownership of pending stream entries that match the specified criteria. Returns just an array of IDs
542+
* of messages successfully claimed, without returning the actual message. The retry counter is not incremented.
543+
*
544+
* @param key the {@literal key} the stream is stored at.
545+
* @param group the name of the {@literal consumer group}.
546+
* @param newOwner the name of the new {@literal consumer}.
547+
* @param minIdleTime must not be {@literal null}.
548+
* @param start must not be {@literal null}.
549+
* @param count limit the number of results. Must not be {@literal null}.
550+
* @return list of {@link RecordId ids} that changed user.
551+
* @see <a href="https://redis.io/commands/xautoclaim">Redis Documentation: XAUTOCLAIM</a>
552+
* @since 2.3
553+
*/
554+
@Nullable
555+
ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count);
556+
557+
558+
/**
559+
* Transfer ownership of pending stream entries that match the specified criteria.
560+
*
561+
* @param key the {@literal key} the stream is stored at.
562+
* @param group the name of the {@literal consumer group}.
563+
* @param newOwner the name of the new {@literal consumer}.
564+
* @param minIdleTime must not be {@literal null}.
565+
* @param start must not be {@literal null}.
566+
* @return list of {@link ByteRecord} that changed user.
567+
* @see <a href="https://redis.io/commands/xautoclaim">Redis Documentation: XAUTOCLAIM</a>
568+
*/
569+
@Nullable
570+
ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start);
571+
572+
/**
573+
* Transfer ownership of pending stream entries that match the specified criteria.
574+
*
575+
* @param key the {@literal key} the stream is stored at.
576+
* @param group the name of the {@literal consumer group}.
577+
* @param newOwner the name of the new {@literal consumer}.
578+
* @param minIdleTime must not be {@literal null}.
579+
* @param start must not be {@literal null}.
580+
* @param count limit the number of results. Must not be {@literal null}.
581+
* @return list of {@link ByteRecord} that changed user.
582+
* @see <a href="https://redis.io/commands/xautoclaim">Redis Documentation: XAUTOCLAIM</a>
583+
* @since 2.3
584+
*/
585+
@Nullable
586+
ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count);
587+
588+
524589
/**
525590
* Removes the records with the given id's from the stream. Returns the number of items deleted, that may be different
526591
* from the number of id's passed in case certain id's do not exist.

src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java

+23
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
import static org.springframework.data.redis.connection.jedis.StreamConverters.*;
1919

20+
import org.springframework.data.redis.connection.stream.ClaimedMessages;
21+
import org.springframework.data.redis.connection.stream.ClaimedMessagesIds;
2022
import redis.clients.jedis.BuilderFactory;
2123
import redis.clients.jedis.params.XAddParams;
2224
import redis.clients.jedis.params.XClaimParams;
2325
import redis.clients.jedis.params.XPendingParams;
2426
import redis.clients.jedis.params.XReadGroupParams;
2527
import redis.clients.jedis.params.XReadParams;
2628

29+
import java.time.Duration;
2730
import java.util.ArrayList;
2831
import java.util.Arrays;
2932
import java.util.Collections;
@@ -131,6 +134,26 @@ public List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClaim
131134
}
132135
}
133136

137+
@Override
138+
public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
139+
return null;
140+
}
141+
142+
@Override
143+
public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
144+
return null;
145+
}
146+
147+
@Override
148+
public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
149+
return null;
150+
}
151+
152+
@Override
153+
public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
154+
return null;
155+
}
156+
134157
@Override
135158
public Long xDel(byte[] key, RecordId... recordIds) {
136159

src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java

+72
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,22 @@
1515
*/
1616
package org.springframework.data.redis.connection.jedis;
1717

18+
import org.springframework.data.redis.connection.stream.ClaimedMessages;
19+
import org.springframework.data.redis.connection.stream.ClaimedMessagesIds;
1820
import redis.clients.jedis.BuilderFactory;
1921
import redis.clients.jedis.Jedis;
2022
import redis.clients.jedis.commands.PipelineBinaryCommands;
2123
import redis.clients.jedis.commands.StreamPipelineBinaryCommands;
2224
import redis.clients.jedis.params.XAddParams;
25+
import redis.clients.jedis.params.XAutoClaimParams;
2326
import redis.clients.jedis.params.XClaimParams;
2427
import redis.clients.jedis.params.XPendingParams;
2528
import redis.clients.jedis.params.XReadGroupParams;
2629
import redis.clients.jedis.params.XReadParams;
2730
import redis.clients.jedis.resps.StreamConsumersInfo;
2831
import redis.clients.jedis.resps.StreamGroupInfo;
2932

33+
import java.time.Duration;
3034
import java.util.ArrayList;
3135
import java.util.Arrays;
3236
import java.util.Collections;
@@ -116,6 +120,74 @@ public List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClaim
116120
.get(r -> StreamConverters.convertToByteRecord(key, r));
117121
}
118122

123+
@Override
124+
public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
125+
126+
Assert.notNull(key, "Key must not be null");
127+
Assert.notNull(group, "Group must not be null");
128+
Assert.notNull(minIdleTime, "MinIdleTime must not be null");
129+
Assert.notNull(start, "Start must not be null");
130+
131+
XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams();
132+
133+
final List<ByteRecord> byteRecords = connection.invoke()
134+
.from(Jedis::xautoclaimJustId, ResponseCommands::xautoclaimJustId, key, JedisConverters.toBytes(group),
135+
JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start),
136+
params).get(r -> StreamConverters.convertToByteRecord(key, r));
137+
138+
return null;
139+
}
140+
141+
@Override
142+
public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
143+
Assert.notNull(key, "Key must not be null");
144+
Assert.notNull(group, "Group must not be null");
145+
Assert.notNull(minIdleTime, "MinIdleTime must not be null");
146+
Assert.notNull(start, "Start must not be null");
147+
Assert.notNull(count, "Count must not be null");
148+
149+
XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams().count(count.intValue());
150+
151+
final List<ByteRecord> byteRecords = connection.invoke()
152+
.from(Jedis::xautoclaimJustId, ResponseCommands::xautoclaimJustId, key, JedisConverters.toBytes(group),
153+
JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start),
154+
params).get(r -> StreamConverters.convertToByteRecord(key, r));
155+
156+
return null;
157+
}
158+
159+
@Override
160+
public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
161+
Assert.notNull(key, "Key must not be null");
162+
Assert.notNull(group, "Group must not be null");
163+
Assert.notNull(newOwner, "NewOwner must not be null");
164+
165+
XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams();
166+
167+
final List<ByteRecord> br= connection.invoke()
168+
.from(Jedis::xautoclaim, ResponseCommands::xautoclaim, key, JedisConverters.toBytes(group),
169+
JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start),
170+
params).get(r -> StreamConverters.convertToByteRecord(key, r));
171+
172+
return null;
173+
}
174+
175+
@Override
176+
public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
177+
Assert.notNull(key, "Key must not be null");
178+
Assert.notNull(group, "Group must not be null");
179+
Assert.notNull(newOwner, "NewOwner must not be null");
180+
181+
XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams().count(count.intValue());
182+
183+
final List<ByteRecord> br= connection.invoke()
184+
.from(Jedis::xautoclaim, ResponseCommands::xautoclaim, key, JedisConverters.toBytes(group),
185+
JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start),
186+
params).get(r -> StreamConverters.convertToByteRecord(key, r));
187+
188+
return null;
189+
}
190+
119191
@Override
120192
public Long xDel(byte[] key, RecordId... recordIds) {
121193

src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import redis.clients.jedis.BuilderFactory;
1919
import redis.clients.jedis.StreamEntryID;
2020
import redis.clients.jedis.params.XAddParams;
21+
import redis.clients.jedis.params.XAutoClaimParams;
2122
import redis.clients.jedis.params.XClaimParams;
2223
import redis.clients.jedis.params.XPendingParams;
2324
import redis.clients.jedis.params.XReadGroupParams;
@@ -263,6 +264,17 @@ public static XClaimParams toXClaimParams(RedisStreamCommands.XClaimOptions opti
263264
return params;
264265
}
265266

267+
public static XAutoClaimParams toXautoClaimParams(Integer count) {
268+
269+
XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams();
270+
271+
if (count != null) {
272+
params.count(count);
273+
}
274+
275+
return params;
276+
}
277+
266278
public static XReadParams toXReadParams(StreamReadOptions readOptions) {
267279

268280
XReadParams params = XReadParams.xReadParams();

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

+49
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
package org.springframework.data.redis.connection.lettuce;
1717

1818
import io.lettuce.core.XAddArgs;
19+
import io.lettuce.core.XAutoClaimArgs;
1920
import io.lettuce.core.XClaimArgs;
2021
import io.lettuce.core.XGroupCreateArgs;
2122
import io.lettuce.core.XReadArgs;
2223
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
2324
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
2425

26+
import java.time.Duration;
2527
import java.util.Arrays;
2628
import java.util.List;
2729
import java.util.Objects;
@@ -31,6 +33,8 @@
3133
import org.springframework.data.redis.connection.Limit;
3234
import org.springframework.data.redis.connection.RedisStreamCommands;
3335
import org.springframework.data.redis.connection.stream.ByteRecord;
36+
import org.springframework.data.redis.connection.stream.ClaimedMessages;
37+
import org.springframework.data.redis.connection.stream.ClaimedMessagesIds;
3438
import org.springframework.data.redis.connection.stream.Consumer;
3539
import org.springframework.data.redis.connection.stream.MapRecord;
3640
import org.springframework.data.redis.connection.stream.PendingMessages;
@@ -50,6 +54,7 @@
5054
* @author Dejan Jankov
5155
* @author Dengliming
5256
* @author Mark John Moreno
57+
* @author Krzysztof Kocel
5358
* @since 2.2
5459
*/
5560
class LettuceStreamCommands implements RedisStreamCommands {
@@ -117,6 +122,50 @@ public List<ByteRecord> xClaim(byte[] key, String group, String newOwner, XClaim
117122
.toList(StreamConverters.byteRecordConverter());
118123
}
119124

125+
@Override
126+
public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
127+
128+
io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group),
129+
LettuceConverters.toBytes(newOwner));
130+
XAutoClaimArgs<byte[]> args = XAutoClaimArgs.Builder.justid(from, minIdleTime, start);
131+
132+
return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args)
133+
.get(StreamConverters.claimedMessageJustIdConverter());
134+
}
135+
136+
@Override
137+
public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
138+
139+
io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group),
140+
LettuceConverters.toBytes(newOwner));
141+
XAutoClaimArgs<byte[]> args = XAutoClaimArgs.Builder.justid(from, minIdleTime, start).count(count);
142+
143+
return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args)
144+
.get(StreamConverters.claimedMessageJustIdConverter());
145+
}
146+
147+
@Override
148+
public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) {
149+
150+
io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group),
151+
LettuceConverters.toBytes(newOwner));
152+
XAutoClaimArgs<byte[]> args = XAutoClaimArgs.Builder.xautoclaim(from, minIdleTime, start);
153+
154+
return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args)
155+
.get(StreamConverters.claimedMessageConverter());
156+
}
157+
158+
@Override
159+
public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) {
160+
161+
io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group),
162+
LettuceConverters.toBytes(newOwner));
163+
XAutoClaimArgs<byte[]> args = XAutoClaimArgs.Builder.xautoclaim(from, minIdleTime, start).count(count);
164+
165+
return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args)
166+
.get(StreamConverters.claimedMessageConverter());
167+
}
168+
120169
@Override
121170
public Long xDel(byte[] key, RecordId... recordIds) {
122171

src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java

+10
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import io.lettuce.core.StreamMessage;
1919
import io.lettuce.core.XClaimArgs;
2020
import io.lettuce.core.XReadArgs;
21+
import io.lettuce.core.models.stream.ClaimedMessages;
2122
import io.lettuce.core.models.stream.PendingMessage;
2223
import io.lettuce.core.models.stream.PendingMessages;
2324

2425
import java.time.Duration;
2526
import java.util.List;
27+
import java.util.stream.Collectors;
2628

2729
import org.springframework.core.convert.converter.Converter;
2830
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
@@ -71,6 +73,14 @@ static Converter<StreamMessage<byte[], byte[]>, ByteRecord> byteRecordConverter(
7173
return (it) -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBytes(it.getBody());
7274
}
7375

76+
static Converter<ClaimedMessages<byte[], byte[]>, org.springframework.data.redis.connection.stream.ClaimedMessages> claimedMessageConverter() {
77+
return (it) -> new org.springframework.data.redis.connection.stream.ClaimedMessages(RecordId.of(it.getId()), it.getMessages().stream().map((message) -> StreamConverters.byteRecordConverter().convert(message)).toList(), List.of());
78+
}
79+
80+
static Converter<ClaimedMessages<byte[], byte[]>, org.springframework.data.redis.connection.stream.ClaimedMessagesIds> claimedMessageJustIdConverter() {
81+
return (it) -> new org.springframework.data.redis.connection.stream.ClaimedMessagesIds(RecordId.of(it.getId()), it.getMessages().stream().map((message) -> RecordId.of(message.getId())).toList(), List.of());
82+
}
83+
7484
/**
7585
* Convert the raw Lettuce xpending result to {@link PendingMessages}.
7686
*

0 commit comments

Comments
 (0)