Skip to content

Introduce executor configuration to connection factories for Executor to be used with ClusterCommandExecutor #2669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0-GH-2594-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,29 @@
import org.springframework.data.redis.connection.util.ByteArraySet;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

/**
* {@link ClusterCommandExecutor} takes care of running commands across the known cluster nodes. By providing an
* {@link AsyncTaskExecutor} the execution behavior can be influenced.
* {@link AsyncTaskExecutor} the execution behavior can be configured.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 1.7
*/
public class ClusterCommandExecutor implements DisposableBean {

private AsyncTaskExecutor executor;
private final ClusterTopologyProvider topologyProvider;
private int maxRedirects = 5;

private final AsyncTaskExecutor executor;

private final ClusterNodeResourceProvider resourceProvider;

private final ClusterTopologyProvider topologyProvider;

private final ExceptionTranslationStrategy exceptionTranslationStrategy;
private int maxRedirects = 5;

/**
* Create a new instance of {@link ClusterCommandExecutor}.
Expand All @@ -64,54 +67,50 @@ public class ClusterCommandExecutor implements DisposableBean {
public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterNodeResourceProvider resourceProvider,
ExceptionTranslationStrategy exceptionTranslation) {

Assert.notNull(topologyProvider, "ClusterTopologyProvider must not be null");
Assert.notNull(resourceProvider, "ClusterNodeResourceProvider must not be null");
Assert.notNull(exceptionTranslation, "ExceptionTranslationStrategy must not be null");

this.topologyProvider = topologyProvider;
this.resourceProvider = resourceProvider;
this.exceptionTranslationStrategy = exceptionTranslation;
this(topologyProvider, resourceProvider, exceptionTranslation, new SimpleAsyncTaskExecutor());
}

/**
* @param topologyProvider must not be {@literal null}.
* @param resourceProvider must not be {@literal null}.
* @param exceptionTranslation must not be {@literal null}.
* @param executor can be {@literal null}. Defaulted to {@link ThreadPoolTaskExecutor}.
* @param executor the task executor to null, defaults to {@link SimpleAsyncTaskExecutor} if {@literal null}.
*/
public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterNodeResourceProvider resourceProvider,
ExceptionTranslationStrategy exceptionTranslation, @Nullable AsyncTaskExecutor executor) {

this(topologyProvider, resourceProvider, exceptionTranslation);
this.executor = executor;
}
Assert.notNull(topologyProvider, "ClusterTopologyProvider must not be null");
Assert.notNull(resourceProvider, "ClusterNodeResourceProvider must not be null");
Assert.notNull(exceptionTranslation, "ExceptionTranslationStrategy must not be null");

{
if (executor == null) {
this.executor = new SimpleAsyncTaskExecutor();
}
this.topologyProvider = topologyProvider;
this.resourceProvider = resourceProvider;
this.exceptionTranslationStrategy = exceptionTranslation;
this.executor = executor != null ? executor : new SimpleAsyncTaskExecutor();
}

