diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java index 1e5323cdc9d..777dbdf3012 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java @@ -18,25 +18,24 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.SmartLifecycle; +import org.springframework.core.log.LogAccessor; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.integration.leader.Candidate; import org.springframework.integration.leader.Context; import org.springframework.integration.leader.DefaultCandidate; import org.springframework.integration.leader.event.DefaultLeaderEventPublisher; import org.springframework.integration.leader.event.LeaderEventPublisher; import org.springframework.integration.support.locks.LockRegistry; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; /** @@ -67,9 +66,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe public static final long DEFAULT_BUSY_WAIT_TIME = 50L; - private static final Log LOGGER = LogFactory.getLog(LockRegistryLeaderInitiator.class); - - private final Object lifecycleMonitor = new Object(); + private static final LogAccessor LOGGER = new LogAccessor(LockRegistryLeaderInitiator.class); /** * A lock registry. The locks it manages should be global (whatever that means for the @@ -103,14 +100,7 @@ public String getRole() { /** * Executor service for running leadership daemon. */ - private ExecutorService executorService = - Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-")); - - /** - * Flag to denote whether the {@link ExecutorService} was provided via the setter and - * thus should not be shutdown when {@link #destroy()} is called. - */ - private boolean executorServiceExplicitlySet; + private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("lock-leadership-"); /** * Time in milliseconds to wait in between attempts to re-acquire the lock, once it is @@ -161,7 +151,7 @@ public String getRole() { /** * Future returned by submitting an {@link LeaderSelector} to - * {@link #executorService}. This is used to cancel leadership. + * {@link #taskExecutor}. This is used to cancel leadership. */ private volatile Future future; @@ -192,10 +182,21 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) { * single thread Executor will be used. * @param executorService the executor service * @since 5.0.2 + * @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)} */ + @Deprecated(since = "6.2", forRemoval = true) public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - this.executorServiceExplicitlySet = true; + setTaskExecutor(new TaskExecutorAdapter(executorService)); + } + + /** + * Set a {@link AsyncTaskExecutor} for running leadership daemon. + * @param taskExecutor the {@link AsyncTaskExecutor} to use. + * @since 6.2 + */ + public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { + Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null."); + this.taskExecutor = taskExecutor; } public void setHeartBeatMillis(long heartBeatMillis) { @@ -224,9 +225,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv */ @Override public boolean isRunning() { - synchronized (this.lifecycleMonitor) { - return this.running; - } + return this.running; } @Override @@ -287,26 +286,21 @@ public void setPublishFailedEvents(boolean publishFailedEvents) { * Start the registration of the {@link #candidate} for leader election. */ @Override - public void start() { + public synchronized void start() { if (this.leaderEventPublisher == null && this.applicationEventPublisher != null) { this.leaderEventPublisher = new DefaultLeaderEventPublisher(this.applicationEventPublisher); } - synchronized (this.lifecycleMonitor) { - if (!this.running) { - this.leaderSelector = new LeaderSelector(buildLeaderPath()); - this.running = true; - this.future = this.executorService.submit(this.leaderSelector); - LOGGER.debug("Started LeaderInitiator"); - } + if (!this.running) { + this.leaderSelector = new LeaderSelector(buildLeaderPath()); + this.running = true; + this.future = this.taskExecutor.submit(this.leaderSelector); + LOGGER.debug("Started LeaderInitiator"); } } @Override public void destroy() { stop(); - if (!this.executorServiceExplicitlySet) { - this.executorService.shutdown(); - } } /** @@ -314,16 +308,14 @@ public void destroy() { * candidate is currently leader, its leadership will be revoked. */ @Override - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - this.running = false; - if (this.future != null) { - this.future.cancel(true); - } - this.future = null; - LOGGER.debug("Stopped LeaderInitiator for " + getContext()); + public synchronized void stop() { + if (this.running) { + this.running = false; + if (this.future != null) { + this.future.cancel(true); } + this.future = null; + LOGGER.debug(() -> "Stopped LeaderInitiator for " + getContext()); } } @@ -382,9 +374,9 @@ public Void call() { try { this.lock.unlock(); } - catch (Exception e) { - LOGGER.debug("Could not unlock during stop for " + this.context - + " - treat as broken. Revoking...", e); + catch (Exception ex) { + LOGGER.debug(ex, () -> + "Could not unlock during stop for " + this.context + " - treat as broken. Revoking..."); } // We are stopping, therefore not leading anymore handleRevoked(); @@ -394,9 +386,7 @@ public Void call() { } private void tryAcquireLock() throws InterruptedException { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Acquiring the lock for " + this.context); - } + LOGGER.debug(() -> "Acquiring the lock for " + this.context); // We always try to acquire the lock, in case it expired boolean acquired = this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS); @@ -436,8 +426,8 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR this.lock.unlock(); } catch (Exception e1) { - LOGGER.debug("Could not unlock - treat as broken " + this.context + - ". Revoking " + (isRunning() ? " and retrying..." : "..."), e1); + LOGGER.debug(e1, () -> "Could not unlock - treat as broken " + this.context + + ". Revoking " + (isRunning() ? " and retrying..." : "...")); } // The lock was broken and we are no longer leader @@ -462,18 +452,16 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR Thread.currentThread().interrupt(); } } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Error acquiring the lock for " + this.context + - ". " + (isRunning() ? "Retrying..." : ""), ex); - } + LOGGER.debug(ex, () -> + "Error acquiring the lock for " + this.context + ". " + (isRunning() ? "Retrying..." : "")); } return false; } private void restartSelectorBecauseOfError(Exception ex) { - LOGGER.warn("Restarting LeaderSelector for " + this.context + " because of error.", ex); + LOGGER.warn(ex, () -> "Restarting LeaderSelector for " + this.context + " because of error."); LockRegistryLeaderInitiator.this.future = - LockRegistryLeaderInitiator.this.executorService.submit( + LockRegistryLeaderInitiator.this.taskExecutor.submit( () -> { // Give it a chance to elect some other leader. Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis); @@ -492,8 +480,8 @@ private void handleGranted() throws InterruptedException { LockRegistryLeaderInitiator.this.leaderEventPublisher.publishOnGranted( LockRegistryLeaderInitiator.this, this.context, this.lockKey); } - catch (Exception e) { - LOGGER.warn("Error publishing OnGranted event.", e); + catch (Exception ex) { + LOGGER.warn(ex, "Error publishing OnGranted event."); } } } @@ -506,8 +494,8 @@ private void handleRevoked() { LockRegistryLeaderInitiator.this, this.context, LockRegistryLeaderInitiator.this.candidate.getRole()); } - catch (Exception e) { - LOGGER.warn("Error publishing OnRevoked event.", e); + catch (Exception ex) { + LOGGER.warn(ex, "Error publishing OnRevoked event."); } } } @@ -520,8 +508,8 @@ private void publishFailedToAcquire() { this.context, LockRegistryLeaderInitiator.this.candidate.getRole()); } - catch (Exception e) { - LOGGER.warn("Error publishing OnFailedToAcquire event.", e); + catch (Exception ex) { + LOGGER.warn(ex, "Error publishing OnFailedToAcquire event."); } } } @@ -543,9 +531,7 @@ public boolean isLeader() { @Override public void yield() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Yielding leadership from " + this); - } + LOGGER.debug(() -> "Yielding leadership from " + this); LockRegistryLeaderInitiator.this.leaderSelector.yielding = true; } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java index 15fbcd1f689..e7dc3efc75b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2022 the original author or authors. + * Copyright 2012-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,27 +17,24 @@ package org.springframework.integration.support.leader; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.core.task.SyncTaskExecutor; -import org.springframework.core.task.support.ExecutorServiceAdapter; +import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.integration.leader.Context; import org.springframework.integration.leader.DefaultCandidate; import org.springframework.integration.leader.event.DefaultLeaderEventPublisher; import org.springframework.integration.leader.event.LeaderEventPublisher; import org.springframework.integration.support.locks.DefaultLockRegistry; import org.springframework.integration.support.locks.LockRegistry; -import org.springframework.integration.test.util.TestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -69,7 +66,7 @@ public class LockRegistryLeaderInitiatorTests { private final LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate()); - @Before + @BeforeEach public void init() { this.initiator.setLeaderEventPublisher(new CountingPublisher(this.granted, this.revoked)); } @@ -255,9 +252,8 @@ public void testGracefulLeaderSelectorExit() throws Exception { .given(lock) .tryLock(anyLong(), eq(TimeUnit.MILLISECONDS)); - new DirectFieldAccessor(another).setPropertyValue("executorService", - new ExecutorServiceAdapter( - new SyncTaskExecutor())); + new DirectFieldAccessor(another).setPropertyValue("taskExecutor", + new TaskExecutorAdapter(new SyncTaskExecutor())); another.start(); @@ -297,29 +293,6 @@ public void testExceptionFromLock() throws Exception { another.stop(); } - @Test - public void shouldShutdownInternalExecutorService() { - this.initiator.start(); - this.initiator.destroy(); - - ExecutorService executorService = - TestUtils.getPropertyValue(this.initiator, "executorService", ExecutorService.class); - - assertThat(executorService.isShutdown()).isTrue(); - } - - @Test - public void doNotShutdownProvidedExecutorService() { - LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(this.registry); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - another.setExecutorService(executorService); - - another.start(); - another.destroy(); - - assertThat(executorService.isShutdown()).isFalse(); - } - private static class CountingPublisher implements LeaderEventPublisher { private final CountDownLatch granted; diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java index c83849ca027..2a843119cf3 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -27,13 +25,15 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.cp.CPSubsystem; import com.hazelcast.cp.lock.FencedLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.SmartLifecycle; +import org.springframework.core.log.LogAccessor; +import org.springframework.core.log.LogMessage; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.integration.leader.Candidate; import org.springframework.integration.leader.Context; import org.springframework.integration.leader.DefaultCandidate; @@ -57,9 +57,7 @@ */ public class LeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware { - private static final Log logger = LogFactory.getLog(LeaderInitiator.class); - - private static int threadNameCount = 0; + private static final LogAccessor logger = new LogAccessor(LeaderInitiator.class); private static final Context NULL_CONTEXT = new NullContext(); @@ -75,12 +73,7 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat /** * Executor service for running leadership daemon. */ - private final ExecutorService executorService = - Executors.newSingleThreadExecutor(r -> { - Thread thread = new Thread(r, "Hazelcast-leadership-" + (threadNameCount++)); - thread.setDaemon(true); - return thread; - }); + private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("Hazelcast-leadership-"); private long heartBeatMillis = LockRegistryLeaderInitiator.DEFAULT_HEART_BEAT_TIME; @@ -98,7 +91,7 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat private int phase; /** - * Future returned by submitting an {@link LeaderSelector} to {@link #executorService}. + * Future returned by submitting an {@link LeaderSelector} to {@link #taskExecutor}. * This is used to cancel leadership. */ private volatile Future future; @@ -130,7 +123,17 @@ public LeaderInitiator(HazelcastInstance client, Candidate candidate) { } /** - * Sets the {@link LeaderEventPublisher}. + * Set a {@link AsyncTaskExecutor} for running leadership daemon. + * @param taskExecutor the {@link AsyncTaskExecutor} to use. + * @since 6.2 + */ + public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { + Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null."); + this.taskExecutor = taskExecutor; + } + + /** + * Set the {@link LeaderEventPublisher}. * @param leaderEventPublisher the event publisher */ public void setLeaderEventPublisher(LeaderEventPublisher leaderEventPublisher) { @@ -209,7 +212,7 @@ public synchronized void start() { if (!this.running) { this.leaderSelector = new LeaderSelector(); this.running = true; - this.future = this.executorService.submit(this.leaderSelector); + this.future = this.taskExecutor.submit(this.leaderSelector); } } @@ -246,17 +249,14 @@ public boolean isRunning() { @Override public void destroy() { stop(); - this.executorService.shutdown(); } FencedLock getLock() { CPSubsystem cpSubSystem = this.client.getCPSubsystem(); FencedLock lock = cpSubSystem.getLock(this.candidate.getRole()); - if (logger.isDebugEnabled()) { - logger.debug( - String.format("Use lock groupId '%s', lock count '%s'", lock.getGroupId(), lock.getLockCount())); - } + logger.debug( + LogMessage.format("Use lock groupId '%s', lock count '%s'", lock.getGroupId(), lock.getLockCount())); return lock; } @@ -278,17 +278,17 @@ public Void call() { try { while (isRunning()) { try { - if (logger.isTraceEnabled()) { - logger.trace("Am I the leader (" + LeaderInitiator.this.candidate.getRole() + ") ? " - + this.leader); - } + logger.trace(() -> + "Am I the leader (" + LeaderInitiator.this.candidate.getRole() + ")? " + this.leader); if (getLock().isLockedByCurrentThread()) { if (!this.leader) { // Since we have the lock we need to ensure that the leader flag is set this.leader = true; } // Give it a chance to expire. - if (LeaderInitiator.this.yieldSign.tryAcquire(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS)) { + if (LeaderInitiator.this.yieldSign + .tryAcquire(LeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS)) { + revokeLeadership(); // Give it a chance to elect some other leader. Thread.sleep(LeaderInitiator.this.busyWaitMillis); @@ -305,7 +305,7 @@ public Void call() { } } } - catch (Exception e) { + catch (Exception ex) { // The lock was broken and we are no longer leader revokeLeadership(); @@ -316,14 +316,12 @@ public Void call() { Thread.sleep(LeaderInitiator.this.busyWaitMillis); } catch (InterruptedException e1) { - // Ignore interruption and let it to be caught on the next cycle. + // Ignore interruption and let it be caught on the next cycle. Thread.currentThread().interrupt(); } } - if (logger.isDebugEnabled()) { - logger.debug("Error acquiring the lock for " + this.context + - ". " + (isRunning() ? "Retrying..." : ""), e); - } + logger.debug(ex, () -> "Error acquiring the lock for " + this.context + + ". " + (isRunning() ? "Retrying..." : "")); } } } @@ -342,8 +340,8 @@ private void revokeLeadership() { getLock().unlock(); } catch (Exception e1) { - logger.warn("Could not unlock - treat as broken " + this.context + ". Revoking " - + (isRunning() ? " and retrying..." : "..."), e1); + logger.warn(e1, () -> "Could not unlock - treat as broken " + this.context + ". Revoking " + + (isRunning() ? " and retrying..." : "...")); } @@ -359,8 +357,8 @@ private void handleGranted() throws InterruptedException { LeaderInitiator.this.leaderEventPublisher.publishOnGranted( LeaderInitiator.this, this.context, this.role); } - catch (Exception e) { - logger.warn("Error publishing OnGranted event.", e); + catch (Exception ex) { + logger.warn(ex, "Error publishing OnGranted event."); } } } @@ -372,8 +370,8 @@ private void handleRevoked() { LeaderInitiator.this.leaderEventPublisher.publishOnRevoked( LeaderInitiator.this, this.context, this.role); } - catch (Exception e) { - logger.warn("Error publishing OnRevoked event.", e); + catch (Exception ex) { + logger.warn(ex, "Error publishing OnRevoked event."); } } } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java index c68f166008d..08d0c74b4f0 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -33,10 +32,12 @@ import org.springframework.context.SmartLifecycle; import org.springframework.core.log.LogAccessor; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.util.UUIDConverter; import org.springframework.lang.Nullable; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; /** @@ -73,8 +74,7 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc private final String tablePrefix; - @Nullable - private ExecutorService executor; + private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("postgres-channel-message-table-subscriber-"); private CountDownLatch latch = new CountDownLatch(0); @@ -108,9 +108,21 @@ public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupp * listening for notifications as a blocking operation which will permanently block a thread of this executor * while running. * @param executor The executor to use or {@code null} if an executor should be created by this class. + * @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)} */ - public synchronized void setExecutor(@Nullable ExecutorService executor) { - this.executor = executor; + @Deprecated(since = "6.2", forRemoval = true) + public synchronized void setExecutor(ExecutorService executor) { + setTaskExecutor(new TaskExecutorAdapter(executor)); + } + + /** + * Provide a managed {@link AsyncTaskExecutor} for Postgres listener daemon. + * @param taskExecutor the {@link AsyncTaskExecutor} to use. + * @since 6.2 + */ + public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { + Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null."); + this.taskExecutor = taskExecutor; } /** @@ -141,18 +153,11 @@ public synchronized void start() { if (this.latch.getCount() > 0) { return; } - ExecutorService executorToUse = this.executor; - if (executorToUse == null) { - CustomizableThreadFactory threadFactory = - new CustomizableThreadFactory("postgres-channel-message-table-subscriber-"); - threadFactory.setDaemon(true); - executorToUse = Executors.newSingleThreadExecutor(threadFactory); - this.executor = executorToUse; - } + this.latch = new CountDownLatch(1); CountDownLatch startingLatch = new CountDownLatch(1); - this.future = executorToUse.submit(() -> { + this.future = this.taskExecutor.submit(() -> { try { while (isActive()) { try { diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java index 422b01333ab..67428a30a2f 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java @@ -65,7 +65,7 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build(); - private Executor executor = new SimpleAsyncTaskExecutor(); + private Executor executor; /** * Create a subscribable channel for a Postgres database. @@ -115,6 +115,14 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { this.retryTemplate = retryTemplate; } + @Override + protected void onInit() { + super.onInit(); + if (this.executor == null) { + this.executor = new SimpleAsyncTaskExecutor(getBeanName() + "-dispatcher-"); + } + } + @Override public boolean subscribe(MessageHandler handler) { boolean subscribed = super.subscribe(handler); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index bbe9ab567d3..f29a8bf422d 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -124,6 +124,8 @@ void setUp(TestInfo testInfo) { this.postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, groupId, postgresChannelMessageTableSubscriber); + this.postgresSubscribableChannel.setBeanName("testPostgresChannel"); + this.postgresSubscribableChannel.afterPropertiesSet(); } @AfterEach diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java index 3f31cbf8e9c..a82a8dbb384 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,10 +67,9 @@ public class SubscribableRedisChannel extends AbstractMessageChannel private final String topicName; - private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(true); + private Executor taskExecutor; - // defaults - private Executor taskExecutor = new SimpleAsyncTaskExecutor(); + private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(true); private RedisSerializer serializer = new StringRedisSerializer(); @@ -148,6 +147,11 @@ public void onInit() { ((BeanFactoryAware) this.messageConverter).setBeanFactory(beanFactory); } this.container.setConnectionFactory(this.connectionFactory); + + if (this.taskExecutor == null) { + this.taskExecutor = new SimpleAsyncTaskExecutor(getBeanName() + "-"); + } + if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) { ErrorHandler errorHandler = ChannelUtils.getErrorHandler(beanFactory); this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler); diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/leader/RedisLockRegistryLeaderInitiatorTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/leader/RedisLockRegistryLeaderInitiatorTests.java index cfd4175e88d..667b6ef9fdf 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/leader/RedisLockRegistryLeaderInitiatorTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/leader/RedisLockRegistryLeaderInitiatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,12 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.integration.leader.Context; import org.springframework.integration.leader.DefaultCandidate; @@ -33,7 +33,6 @@ import org.springframework.integration.redis.util.RedisLockRegistry; import org.springframework.integration.support.leader.LockRegistryLeaderInitiator; import org.springframework.integration.test.condition.LogLevels; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import static org.assertj.core.api.Assertions.assertThat; @@ -64,8 +63,7 @@ void testDistributedLeaderElection() throws Exception { for (int i = 0; i < 2; i++) { LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(registry, new DefaultCandidate("foo:" + i, "bar")); - initiator.setExecutorService( - Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-" + i + "-"))); + initiator.setTaskExecutor(new SimpleAsyncTaskExecutor("lock-leadership-" + i + "-")); initiator.setLeaderEventPublisher(countingPublisher); initiators.add(initiator); } diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java index 9b5956fca0d..681f4fd0a86 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.integration.zeromq; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -33,8 +32,8 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.SmartLifecycle; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.lang.Nullable; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; /** @@ -82,8 +81,6 @@ public class ZeroMqProxy implements InitializingBean, SmartLifecycle, BeanNameAw private Executor proxyExecutor; - private boolean proxyExecutorExplicitlySet; - @Nullable private Consumer frontendSocketConfigurer; @@ -126,13 +123,12 @@ public ZeroMqProxy(ZContext context, Type type) { /** * Configure an executor to perform a ZeroMQ proxy loop. * The thread is held until ZeroMQ proxy loop is terminated. - * By default an internal {@link Executors#newSingleThreadExecutor} instance is used. + * By default, an internal {@link Executors#newSingleThreadExecutor} instance is used. * @param proxyExecutor the {@link Executor} to use for ZeroMQ proxy loop */ public void setProxyExecutor(Executor proxyExecutor) { Assert.notNull(proxyExecutor, "'proxyExecutor' must not be null"); this.proxyExecutor = proxyExecutor; - this.proxyExecutorExplicitlySet = true; } /** @@ -242,7 +238,7 @@ public int getPhase() { @Override public void afterPropertiesSet() { if (this.proxyExecutor == null) { - this.proxyExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(this.beanName)); + this.proxyExecutor = new SimpleAsyncTaskExecutor(this.beanName + "-"); } this.controlAddress = "inproc://" + this.beanName + ".control"; if (this.exposeCaptureSocket) { @@ -319,9 +315,7 @@ public boolean isRunning() { @Override public void destroy() { - if (!this.proxyExecutorExplicitlySet) { - ((ExecutorService) this.proxyExecutor).shutdown(); - } + stop(); } private static int bindSocket(ZMQ.Socket socket, int port) {