Skip to content

Commit 8d6ebb4

Browse files
jxblummp911de
authored andcommitted
Add configuration for TaskExecutor used by ClusterCommandsExecutor.
This change allows users to leverage the VirtualThread facilities and AsyncTaskExecutor implementations provided in and by the core Spring Framework as part of our Loom support theme. Closes #2594 Original pull request: #2669
1 parent 2eaf174 commit 8d6ebb4

File tree

7 files changed

+319
-143
lines changed

7 files changed

+319
-143
lines changed

src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -233,36 +233,42 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
233233
while (!done) {
234234

235235
done = true;
236+
236237
for (Map.Entry<NodeExecution, Future<NodeResult<T>>> entry : futures.entrySet()) {
237238

238239
if (!entry.getValue().isDone() && !entry.getValue().isCancelled()) {
239240
done = false;
240241
} else {
241242

242243
NodeExecution execution = entry.getKey();
244+
243245
try {
244246

245247
String futureId = ObjectUtils.getIdentityHexString(entry.getValue());
248+
246249
if (!saveGuard.contains(futureId)) {
247250

248251
if (execution.isPositional()) {
249252
result.add(execution.getPositionalKey(), entry.getValue().get());
250253
} else {
251254
result.add(entry.getValue().get());
252255
}
256+
253257
saveGuard.add(futureId);
254258
}
255-
} catch (ExecutionException e) {
259+
} catch (ExecutionException cause) {
256260

257-
RuntimeException ex = convertToDataAccessException((Exception) e.getCause());
261+
RuntimeException exception = convertToDataAccessException((Exception) cause.getCause());
258262

259-
exceptions.put(execution.getNode(), ex != null ? ex : e.getCause());
260-
} catch (InterruptedException e) {
263+
exceptions.put(execution.getNode(), exception != null ? exception : cause.getCause());
264+
} catch (InterruptedException cause) {
261265

262266
Thread.currentThread().interrupt();
263267

264-
RuntimeException ex = convertToDataAccessException((Exception) e.getCause());
265-
exceptions.put(execution.getNode(), ex != null ? ex : e.getCause());
268+
RuntimeException exception = convertToDataAccessException((Exception) cause.getCause());
269+
270+
exceptions.put(execution.getNode(), exception != null ? exception : cause.getCause());
271+
266272
break;
267273
}
268274
}
@@ -271,7 +277,6 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
271277
try {
272278
Thread.sleep(10);
273279
} catch (InterruptedException e) {
274-
275280
done = true;
276281
Thread.currentThread().interrupt();
277282
}
@@ -280,18 +285,19 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
280285
if (!exceptions.isEmpty()) {
281286
throw new ClusterCommandExecutionFailureException(new ArrayList<>(exceptions.values()));
282287
}
288+
283289
return result;
284290
}
285291

286292
/**
287293
* Run {@link MultiKeyClusterCommandCallback} with on a curated set of nodes serving one or more keys.
288294
*
289-
* @param cmd must not be {@literal null}.
295+
* @param commandCallback must not be {@literal null}.
290296
* @return never {@literal null}.
291297
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
292298
* {@link MultiKeyClusterCommandCallback command}.
293299
*/
294-
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> cmd,
300+
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> commandCallback,
295301
Iterable<byte[]> keys) {
296302

297303
Map<RedisClusterNode, PositionalKeys> nodeKeyMap = new HashMap<>();
@@ -309,19 +315,19 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa
309315

310316
if (entry.getKey().isMaster()) {
311317
for (PositionalKey key : entry.getValue()) {
312-
futures.put(new NodeExecution(entry.getKey(), key),
313-
executor.submit(() -> executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key.getBytes())));
318+
futures.put(new NodeExecution(entry.getKey(), key), this.executor.submit(() ->
319+
executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
314320
}
315321
}
316322
}
317323

318324
return collectResults(futures);
319325
}
320326

