|
15 | 15 | */
|
16 | 16 | package org.springframework.data.redis.connection.lettuce;
|
17 | 17 |
|
18 |
| -import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*; |
19 |
| - |
20 |
| -import io.lettuce.core.AbstractRedisClient; |
21 |
| -import io.lettuce.core.ClientOptions; |
22 |
| -import io.lettuce.core.ReadFrom; |
23 |
| -import io.lettuce.core.RedisClient; |
24 |
| -import io.lettuce.core.RedisConnectionException; |
25 |
| -import io.lettuce.core.RedisCredentialsProvider; |
26 |
| -import io.lettuce.core.RedisURI; |
27 |
| -import io.lettuce.core.api.StatefulConnection; |
28 |
| -import io.lettuce.core.api.StatefulRedisConnection; |
29 |
| -import io.lettuce.core.cluster.ClusterClientOptions; |
30 |
| -import io.lettuce.core.cluster.RedisClusterClient; |
31 |
| -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; |
32 |
| -import io.lettuce.core.codec.RedisCodec; |
33 |
| -import io.lettuce.core.resource.ClientResources; |
| 18 | +import static org.springframework.data.redis.connection.lettuce.LettuceConnection.CODEC; |
| 19 | +import static org.springframework.data.redis.connection.lettuce.LettuceConnection.PipeliningFlushPolicy; |
34 | 20 |
|
35 | 21 | import java.nio.ByteBuffer;
|
36 | 22 | import java.time.Duration;
|
|
43 | 29 | import java.util.function.Consumer;
|
44 | 30 | import java.util.stream.Collectors;
|
45 | 31 |
|
46 |
| -import org.apache.commons.logging.Log; |
47 |
| -import org.apache.commons.logging.LogFactory; |
48 |
| - |
49 | 32 | import org.springframework.beans.factory.DisposableBean;
|
50 | 33 | import org.springframework.beans.factory.InitializingBean;
|
51 | 34 | import org.springframework.dao.DataAccessException;
|
52 | 35 | import org.springframework.dao.InvalidDataAccessApiUsageException;
|
53 | 36 | import org.springframework.data.redis.ExceptionTranslationStrategy;
|
54 | 37 | import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
|
55 | 38 | import org.springframework.data.redis.RedisConnectionFailureException;
|
56 |
| -import org.springframework.data.redis.connection.*; |
| 39 | +import org.springframework.data.redis.connection.ClusterCommandExecutor; |
| 40 | +import org.springframework.data.redis.connection.ClusterTopologyProvider; |
| 41 | +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; |
| 42 | +import org.springframework.data.redis.connection.RedisClusterConfiguration; |
| 43 | +import org.springframework.data.redis.connection.RedisClusterConnection; |
| 44 | +import org.springframework.data.redis.connection.RedisConfiguration; |
57 | 45 | import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
|
58 | 46 | import org.springframework.data.redis.connection.RedisConfiguration.DomainSocketConfiguration;
|
59 | 47 | import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
|
60 | 48 | import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
|
| 49 | +import org.springframework.data.redis.connection.RedisConnection; |
| 50 | +import org.springframework.data.redis.connection.RedisConnectionFactory; |
| 51 | +import org.springframework.data.redis.connection.RedisPassword; |
| 52 | +import org.springframework.data.redis.connection.RedisSentinelConfiguration; |
| 53 | +import org.springframework.data.redis.connection.RedisSentinelConnection; |
| 54 | +import org.springframework.data.redis.connection.RedisSocketConfiguration; |
| 55 | +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; |
| 56 | +import org.springframework.data.redis.connection.RedisStaticMasterReplicaConfiguration; |
61 | 57 | import org.springframework.data.util.Optionals;
|
62 | 58 | import org.springframework.lang.Nullable;
|
63 | 59 | import org.springframework.util.Assert;
|
64 | 60 | import org.springframework.util.ClassUtils;
|
65 | 61 | import org.springframework.util.ObjectUtils;
|
66 | 62 | import org.springframework.util.StringUtils;
|
67 | 63 |
|
| 64 | +import io.lettuce.core.AbstractRedisClient; |
| 65 | +import io.lettuce.core.ClientOptions; |
| 66 | +import io.lettuce.core.ReadFrom; |
| 67 | +import io.lettuce.core.RedisClient; |
| 68 | +import io.lettuce.core.RedisConnectionException; |
| 69 | +import io.lettuce.core.RedisCredentialsProvider; |
| 70 | +import io.lettuce.core.RedisURI; |
| 71 | +import io.lettuce.core.api.StatefulConnection; |
| 72 | +import io.lettuce.core.api.StatefulRedisConnection; |
| 73 | +import io.lettuce.core.cluster.ClusterClientOptions; |
| 74 | +import io.lettuce.core.cluster.RedisClusterClient; |
| 75 | +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; |
| 76 | +import io.lettuce.core.codec.RedisCodec; |
| 77 | +import io.lettuce.core.resource.ClientResources; |
| 78 | + |
| 79 | +import org.apache.commons.logging.Log; |
| 80 | +import org.apache.commons.logging.LogFactory; |
| 81 | + |
68 | 82 | /**
|
69 | 83 | * Connection factory creating <a href="https://github.com/mp911de/lettuce">Lettuce</a>-based connections.
|
70 | 84 | * <p>
|
@@ -1109,124 +1123,147 @@ private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient c
|
1109 | 1123 | */
|
1110 | 1124 | protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
|
1111 | 1125 |
|
1112 |
| - ReadFrom readFrom = getClientConfiguration().getReadFrom().orElse(null); |
| 1126 | + return isStaticMasterReplicaAware() ? createStaticMasterReplicaConnectionProvider((RedisClient) client, codec) |
| 1127 | + : isClusterAware() ? createClusterConnectionProvider((RedisClusterClient) client, codec) |
| 1128 | + : createStandaloneConnectionProvider((RedisClient) client, codec); |
| 1129 | + } |
1113 | 1130 |
|
1114 |
| - if (isStaticMasterReplicaAware()) { |
| 1131 | + @SuppressWarnings("all") |
| 1132 | + private StaticMasterReplicaConnectionProvider createStaticMasterReplicaConnectionProvider(RedisClient client, |
| 1133 | + RedisCodec<?, ?> codec) { |
1115 | 1134 |
|
1116 |
| - List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) configuration).getNodes().stream() // |
1117 |
| - .map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort())) // |
1118 |
| - .peek(it -> it.setDatabase(getDatabase())) // |
1119 |
| - .collect(Collectors.toList()); |
| 1135 | + List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) this.configuration).getNodes().stream() |
| 1136 | + .map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort())) |
| 1137 | + .peek(it -> it.setDatabase(getDatabase())) |
| 1138 | + .collect(Collectors.toList()); |
1120 | 1139 |
|
1121 |
| - return new StaticMasterReplicaConnectionProvider((RedisClient) client, codec, nodes, readFrom); |
1122 |
| - } |
| 1140 | + return new StaticMasterReplicaConnectionProvider(client, codec, nodes, |
| 1141 | + getClientConfiguration().getReadFrom().orElse(null)); |
| 1142 | + } |
1123 | 1143 |
|
1124 |
| - if (isClusterAware()) { |
1125 |
| - return new ClusterConnectionProvider((RedisClusterClient) client, codec, readFrom); |
1126 |
| - } |
| 1144 | + private ClusterConnectionProvider createClusterConnectionProvider(RedisClusterClient client, RedisCodec<?, ?> codec) { |
| 1145 | + return new ClusterConnectionProvider(client, codec, getClientConfiguration().getReadFrom().orElse(null)); |
| 1146 | + } |
1127 | 1147 |
|
1128 |
| - return new StandaloneConnectionProvider((RedisClient) client, codec, readFrom); |
| 1148 | + private StandaloneConnectionProvider createStandaloneConnectionProvider(RedisClient client, RedisCodec<?, ?> codec) { |
| 1149 | + return new StandaloneConnectionProvider(client, codec, getClientConfiguration().getReadFrom().orElse(null)); |
1129 | 1150 | }
|
1130 | 1151 |
|
1131 | 1152 | protected AbstractRedisClient createClient() {
|
1132 | 1153 |
|
1133 |
| - if (isStaticMasterReplicaAware()) { |
| 1154 | + return isStaticMasterReplicaAware() ? createStaticMasterReplicaClient() |
| 1155 | + : isRedisSentinelAware() ? createSentinelClient() |
| 1156 | + : isClusterAware() ? createClusterClient() |
| 1157 | + : createBasicClient(); |
| 1158 | + } |
1134 | 1159 |
|
1135 |
| - RedisClient redisClient = clientConfiguration.getClientResources() // |
1136 |
| - .map(RedisClient::create) // |
1137 |
| - .orElseGet(RedisClient::create); |
| 1160 | + private RedisClient createStaticMasterReplicaClient() { |
1138 | 1161 |
|
1139 |
| - clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions); |
| 1162 | + RedisClient redisClient = this.clientConfiguration.getClientResources() |
| 1163 | + .map(RedisClient::create) |
| 1164 | + .orElseGet(RedisClient::create); |
1140 | 1165 |
|
1141 |
| - return redisClient; |
1142 |
| - } |
| 1166 | + this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions); |
1143 | 1167 |
|
1144 |
| - if (isRedisSentinelAware()) { |
| 1168 | + return redisClient; |
| 1169 | + } |
1145 | 1170 |
|
1146 |
| - RedisURI redisURI = getSentinelRedisURI(); |
1147 |
| - RedisClient redisClient = clientConfiguration.getClientResources() // |
1148 |
| - .map(clientResources -> RedisClient.create(clientResources, redisURI)) // |
1149 |
| - .orElseGet(() -> RedisClient.create(redisURI)); |
| 1171 | + private RedisClient createSentinelClient() { |
1150 | 1172 |
|
1151 |
| - clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions); |
1152 |
| - return redisClient; |
1153 |
| - } |
| 1173 | + RedisURI redisURI = getSentinelRedisURI(); |
1154 | 1174 |
|
1155 |
| - if (isClusterAware()) { |
| 1175 | + RedisClient redisClient = this.clientConfiguration.getClientResources() |
| 1176 | + .map(clientResources -> RedisClient.create(clientResources, redisURI)) |
| 1177 | + .orElseGet(() -> RedisClient.create(redisURI)); |
1156 | 1178 |
|
1157 |
| - List<RedisURI> initialUris = new ArrayList<>(); |
1158 |
| - ClusterConfiguration configuration = (ClusterConfiguration) this.configuration; |
1159 |
| - for (RedisNode node : configuration.getClusterNodes()) { |
1160 |
| - initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort())); |
1161 |
| - } |
| 1179 | + this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions); |
1162 | 1180 |
|
1163 |
| - RedisClusterClient clusterClient = clientConfiguration.getClientResources() // |
1164 |
| - .map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) // |
1165 |
| - .orElseGet(() -> RedisClusterClient.create(initialUris)); |
| 1181 | + return redisClient; |
| 1182 | + } |
1166 | 1183 |
|
1167 |
| - clusterClient.setOptions(getClusterClientOptions(configuration)); |
| 1184 | + @SuppressWarnings("all") |
| 1185 | + private RedisURI getSentinelRedisURI() { |
1168 | 1186 |
|
1169 |
| - return clusterClient; |
1170 |
| - } |
| 1187 | + RedisURI redisUri = LettuceConverters |
| 1188 | + .sentinelConfigurationToRedisURI((RedisSentinelConfiguration) this.configuration); |
1171 | 1189 |
|
1172 |
| - RedisURI uri = isDomainSocketAware() |
1173 |
| - ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket()) |
1174 |
| - : createRedisURIAndApplySettings(getHostName(), getPort()); |
| 1190 | + applyToAll(redisUri, it -> { |
1175 | 1191 |
|
1176 |
| - RedisClient redisClient = clientConfiguration.getClientResources() // |
1177 |
| - .map(clientResources -> RedisClient.create(clientResources, uri)) // |
1178 |
| - .orElseGet(() -> RedisClient.create(uri)); |
1179 |
| - clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions); |
| 1192 | + this.clientConfiguration.getClientName().ifPresent(it::setClientName); |
1180 | 1193 |
|
1181 |
| - return redisClient; |
| 1194 | + it.setSsl(this.clientConfiguration.isUseSsl()); |
| 1195 | + it.setVerifyPeer(this.clientConfiguration.isVerifyPeer()); |
| 1196 | + it.setStartTls(this.clientConfiguration.isStartTls()); |
| 1197 | + it.setTimeout(this.clientConfiguration.getCommandTimeout()); |
| 1198 | + }); |
| 1199 | + |
| 1200 | + redisUri.setDatabase(getDatabase()); |
| 1201 | + |
| 1202 | + this.clientConfiguration.getRedisCredentialsProviderFactory().ifPresent(factory -> { |
| 1203 | + |
| 1204 | + redisUri.setCredentialsProvider(factory.createCredentialsProvider(this.configuration)); |
| 1205 | + |
| 1206 | + RedisCredentialsProvider sentinelCredentials = factory |
| 1207 | + .createSentinelCredentialsProvider((RedisSentinelConfiguration) this.configuration); |
| 1208 | + |
| 1209 | + redisUri.getSentinels().forEach(it -> it.setCredentialsProvider(sentinelCredentials)); |
| 1210 | + }); |
| 1211 | + |
| 1212 | + return redisUri; |
1182 | 1213 | }
|
1183 | 1214 |
|
1184 |
| - private ClusterClientOptions getClusterClientOptions(ClusterConfiguration configuration) { |
| 1215 | + @SuppressWarnings("all") |
| 1216 | + private RedisClusterClient createClusterClient() { |
1185 | 1217 |
|
1186 |
| - Optional<ClientOptions> clientOptions = clientConfiguration.getClientOptions(); |
1187 |
| - ClusterClientOptions clusterClientOptions = clientOptions // |
1188 |
| - .filter(ClusterClientOptions.class::isInstance) // |
1189 |
| - .map(ClusterClientOptions.class::cast) // |
1190 |
| - .orElseGet(() -> { |
1191 |
| - return clientOptions // |
1192 |
| - .map(it -> ClusterClientOptions.builder(it).build()) // |
1193 |
| - .orElseGet(ClusterClientOptions::create); |
1194 |
| - }); |
| 1218 | + List<RedisURI> initialUris = new ArrayList<>(); |
1195 | 1219 |
|
1196 |
| - if (configuration.getMaxRedirects() != null) { |
1197 |
| - return clusterClientOptions.mutate().maxRedirects(configuration.getMaxRedirects()).build(); |
1198 |
| - } |
| 1220 | + ClusterConfiguration configuration = (ClusterConfiguration) this.configuration; |
1199 | 1221 |
|
1200 |
| - return clusterClientOptions; |
| 1222 | + configuration.getClusterNodes().stream() |
| 1223 | + .map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort())) |
| 1224 | + .forEach(initialUris::add); |
| 1225 | + |
| 1226 | + RedisClusterClient clusterClient = this.clientConfiguration.getClientResources() |
| 1227 | + .map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) |
| 1228 | + .orElseGet(() -> RedisClusterClient.create(initialUris)); |
| 1229 | + |
| 1230 | + clusterClient.setOptions(getClusterClientOptions(configuration)); |
| 1231 | + |
| 1232 | + return clusterClient; |
1201 | 1233 | }
|
1202 | 1234 |
|
1203 |
| - private RedisURI getSentinelRedisURI() { |
| 1235 | + private ClusterClientOptions getClusterClientOptions(ClusterConfiguration configuration) { |
1204 | 1236 |
|
1205 |
| - RedisURI redisUri = LettuceConverters.sentinelConfigurationToRedisURI( |
1206 |
| - (org.springframework.data.redis.connection.RedisSentinelConfiguration) configuration); |
| 1237 | + Optional<ClientOptions> clientOptions = this.clientConfiguration.getClientOptions(); |
1207 | 1238 |
|
1208 |
| - applyToAll(redisUri, it -> { |
| 1239 | + ClusterClientOptions clusterClientOptions = clientOptions |
| 1240 | + .filter(ClusterClientOptions.class::isInstance) |
| 1241 | + .map(ClusterClientOptions.class::cast) |
| 1242 | + .orElseGet(() -> clientOptions |
| 1243 | + .map(it -> ClusterClientOptions.builder(it).build()) |
| 1244 | + .orElseGet(ClusterClientOptions::create)); |
1209 | 1245 |
|
1210 |
| - clientConfiguration.getClientName().ifPresent(it::setClientName); |
| 1246 | + if (configuration.getMaxRedirects() != null) { |
| 1247 | + return clusterClientOptions.mutate().maxRedirects(configuration.getMaxRedirects()).build(); |
| 1248 | + } |
1211 | 1249 |
|
1212 |
| - it.setSsl(clientConfiguration.isUseSsl()); |
1213 |
| - it.setVerifyPeer(clientConfiguration.isVerifyPeer()); |
1214 |
| - it.setStartTls(clientConfiguration.isStartTls()); |
1215 |
| - it.setTimeout(clientConfiguration.getCommandTimeout()); |
1216 |
| - }); |
| 1250 | + return clusterClientOptions; |
| 1251 | + } |
1217 | 1252 |
|
1218 |
| - redisUri.setDatabase(getDatabase()); |
| 1253 | + @SuppressWarnings("all") |
| 1254 | + private RedisClient createBasicClient() { |
1219 | 1255 |
|
1220 |
| - clientConfiguration.getRedisCredentialsProviderFactory().ifPresent(factory -> { |
| 1256 | + RedisURI uri = isDomainSocketAware() |
| 1257 | + ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) this.configuration).getSocket()) |
| 1258 | + : createRedisURIAndApplySettings(getHostName(), getPort()); |
1221 | 1259 |
|
1222 |
| - redisUri.setCredentialsProvider(factory.createCredentialsProvider(configuration)); |
| 1260 | + RedisClient redisClient = this.clientConfiguration.getClientResources() |
| 1261 | + .map(clientResources -> RedisClient.create(clientResources, uri)) |
| 1262 | + .orElseGet(() -> RedisClient.create(uri)); |
1223 | 1263 |
|
1224 |
| - RedisCredentialsProvider sentinelCredentials = factory |
1225 |
| - .createSentinelCredentialsProvider((RedisSentinelConfiguration) configuration); |
1226 |
| - redisUri.getSentinels().forEach(it -> it.setCredentialsProvider(sentinelCredentials)); |
1227 |
| - }); |
| 1264 | + this.clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions); |
1228 | 1265 |
|
1229 |
| - return redisUri; |
| 1266 | + return redisClient; |
1230 | 1267 | }
|
1231 | 1268 |
|
1232 | 1269 | private void assertInitialized() {
|
@@ -1271,16 +1308,16 @@ private RedisURI createRedisSocketURIAndApplySettings(String socketPath) {
|
1271 | 1308 | private void applyAuthentication(RedisURI.Builder builder) {
|
1272 | 1309 |
|
1273 | 1310 | String username = getRedisUsername();
|
| 1311 | + |
1274 | 1312 | if (StringUtils.hasText(username)) {
|
1275 | 1313 | // See https://github.com/lettuce-io/lettuce-core/issues/1404
|
1276 | 1314 | builder.withAuthentication(username, new String(getRedisPassword().toOptional().orElse(new char[0])));
|
1277 | 1315 | } else {
|
1278 | 1316 | getRedisPassword().toOptional().ifPresent(builder::withPassword);
|
1279 | 1317 | }
|
1280 | 1318 |
|
1281 |
| - clientConfiguration.getRedisCredentialsProviderFactory().ifPresent(factory -> { |
1282 |
| - builder.withAuthentication(factory.createCredentialsProvider(configuration)); |
1283 |
| - }); |
| 1319 | + clientConfiguration.getRedisCredentialsProviderFactory().ifPresent(factory -> |
| 1320 | + builder.withAuthentication(factory.createCredentialsProvider(this.configuration))); |
1284 | 1321 | }
|
1285 | 1322 |
|
1286 | 1323 | @Override
|
|
0 commit comments