Skip to content

Commit da5bab1

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-1197 - Upgrade to Lettuce 6.0 RC1.
Adopt to API changes. Original Pull Request: #554
1 parent 51ea73a commit da5bab1

File tree

10 files changed

+49
-56
lines changed

10 files changed

+49
-56
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<beanutils>1.9.2</beanutils>
2323
<xstream>1.4.12</xstream>
2424
<pool>2.7.0</pool>
25-
<lettuce>5.3.3.RELEASE</lettuce>
25+
<lettuce>6.0.0.RC1</lettuce>
2626
<jedis>3.3.0</jedis>
2727
<multithreadedtc>1.01</multithreadedtc>
2828
<netty>4.1.51.Final</netty>

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveScriptingCommands.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.springframework.data.redis.connection.ReactiveScriptingCommands;
2828
import org.springframework.data.redis.connection.ReturnType;
29+
import org.springframework.data.redis.util.ByteUtils;
2930
import org.springframework.util.Assert;
3031

3132
/**
@@ -80,7 +81,7 @@ public Mono<String> scriptLoad(ByteBuffer script) {
8081

8182
Assert.notNull(script, "Script must not be null!");
8283

83-
return connection.execute(cmd -> cmd.scriptLoad(script)).next();
84+
return connection.execute(cmd -> cmd.scriptLoad(ByteUtils.getBytes(script))).next();
8485
}
8586

8687
/*

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@
2121
import io.lettuce.core.XReadArgs;
2222
import io.lettuce.core.XReadArgs.StreamOffset;
2323
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
24+
import io.lettuce.core.models.stream.PendingMessage;
2425
import reactor.core.publisher.Flux;
2526

2627
import java.nio.ByteBuffer;
27-
import java.util.ArrayList;
2828
import java.util.Collection;
2929
import java.util.List;
3030
import java.util.function.Function;
3131

3232
import org.reactivestreams.Publisher;
33+
3334
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
3435
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
3536
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
@@ -204,7 +205,7 @@ public Flux<CommandResponse<GroupCommand, String>> xGroup(Publisher<GroupCommand
204205
.xgroupDelconsumer(command.getKey(),
205206
io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()),
206207
ByteUtils.getByteBuffer(command.getConsumerName())))
207-
.map(it -> new CommandResponse<>(command, Boolean.TRUE.equals(it) ? "OK" : "Error"));
208+
.map(it -> new CommandResponse<>(command, "OK"));
208209
}
209210

210211
if (command.getAction().equals(GroupCommandAction.DESTROY)) {
@@ -243,25 +244,8 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessagesSummary>> xPen
243244
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
244245

245246
Assert.notNull(command.getKey(), "Key must not be null!");
246-
return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).collectList().map(it -> {
247-
248-
// begin
249-
// {* hacking *}
250-
// while (https://github.com/lettuce-io/lettuce-core/issues/1229 != resolved) begin
251-
252-
ArrayList<Object> target = new ArrayList<>(it);
253-
if (target.size() == 2 && target.get(1) instanceof List) {
254-
target.add(1, null);
255-
target.add(1, null);
256-
}
257-
while (target.size() < 4) {
258-
target.add(null);
259-
}
260-
261-
// end.
262-
// end.
263-
264-
return StreamConverters.toPendingMessagesInfo(command.getGroupName(), target);
247+
return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).map(it -> {
248+
return StreamConverters.toPendingMessagesInfo(command.getGroupName(), it);
265249
}).map(value -> new CommandResponse<>(command, value));
266250
}));
267251
}
@@ -282,7 +266,7 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(
282266
io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount())
283267
: io.lettuce.core.Limit.unlimited();
284268

285-
Flux<Object> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(),
269+
Flux<PendingMessage> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(),
286270
io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit)
287271
: cmd.xpending(command.getKey(), groupName, range, limit);
288272

@@ -353,7 +337,7 @@ private static Flux<ByteBufferRecord> doRead(ReadCommand command, StreamReadOpti
353337
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
354338
}
355339

356-
/*
340+
/*
357341
* (non-Javadoc)
358342
* @see org.springframework.data.redis.connection.ReactiveStreamCommands#xInfo(org.reactivestreams.Publisher)
359343
*/
@@ -370,7 +354,7 @@ public Flux<CommandResponse<XInfoCommand, XInfoStream>> xInfo(Publisher<XInfoCom
370354

371355
}
372356

373-
/*
357+
/*
374358
* (non-Javadoc)
375359
* @see org.springframework.data.redis.connection.ReactiveStreamCommands#xInfoGroups(org.reactivestreams.Publisher)
376360
*/
@@ -386,7 +370,7 @@ public Flux<CommandResponse<XInfoCommand, Flux<XInfoGroup>>> xInfoGroups(Publish
386370
}));
387371
}
388372