321-
private <S, T> NodeResult<T> executeMultiKeyCommandOnSingleNode(MultiKeyClusterCommandCallback<S, T> cmd,
327+
private <S, T> NodeResult<T> executeMultiKeyCommandOnSingleNode(MultiKeyClusterCommandCallback<S, T> commandCallback,
322328
RedisClusterNode node, byte[] key) {
323329

324-
Assert.notNull(cmd, "MultiKeyCommandCallback must not be null");
330+
Assert.notNull(commandCallback, "MultiKeyCommandCallback must not be null");
325331
Assert.notNull(node, "RedisClusterNode must not be null");
326332
Assert.notNull(key, "Keys for execution must not be null");
327333

@@ -330,7 +336,7 @@ private <S, T> NodeResult<T> executeMultiKeyCommandOnSingleNode(MultiKeyClusterC
330336
Assert.notNull(client, "Could not acquire resource for node; Is your cluster info up to date");
331337

332338
try {
333-
return new NodeResult<>(node, cmd.doInCluster(client, key), key);
339+
return new NodeResult<>(node, commandCallback.doInCluster(client, key), key);
334340
} catch (RuntimeException ex) {
335341

336342
RuntimeException translatedException = convertToDataAccessException(ex);
@@ -345,8 +351,8 @@ private ClusterTopology getClusterTopology() {
345351
}
346352

347353
@Nullable
348-
private DataAccessException convertToDataAccessException(Exception e) {
349-
return exceptionTranslationStrategy.translate(e);
354+
private DataAccessException convertToDataAccessException(Exception cause) {
355+
return exceptionTranslationStrategy.translate(cause);
350356
}
351357

352358
/**
@@ -361,12 +367,12 @@ public void setMaxRedirects(int maxRedirects) {
361367
@Override
362368
public void destroy() throws Exception {
363369

364-
if (executor instanceof DisposableBean) {
365-
((DisposableBean) executor).destroy();
370+
if (this.executor instanceof DisposableBean disposableBean) {
371+
disposableBean.destroy();
366372
}
367373

368-
if (resourceProvider instanceof DisposableBean) {
369-
((DisposableBean) resourceProvider).destroy();
374+
if (this.resourceProvider instanceof DisposableBean disposableBean) {
375+
disposableBean.destroy();
370376
}
371377
}
372378

src/main/java/org/springframework/data/redis/connection/RedisClusterConfiguration.java

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.springframework.core.env.MapPropertySource;
2828
import org.springframework.core.env.PropertySource;
29+
import org.springframework.core.task.AsyncTaskExecutor;
2930
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
3031
import org.springframework.data.redis.util.RedisAssertions;
3132
import org.springframework.lang.Nullable;
@@ -51,6 +52,8 @@ public class RedisClusterConfiguration implements RedisConfiguration, ClusterCon
5152

5253
private @Nullable Integer maxRedirects;
5354

55+
private @Nullable AsyncTaskExecutor executor;
56+
5457
private RedisPassword password = RedisPassword.none();
5558

5659
private final Set<RedisNode> clusterNodes;
@@ -109,6 +112,13 @@ public RedisClusterConfiguration(PropertySource<?> propertySource) {
109112
}
110113
}
111114

115+
private void appendClusterNodes(Set<String> hostAndPorts) {
116+
117+
for (String hostAndPort : hostAndPorts) {
118+
addClusterNode(RedisNode.fromString(hostAndPort));
119+
}
120+
}
121+
112122
/**
113123
* Set {@literal cluster nodes} to connect to.
114124
*
@@ -139,6 +149,15 @@ public void addClusterNode(RedisNode node) {
139149
this.clusterNodes.add(RedisAssertions.requireNonNull(node, "ClusterNode must not be null"));
140150
}
141151

152+
/**
153+
* @param host Redis cluster node host name or ip address.
154+
* @param port Redis cluster node port.
155+
* @return this.
156+
*/
157+
public RedisClusterConfiguration clusterNode(String host, Integer port) {
158+
return clusterNode(new RedisNode(host, port));
159+
}
160+
142161
/**
143162
* @return this.
144163
*/
@@ -149,11 +168,6 @@ public RedisClusterConfiguration clusterNode(RedisNode node) {
149168
return this;
150169
}
151170

152-
@Override
153-
public Integer getMaxRedirects() {
154-
return maxRedirects != null && maxRedirects > Integer.MIN_VALUE ? maxRedirects : null;
155-
}
156-
157171
/**
158172
* @param maxRedirects the max number of redirects to follow.
159173
*/
@@ -164,20 +178,9 @@ public void setMaxRedirects(int maxRedirects) {
164178
this.maxRedirects = maxRedirects;
165179
}
166180

167-
/**
168-
* @param host Redis cluster node host name or ip address.
169-
* @param port Redis cluster node port.
170-
* @return this.
171-
*/
172-
public RedisClusterConfiguration clusterNode(String host, Integer port) {
173-
return clusterNode(new RedisNode(host, port));
174-
}
175-
176-
private void appendClusterNodes(Set<String> hostAndPorts) {
177-
178-
for (String hostAndPort : hostAndPorts) {
179-
addClusterNode(RedisNode.fromString(hostAndPort));
180-
}
181+
@Override
182+
public Integer getMaxRedirects() {
183+
return maxRedirects != null && maxRedirects > Integer.MIN_VALUE ? maxRedirects : null;
181184
}
182185

183186
@Override
@@ -191,14 +194,24 @@ public String getUsername() {
191194
return this.username;
192195
}
193196

197+
@Override
198+
public void setPassword(RedisPassword password) {
199+
this.password = RedisAssertions.requireNonNull(password, "RedisPassword must not be null");
200+
}
201+
194202
@Override
195203
public RedisPassword getPassword() {
196204
return password;
197205
}
198206

199207
@Override
200-
public void setPassword(RedisPassword password) {
201-
this.password = RedisAssertions.requireNonNull(password, "RedisPassword must not be null");
208+
public void setAsyncTaskExecutor(@Nullable AsyncTaskExecutor executor) {
209+
this.executor = executor;
210+
}
211+
212+
@Nullable @Override
213+
public AsyncTaskExecutor getAsyncTaskExecutor() {
214+
return this.executor;
202215
}
203216

204217
@Override

src/main/java/org/springframework/data/redis/connection/RedisConfiguration.java

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.function.IntSupplier;
2222
import java.util.function.Supplier;
2323

24+
import org.springframework.core.task.AsyncTaskExecutor;
2425
import org.springframework.lang.Nullable;
2526
import org.springframework.util.Assert;
2627

@@ -205,6 +206,14 @@ interface WithAuthentication {
205206
*/
206207
void setUsername(@Nullable String username);
207208

209+
/**
210+
* Get the username to use when connecting.
211+
*
212+
* @return {@literal null} if none set.
213+
*/
214+
@Nullable
215+
String getUsername();
216+
208217
/**
209218
* Create and set a {@link RedisPassword} for given {@link String}.
210219
*
@@ -230,14 +239,6 @@ default void setPassword(@Nullable char[] password) {
230239
*/
231240
void setPassword(RedisPassword password);
232241

233-
/**
234-
* Get the username to use when connecting.
235-
*
236-
* @return {@literal null} if none set.
237-
*/
238-
@Nullable
239-
String getUsername();
240-
241242
/**
242243
* Get the RedisPassword to use when connecting.
243244
*
@@ -337,6 +338,53 @@ interface WithDomainSocket {
337338
String getSocket();
338339
}
339340

341+
/**
342+
* Configuration interface suitable for Redis cluster environments.
343+
*
344+
* @author Christoph Strobl
345+
* @since 2.1
346+
*/
347+
interface ClusterConfiguration extends WithPassword {
348+
349+
/**
350+
* Configures the {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
351+
*
352+
* @param executor {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
353+
*/
354+
void setAsyncTaskExecutor(AsyncTaskExecutor executor);
355+
356+
/**
357+
* Returns the configured {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
358+
*
359+
* @return the configured {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
360+
*/
361+
AsyncTaskExecutor getAsyncTaskExecutor();
362+
363+
/**
364+
* Returns an {@link Collections#unmodifiableSet(Set) Set} of {@link RedisNode cluster nodes}.
365+
*
366+
* @return {@link Set} of {@link RedisNode cluster nodes}. Never {@literal null}.
367+
*/
368+
Set<RedisNode> getClusterNodes();
369+
370+
/**
371+
* @return max number of redirects to follow or {@literal null} if not set.
372+
*/
373+
@Nullable
374+
Integer getMaxRedirects();
375+
376+
}
377+
378+
/**
379+
* Configuration interface suitable for single node redis connections using local unix domain socket.
380+
*
381+
* @author Christoph Strobl
382+
* @since 2.1
383+
*/
384+
interface DomainSocketConfiguration extends WithDomainSocket, WithDatabaseIndex, WithPassword {
385+
386+
}
387+
340388
/**
341389
* Configuration interface suitable for Redis Sentinel environments.
342390
*
@@ -459,28 +507,6 @@ default void setSentinelPassword(@Nullable char[] password) {
459507

460508
}
461509

462-
/**
463-
* Configuration interface suitable for Redis cluster environments.
464-
*
465-
* @author Christoph Strobl
466-
* @since 2.1
467-
*/
468-
interface ClusterConfiguration extends WithPassword {
469-
470-
/**
471-
* Returns an {@link Collections#unmodifiableSet(Set)} of {@literal cluster nodes}.
472-
*
473-
* @return {@link Set} of nodes. Never {@literal null}.
474-
*/
475-
Set<RedisNode> getClusterNodes();
476-
477-
/**
478-
* @return max number of redirects to follow or {@literal null} if not set.
479-
*/
480-
@Nullable
481-
Integer getMaxRedirects();
482-
}
483-
484510
/**
485511
* Configuration interface suitable for Redis master/replica environments with fixed hosts.
486512
*
@@ -495,14 +521,4 @@ interface StaticMasterReplicaConfiguration extends WithDatabaseIndex, WithPasswo
495521
*/
496522
List<RedisStandaloneConfiguration> getNodes();
497523
}
498-
499-
/**
500-
* Configuration interface suitable for single node redis connections using local unix domain socket.
501-
*
502-
* @author Christoph Strobl
503-
* @since 2.1
504-
*/
505-
interface DomainSocketConfiguration extends WithDomainSocket, WithDatabaseIndex, WithPassword {
506-
507-
}
508524
}

0 commit comments

Comments
 (0)