/**
* Run {@link ClusterCommandCallback} on a random node.
*
* @param cmd must not be {@literal null}.
* @param commandCallback must not be {@literal null}.
* @return never {@literal null}.
*/
public <T> NodeResult<T> executeCommandOnArbitraryNode(ClusterCommandCallback<?, T> cmd) {
public <T> NodeResult<T> executeCommandOnArbitraryNode(ClusterCommandCallback<?, T> commandCallback) {

Assert.notNull(commandCallback, "ClusterCommandCallback must not be null");

Assert.notNull(cmd, "ClusterCommandCallback must not be null");
List<RedisClusterNode> nodes = new ArrayList<>(getClusterTopology().getActiveNodes());
return executeCommandOnSingleNode(cmd, nodes.get(new Random().nextInt(nodes.size())));

return executeCommandOnSingleNode(commandCallback, nodes.get(new Random().nextInt(nodes.size())));
}

/**
* Run {@link ClusterCommandCallback} on given {@link RedisClusterNode}.
*
* @param cmd must not be {@literal null}.
* @param node must not be {@literal null}.
* @return the {@link NodeResult} from the single, targeted {@link RedisClusterNode}.
* @throws IllegalArgumentException in case no resource can be acquired for given node.
* @return
*/
public <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S, T> cmd, RedisClusterNode node) {
return executeCommandOnSingleNode(cmd, node, 0);
Expand All @@ -132,19 +131,20 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
RedisClusterNode nodeToUse = lookupNode(node);

S client = this.resourceProvider.getResourceForSpecificNode(nodeToUse);

Assert.notNull(client, "Could not acquire resource for node; Is your cluster info up to date");

try {
return new NodeResult<>(node, cmd.doInCluster(client));
} catch (RuntimeException ex) {
} catch (RuntimeException cause) {

RuntimeException translatedException = convertToDataAccessException(ex);
if (translatedException instanceof ClusterRedirectException) {
ClusterRedirectException cre = (ClusterRedirectException) translatedException;
return executeCommandOnSingleNode(cmd,
topologyProvider.getTopology().lookup(cre.getTargetHost(), cre.getTargetPort()), redirectCount + 1);
RuntimeException translatedException = convertToDataAccessException(cause);

if (translatedException instanceof ClusterRedirectException clusterRedirectException) {
return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(
clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()), redirectCount + 1);
} else {
throw translatedException != null ? translatedException : ex;
throw translatedException != null ? translatedException : cause;
}
} finally {
this.resourceProvider.returnResourceForSpecificNode(nodeToUse, client);
Expand All @@ -159,10 +159,11 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
*/
private RedisClusterNode lookupNode(RedisClusterNode node) {

try {
return topologyProvider.getTopology().lookup(node);
} catch (ClusterStateFailureException e) {
throw new IllegalArgumentException(String.format("Node %s is unknown to cluster", node), e);
} catch (ClusterStateFailureException cause) {
throw new IllegalArgumentException(String.format("Node %s is unknown to cluster", node), cause);
}
}

Expand All @@ -171,7 +172,8 @@ private RedisClusterNode lookupNode(RedisClusterNode node) {
*
* @param cmd must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
*/
public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCallback<S, T> cmd) {
return executeCommandAsyncOnNodes(cmd, getClusterTopology().getActiveMasterNodes());
Expand All @@ -181,7 +183,8 @@ public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCa
* @param callback must not be {@literal null}.
* @param nodes must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
*/
public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallback<S, T> callback,
Expand All @@ -202,6 +205,7 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
}

Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();

for (RedisClusterNode node : resolvedRedisClusterNodes) {
futures.put(new NodeExecution(node), executor.submit(() -> executeCommandOnSingleNode(callback, node)));
}
Expand All @@ -213,50 +217,57 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu

boolean done = false;

MultiNodeResult<T> result = new MultiNodeResult<>();
Map<RedisClusterNode, Throwable> exceptions = new HashMap<>();

MultiNodeResult<T> result = new MultiNodeResult<>();
Set<String> saveGuard = new HashSet<>();

while (!done) {

done = true;

for (Map.Entry<NodeExecution, Future<NodeResult<T>>> entry : futures.entrySet()) {

if (!entry.getValue().isDone() && !entry.getValue().isCancelled()) {
done = false;
} else {

NodeExecution execution = entry.getKey();

try {

String futureId = ObjectUtils.getIdentityHexString(entry.getValue());

if (!saveGuard.contains(futureId)) {

if (execution.isPositional()) {
result.add(execution.getPositionalKey(), entry.getValue().get());
} else {
result.add(entry.getValue().get());
}

saveGuard.add(futureId);
}
} catch (ExecutionException e) {
} catch (ExecutionException cause) {

RuntimeException exception = convertToDataAccessException((Exception) cause.getCause());

RuntimeException ex = convertToDataAccessException((Exception) e.getCause());
exceptions.put(execution.getNode(), ex != null ? ex : e.getCause());
} catch (InterruptedException e) {
exceptions.put(execution.getNode(), exception != null ? exception : cause.getCause());
} catch (InterruptedException cause) {

Thread.currentThread().interrupt();

RuntimeException ex = convertToDataAccessException((Exception) e.getCause());
exceptions.put(execution.getNode(), ex != null ? ex : e.getCause());
RuntimeException exception = convertToDataAccessException((Exception) cause.getCause());

exceptions.put(execution.getNode(), exception != null ? exception : cause.getCause());

break;
}
}
}

try {
Thread.sleep(10);
} catch (InterruptedException e) {

done = true;
Thread.currentThread().interrupt();
}
Expand All @@ -265,54 +276,58 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
if (!exceptions.isEmpty()) {
throw new ClusterCommandExecutionFailureException(new ArrayList<>(exceptions.values()));
}

return result;
}

/**
* Run {@link MultiKeyClusterCommandCallback} with on a curated set of nodes serving one or more keys.
*
* @param cmd must not be {@literal null}.
* @param commandCallback must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link MultiKeyClusterCommandCallback command}.
*/
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> cmd,
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> commandCallback,
Iterable<byte[]> keys) {

Map<RedisClusterNode, PositionalKeys> nodeKeyMap = new HashMap<>();

int index = 0;

for (byte[] key : keys) {
for (RedisClusterNode node : getClusterTopology().getKeyServingNodes(key)) {
nodeKeyMap.computeIfAbsent(node, val -> PositionalKeys.empty()).append(PositionalKey.of(key, index++));
}
}

Map<NodeExecution, Future<NodeResult<T>>> futures = new LinkedHashMap<>();

for (Entry<RedisClusterNode, PositionalKeys> entry : nodeKeyMap.entrySet()) {

if (entry.getKey().isMaster()) {
for (PositionalKey key : entry.getValue()) {
futures.put(new NodeExecution(entry.getKey(), key),
executor.submit(() -> executeMultiKeyCommandOnSingleNode(cmd, entry.getKey(), key.getBytes())));
futures.put(new NodeExecution(entry.getKey(), key), this.executor
.submit(() -> executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
}
}
}

return collectResults(futures);
}

private <S, T> NodeResult<T> executeMultiKeyCommandOnSingleNode(MultiKeyClusterCommandCallback<S, T> cmd,
private <S, T> NodeResult<T> executeMultiKeyCommandOnSingleNode(MultiKeyClusterCommandCallback<S, T> commandCallback,
RedisClusterNode node, byte[] key) {

Assert.notNull(cmd, "MultiKeyCommandCallback must not be null");
Assert.notNull(commandCallback, "MultiKeyCommandCallback must not be null");
Assert.notNull(node, "RedisClusterNode must not be null");
Assert.notNull(key, "Keys for execution must not be null");

S client = this.resourceProvider.getResourceForSpecificNode(node);

Assert.notNull(client, "Could not acquire resource for node; Is your cluster info up to date");

try {
return new NodeResult<>(node, cmd.doInCluster(client, key), key);
return new NodeResult<>(node, commandCallback.doInCluster(client, key), key);
} catch (RuntimeException ex) {

RuntimeException translatedException = convertToDataAccessException(ex);
Expand All @@ -327,8 +342,8 @@ private ClusterTopology getClusterTopology() {
}

@Nullable
private DataAccessException convertToDataAccessException(Exception e) {
return exceptionTranslationStrategy.translate(e);
private DataAccessException convertToDataAccessException(Exception cause) {
return exceptionTranslationStrategy.translate(cause);
}

/**
Expand All @@ -343,12 +358,8 @@ public void setMaxRedirects(int maxRedirects) {
@Override
public void destroy() throws Exception {

if (executor instanceof DisposableBean) {
((DisposableBean) executor).destroy();
}

if (resourceProvider instanceof DisposableBean) {
((DisposableBean) resourceProvider).destroy();
if (this.resourceProvider instanceof DisposableBean disposableBean) {
disposableBean.destroy();
}
}

Expand Down Expand Up @@ -479,7 +490,9 @@ public RedisClusterNode getNode() {
}

/**
* @return
* Returns the key as an array of bytes.
*
* @return the key as an array of bytes.
*/
public byte[] getKey() {
return key.getArray();
Expand Down
Loading