389-
/*
373+
/*
390374
* (non-Javadoc)
391375
* @see org.springframework.data.redis.connection.ReactiveStreamCommands#xInfoConsumers(org.reactivestreams.Publisher)
392376
*/

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.Arrays;
2626
import java.util.List;
27+
import java.util.Objects;
2728
import java.util.function.Function;
2829

2930
import org.springframework.dao.DataAccessException;
@@ -268,14 +269,16 @@ public Boolean xGroupDelConsumer(byte[] key, Consumer consumer) {
268269
io.lettuce.core.Consumer<byte[]> lettuceConsumer = toConsumer(consumer);
269270

270271
if (isPipelined()) {
271-
pipeline(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer)));
272+
pipeline(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer),
273+
Objects::nonNull));
272274
return null;
273275
}
274276
if (isQueueing()) {
275-
transaction(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer)));
277+
transaction(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer),
278+
Objects::nonNull));
276279
return null;
277280
}
278-
return getConnection().xgroupDelconsumer(key, lettuceConsumer);
281+
return Objects.nonNull(getConnection().xgroupDelconsumer(key, lettuceConsumer));
279282
} catch (Exception ex) {
280283
throw convertLettuceAccessException(ex);
281284
}

src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.lettuce.core.XReadArgs;
2121
import io.lettuce.core.models.stream.PendingMessage;
2222
import io.lettuce.core.models.stream.PendingMessages;
23-
import io.lettuce.core.models.stream.PendingParser;
2423

