From 35930184c15d1d03e102009a158bca8d4e5ef1d0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 7 Aug 2020 11:04:15 +0200 Subject: [PATCH 1/2] DATAREDIS-1197 - Prepare issue branch. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 14e0ceeaba..faa4c0a57d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.4.0-SNAPSHOT + 2.4.0-DATAREDIS-1197-SNAPSHOT Spring Data Redis From fadf8f4e9f37422162c2bc41b9bbd7a983a0d512 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 7 Aug 2020 11:19:19 +0200 Subject: [PATCH 2/2] DATAREDIS-1197 - Upgrade to Lettuce 6.0 RC1. Adopt to API changes. --- pom.xml | 2 +- .../LettuceReactiveScriptingCommands.java | 3 +- .../LettuceReactiveStreamCommands.java | 34 +++++-------------- .../lettuce/LettuceStreamCommands.java | 9 +++-- .../connection/lettuce/StreamConverters.java | 27 ++++++--------- .../LettuceConnectionFactoryTests.java | 9 +++-- .../LettuceConnectionIntegrationTests.java | 1 - .../LettuceReactiveStreamCommandsTests.java | 4 +-- .../test/util/LettuceRedisClientProvider.java | 5 +++ .../LettuceRedisClusterClientProvider.java | 11 ++++-- 10 files changed, 49 insertions(+), 56 deletions(-) diff --git a/pom.xml b/pom.xml index faa4c0a57d..62d0c7d911 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 1.9.2 1.4.12 2.7.0 - 5.3.2.RELEASE + 6.0.0.RC1 3.3.0 1.01 4.1.51.Final diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveScriptingCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveScriptingCommands.java index fca79aec86..ef3d0e6807 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveScriptingCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveScriptingCommands.java @@ -26,6 +26,7 @@ import org.springframework.data.redis.connection.ReactiveScriptingCommands; import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.util.ByteUtils; import org.springframework.util.Assert; /** @@ -80,7 +81,7 @@ public Mono scriptLoad(ByteBuffer script) { Assert.notNull(script, "Script must not be null!"); - return connection.execute(cmd -> cmd.scriptLoad(script)).next(); + return connection.execute(cmd -> cmd.scriptLoad(ByteUtils.getBytes(script))).next(); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java index 2cb940c0a3..cf1baeb95a 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java @@ -21,15 +21,16 @@ import io.lettuce.core.XReadArgs; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; +import io.lettuce.core.models.stream.PendingMessage; import reactor.core.publisher.Flux; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.function.Function; import org.reactivestreams.Publisher; + import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand; import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse; @@ -204,7 +205,7 @@ public Flux> xGroup(Publisher new CommandResponse<>(command, Boolean.TRUE.equals(it) ? "OK" : "Error")); + .map(it -> new CommandResponse<>(command, "OK")); } if (command.getAction().equals(GroupCommandAction.DESTROY)) { @@ -243,25 +244,8 @@ public Flux> xPen return connection.execute(cmd -> Flux.from(commands).concatMap(command -> { Assert.notNull(command.getKey(), "Key must not be null!"); - return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).collectList().map(it -> { - - // begin - // {* hacking *} - // while (https://github.com/lettuce-io/lettuce-core/issues/1229 != resolved) begin - - ArrayList target = new ArrayList<>(it); - if (target.size() == 2 && target.get(1) instanceof List) { - target.add(1, null); - target.add(1, null); - } - while (target.size() < 4) { - target.add(null); - } - - // end. - // end. - - return StreamConverters.toPendingMessagesInfo(command.getGroupName(), target); + return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).map(it -> { + return StreamConverters.toPendingMessagesInfo(command.getGroupName(), it); }).map(value -> new CommandResponse<>(command, value)); })); } @@ -282,7 +266,7 @@ public Flux> xPending( io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount()) : io.lettuce.core.Limit.unlimited(); - Flux publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), + Flux publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) : cmd.xpending(command.getKey(), groupName, range, limit); @@ -353,7 +337,7 @@ private static Flux doRead(ReadCommand command, StreamReadOpti .map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveStreamCommands#xInfo(org.reactivestreams.Publisher) */ @@ -370,7 +354,7 @@ public Flux> xInfo(Publisher>> xInfoGroups(Publish })); } - /* + /* * (non-Javadoc) * @see org.springframework.data.redis.connection.ReactiveStreamCommands#xInfoConsumers(org.reactivestreams.Publisher) */ diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index 3290712ffc..2a3f0036be 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.function.Function; import org.springframework.dao.DataAccessException; @@ -267,14 +268,16 @@ public Boolean xGroupDelConsumer(byte[] key, Consumer consumer) { io.lettuce.core.Consumer lettuceConsumer = toConsumer(consumer); if (isPipelined()) { - pipeline(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer))); + pipeline(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer), + Objects::nonNull)); return null; } if (isQueueing()) { - transaction(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer))); + transaction(connection.newLettuceResult(getAsyncConnection().xgroupDelconsumer(key, lettuceConsumer), + Objects::nonNull)); return null; } - return getConnection().xgroupDelconsumer(key, lettuceConsumer); + return Objects.nonNull(getConnection().xgroupDelconsumer(key, lettuceConsumer)); } catch (Exception ex) { throw convertLettuceAccessException(ex); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java index 75e126905d..733f107951 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java @@ -20,7 +20,6 @@ import io.lettuce.core.XReadArgs; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; -import io.lettuce.core.models.stream.PendingParser; import java.nio.ByteBuffer; import java.time.Duration; @@ -51,19 +50,17 @@ * @author Christoph Strobl * @since 2.2 */ -@SuppressWarnings({ "unchecked", "rawtypes" }) +@SuppressWarnings({ "rawtypes" }) class StreamConverters { private static final Converter>, List> MESSAGEs_TO_IDs = new ListConverter<>( messageToIdConverter()); - private static final BiFunction, String, org.springframework.data.redis.connection.stream.PendingMessages> PENDING_MESSAGES_CONVERTER = ( + private static final BiFunction, String, org.springframework.data.redis.connection.stream.PendingMessages> PENDING_MESSAGES_CONVERTER = ( source, groupName) -> { - List target = source.stream().map(StreamConverters::preConvertNativeValues).collect(Collectors.toList()); - List pendingMessages = PendingParser.parseRange(target); - List messages = pendingMessages.stream() + List messages = source.stream() .map(it -> { RecordId id = RecordId.of(it.getId()); @@ -78,17 +75,15 @@ class StreamConverters { }; - private static final BiFunction, String, PendingMessagesSummary> PENDING_MESSAGES_SUMMARY_CONVERTER = ( + private static final BiFunction PENDING_MESSAGES_SUMMARY_CONVERTER = ( source, groupName) -> { - List target = source.stream().map(StreamConverters::preConvertNativeValues).collect(Collectors.toList()); + org.springframework.data.domain.Range range = source.getMessageIds().isUnbounded() + ? org.springframework.data.domain.Range.unbounded() + : org.springframework.data.domain.Range.open(source.getMessageIds().getLower().getValue(), + source.getMessageIds().getUpper().getValue()); - PendingMessages pendingMessages = PendingParser.parse(target); - org.springframework.data.domain.Range range = org.springframework.data.domain.Range.open( - pendingMessages.getMessageIds().getLower().getValue(), pendingMessages.getMessageIds().getUpper().getValue()); - - return new PendingMessagesSummary(groupName, pendingMessages.getCount(), range, - pendingMessages.getConsumerMessageCount()); + return new PendingMessagesSummary(groupName, source.getCount(), range, source.getConsumerMessageCount()); }; /** @@ -138,7 +133,7 @@ static Converter>, List> messagesTo * @since 2.3 */ static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName, - org.springframework.data.domain.Range range, List source) { + org.springframework.data.domain.Range range, List source) { return PENDING_MESSAGES_CONVERTER.apply(source, groupName).withinRange(range); } @@ -150,7 +145,7 @@ static org.springframework.data.redis.connection.stream.PendingMessages toPendin * @return * @since 2.3 */ - static PendingMessagesSummary toPendingMessagesInfo(String groupName, List source) { + static PendingMessagesSummary toPendingMessagesInfo(String groupName, PendingMessages source) { return PENDING_MESSAGES_SUMMARY_CONVERTER.apply(source, groupName); } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java index 0963998080..0c99744779 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java @@ -22,12 +22,14 @@ import io.lettuce.core.KqueueProvider; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisException; +import io.lettuce.core.RedisFuture; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.api.reactive.BaseRedisReactiveCommands; import reactor.test.StepVerifier; import java.io.File; import java.time.Duration; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -230,11 +232,12 @@ public void testDisableSharedConnection() throws Exception { assertThat(conn2.isClosed()).isTrue(); // Give some time for native connection to asynchronously close Thread.sleep(100); + RedisFuture future = ((RedisAsyncCommands) conn2.getNativeConnection()).ping(); try { - ((RedisAsyncCommands) conn2.getNativeConnection()).ping(); + future.get(); fail("The native connection should be closed"); - } catch (RedisException e) { - // expected + } catch (ExecutionException e) { + // expected, Lettuce async failures are signalled on the Future } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java index 2c842dd6ce..78f9822bd6 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java @@ -17,7 +17,6 @@ import static org.assertj.core.api.Assertions.*; import static org.junit.Assume.*; -import static org.springframework.data.redis.SpinBarrier.*; import io.lettuce.core.api.async.RedisAsyncCommands; diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java index 57b6ec78cb..61773cb71d 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java @@ -50,9 +50,7 @@ public class LettuceReactiveStreamCommandsTests extends LettuceReactiveCommandsT @Before public void before() { - - // TODO: Upgrade to 5.0 - assumeTrue(RedisTestProfileValueSource.atLeast("redisVersion", "4.9")); + assumeTrue(RedisTestProfileValueSource.atLeast("redisVersion", "5.0")); } @Test // DATAREDIS-864 diff --git a/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClientProvider.java b/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClientProvider.java index b68287821f..eb4ec8baba 100644 --- a/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClientProvider.java +++ b/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClientProvider.java @@ -15,10 +15,13 @@ */ package org.springframework.data.redis.test.util; +import io.lettuce.core.ClientOptions; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; +import io.lettuce.core.protocol.ProtocolVersion; import org.junit.rules.ExternalResource; + import org.springframework.data.redis.SettingsUtils; import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources; @@ -44,6 +47,8 @@ protected void before() { client = RedisClient.create(LettuceTestClientResources.getSharedClientResources(), RedisURI.builder().withHost(host).withPort(port).build()); + client.setOptions( + ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).pingBeforeActivateConnection(false).build()); } @Override diff --git a/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClusterClientProvider.java b/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClusterClientProvider.java index 59c64a030a..7f5eabbd0a 100644 --- a/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClusterClientProvider.java +++ b/src/test/java/org/springframework/data/redis/test/util/LettuceRedisClusterClientProvider.java @@ -15,12 +15,15 @@ */ package org.springframework.data.redis.test.util; -import org.junit.rules.ExternalResource; -import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources; - import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.protocol.ProtocolVersion; + +import org.junit.rules.ExternalResource; + +import org.springframework.data.redis.connection.lettuce.LettuceTestClientResources; /** * @author Christoph Strobl @@ -44,6 +47,8 @@ protected void before() { client = RedisClusterClient.create(LettuceTestClientResources.getSharedClientResources(), RedisURI.builder().withHost(host).withPort(port).build()); + client.setOptions(ClusterClientOptions.builder().protocolVersion(ProtocolVersion.RESP2) + .pingBeforeActivateConnection(false).build()); } @Override