2524
import java.nio.ByteBuffer;
2625
import java.time.Duration;
@@ -51,19 +50,17 @@
5150
* @author Christoph Strobl
5251
* @since 2.2
5352
*/
54-
@SuppressWarnings({ "unchecked", "rawtypes" })
53+
@SuppressWarnings({ "rawtypes" })
5554
class StreamConverters {
5655

5756
private static final Converter<List<StreamMessage<byte[], byte[]>>, List<RecordId>> MESSAGEs_TO_IDs = new ListConverter<>(
5857
messageToIdConverter());
5958

60-
private static final BiFunction<List<Object>, String, org.springframework.data.redis.connection.stream.PendingMessages> PENDING_MESSAGES_CONVERTER = (
59+
private static final BiFunction<List<PendingMessage>, String, org.springframework.data.redis.connection.stream.PendingMessages> PENDING_MESSAGES_CONVERTER = (
6160
source, groupName) -> {
6261

63-
List<Object> target = source.stream().map(StreamConverters::preConvertNativeValues).collect(Collectors.toList());
64-
List<PendingMessage> pendingMessages = PendingParser.parseRange(target);
6562

66-
List<org.springframework.data.redis.connection.stream.PendingMessage> messages = pendingMessages.stream()
63+
List<org.springframework.data.redis.connection.stream.PendingMessage> messages = source.stream()
6764
.map(it -> {
6865

6966
RecordId id = RecordId.of(it.getId());
@@ -78,17 +75,15 @@ class StreamConverters {
7875

7976
};
8077

81-
private static final BiFunction<List<Object>, String, PendingMessagesSummary> PENDING_MESSAGES_SUMMARY_CONVERTER = (
78+
private static final BiFunction<PendingMessages, String, PendingMessagesSummary> PENDING_MESSAGES_SUMMARY_CONVERTER = (
8279
source, groupName) -> {
8380

84-
List<Object> target = source.stream().map(StreamConverters::preConvertNativeValues).collect(Collectors.toList());
81+
org.springframework.data.domain.Range<String> range = source.getMessageIds().isUnbounded()
82+
? org.springframework.data.domain.Range.unbounded()
83+
: org.springframework.data.domain.Range.open(source.getMessageIds().getLower().getValue(),
84+
source.getMessageIds().getUpper().getValue());
8585

86-
PendingMessages pendingMessages = PendingParser.parse(target);
87-
org.springframework.data.domain.Range<String> range = org.springframework.data.domain.Range.open(
88-
pendingMessages.getMessageIds().getLower().getValue(), pendingMessages.getMessageIds().getUpper().getValue());
89-
90-
return new PendingMessagesSummary(groupName, pendingMessages.getCount(), range,
91-
pendingMessages.getConsumerMessageCount());
86+
return new PendingMessagesSummary(groupName, source.getCount(), range, source.getConsumerMessageCount());
9287
};
9388

9489
/**
@@ -138,7 +133,7 @@ static Converter<List<StreamMessage<byte[], byte[]>>, List<RecordId>> messagesTo
138133
* @since 2.3
139134
*/
140135
static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName,
141-
org.springframework.data.domain.Range<?> range, List<Object> source) {
136+
org.springframework.data.domain.Range<?> range, List<PendingMessage> source) {
142137
return PENDING_MESSAGES_CONVERTER.apply(source, groupName).withinRange(range);
143138
}
144139

@@ -150,7 +145,7 @@ static org.springframework.data.redis.connection.stream.PendingMessages toPendin
150145
* @return
151146
* @since 2.3
152147
*/
153-
static PendingMessagesSummary toPendingMessagesInfo(String groupName, List<Object> source) {
148+
static PendingMessagesSummary toPendingMessagesInfo(String groupName, PendingMessages source) {
154149
return PENDING_MESSAGES_SUMMARY_CONVERTER.apply(source, groupName);
155150
}
156151

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import io.lettuce.core.KqueueProvider;
2323
import io.lettuce.core.ReadFrom;
2424
import io.lettuce.core.RedisException;
25+
import io.lettuce.core.RedisFuture;
2526
import io.lettuce.core.api.async.RedisAsyncCommands;
2627
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
2728
import reactor.test.StepVerifier;
2829

2930
import java.io.File;
3031
import java.time.Duration;
32+
import java.util.concurrent.ExecutionException;
3133
import java.util.function.Consumer;
3234

3335
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -230,11 +232,12 @@ public void testDisableSharedConnection() throws Exception {
230232
assertThat(conn2.isClosed()).isTrue();
231233
// Give some time for native connection to asynchronously close
232234
Thread.sleep(100);
235+
RedisFuture<String> future = ((RedisAsyncCommands<byte[], byte[]>) conn2.getNativeConnection()).ping();
233236
try {
234-
((RedisAsyncCommands<byte[], byte[]>) conn2.getNativeConnection()).ping();
237+
future.get();
235238
fail("The native connection should be closed");
236-
} catch (RedisException e) {
237-
// expected
239+
} catch (ExecutionException e) {
240+
// expected, Lettuce async failures are signalled on the Future
238241
}
239242
}
240243

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.junit.Assume.*;
20-
import static org.springframework.data.redis.SpinBarrier.*;
2120

2221
import io.lettuce.core.api.async.RedisAsyncCommands;
2322

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@ public class LettuceReactiveStreamCommandsTests extends LettuceReactiveCommandsT
5050

5151
@Before
5252
public void before() {
53-
54-
// TODO: Upgrade to 5.0
55-
assumeTrue(RedisTestProfileValueSource.atLeast("redisVersion", "4.9"));
53+
assumeTrue(RedisTestProfileValueSource.atLeast("redisVersion", "5.0"));
5654
}
5755

5856
@Test // DATAREDIS-864

src/test/java/org/springframework/data/redis/test/util/LettuceRedisClientProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
*/
1616
package org.springframework.data.redis.test.util;
1717

18+
import io.lettuce.core.ClientOptions;
1819
import io.lettuce.core.RedisClient;
1920
import io.lettuce.core.RedisURI;
21+
import io.lettuce.core.protocol.ProtocolVersion;
2022

2123
import org.junit.rules.ExternalResource;
24+
2225
import org.springframework.data.redis.SettingsUtils;
2326
import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources;
2427

@@ -44,6 +47,8 @@ protected void before() {
4447

4548
client = RedisClient.create(LettuceTestClientResources.getSharedClientResources(),
4649
RedisURI.builder().withHost(host).withPort(port).build());
50+
client.setOptions(
51+
ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build());
4752
}
4853

4954
@Override

src/test/java/org/springframework/data/redis/test/util/LettuceRedisClusterClientProvider.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package org.springframework.data.redis.test.util;
1717

18-
import org.junit.rules.ExternalResource;
19-
import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources;
20-
2118
import io.lettuce.core.RedisURI;
19+
import io.lettuce.core.cluster.ClusterClientOptions;
2220
import io.lettuce.core.cluster.RedisClusterClient;
2321
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
22+
import io.lettuce.core.protocol.ProtocolVersion;
23+
24+
import org.junit.rules.ExternalResource;
25+
26+
import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources;
2427

2528
/**
2629
* @author Christoph Strobl
@@ -44,6 +47,8 @@ protected void before() {
4447

4548
client = RedisClusterClient.create(LettuceTestClientResources.getSharedClientResources(),
4649
RedisURI.builder().withHost(host).withPort(port).build());
50+
client.setOptions(ClusterClientOptions.builder().protocolVersion(ProtocolVersion.RESP2)
51+
.pingBeforeActivateConnection(false).build());
4752
}
4853

4954
@Override

0 commit comments

Comments
 (0)