Skip to content

Commit d2d3698

Browse files
committed
Use an AsyncTaskExecutor injection instead of ExecutorServiceAdapter wrapping
1 parent 8d5398d commit d2d3698

File tree

3 files changed

+35
-38
lines changed

3 files changed

+35
-38
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import org.springframework.context.ApplicationEventPublisherAware;
2828
import org.springframework.context.SmartLifecycle;
2929
import org.springframework.core.log.LogAccessor;
30+
import org.springframework.core.task.AsyncTaskExecutor;
3031
import org.springframework.core.task.SimpleAsyncTaskExecutor;
31-
import org.springframework.core.task.TaskExecutor;
32-
import org.springframework.core.task.support.ExecutorServiceAdapter;
32+
import org.springframework.core.task.support.TaskExecutorAdapter;
3333
import org.springframework.integration.leader.Candidate;
3434
import org.springframework.integration.leader.Context;
3535
import org.springframework.integration.leader.DefaultCandidate;
@@ -100,8 +100,7 @@ public String getRole() {
100100
/**
101101
* Executor service for running leadership daemon.
102102
*/
103-
private ExecutorService executorService =
104-
new ExecutorServiceAdapter(new SimpleAsyncTaskExecutor("lock-leadership-"));
103+
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("lock-leadership-");
105104

106105
/**
107106
* Time in milliseconds to wait in between attempts to re-acquire the lock, once it is
@@ -152,7 +151,7 @@ public String getRole() {
152151

153152
/**
154153
* Future returned by submitting an {@link LeaderSelector} to
155-
* {@link #executorService}. This is used to cancel leadership.
154+
* {@link #taskExecutor}. This is used to cancel leadership.
156155
*/
157156
private volatile Future<?> future;
158157

@@ -181,22 +180,23 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
181180
/**
182181
* Set the {@link ExecutorService}, where is not provided then a default of
183182
* single thread Executor will be used.
184-
* @param executorService the executor service
183+
* @param taskExecutor the executor service
185184
* @since 5.0.2
186-
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(TaskExecutor)}
185+
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)}
187186
*/
188187
@Deprecated(since = "6.2", forRemoval = true)
189-
public void setExecutorService(ExecutorService executorService) {
190-
this.executorService = executorService;
188+
public void setTaskExecutor(ExecutorService taskExecutor) {
189+
setTaskExecutor(new TaskExecutorAdapter(taskExecutor));
191190
}
192191

193192
/**
194-
* Set a {@link TaskExecutor} for running leadership daemon.
195-
* @param taskExecutor the {@link TaskExecutor} to use.
193+
* Set a {@link AsyncTaskExecutor} for running leadership daemon.
194+
* @param taskExecutor the {@link AsyncTaskExecutor} to use.
196195
* @since 6.2
197196
*/
198-
public void setTaskExecutor(TaskExecutor taskExecutor) {
199-
this.executorService = new ExecutorServiceAdapter(taskExecutor);
197+
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
198+
Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null.");
199+
this.taskExecutor = taskExecutor;
200200
}
201201

202202
public void setHeartBeatMillis(long heartBeatMillis) {
@@ -293,7 +293,7 @@ public synchronized void start() {
293293
if (!this.running) {
294294
this.leaderSelector = new LeaderSelector(buildLeaderPath());
295295
this.running = true;
296-
this.future = this.executorService.submit(this.leaderSelector);
296+
this.future = this.taskExecutor.submit(this.leaderSelector);
297297
LOGGER.debug("Started LeaderInitiator");
298298
}
299299
}
@@ -461,7 +461,7 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
461461
private void restartSelectorBecauseOfError(Exception ex) {
462462
LOGGER.warn(ex, () -> "Restarting LeaderSelector for " + this.context + " because of error.");
463463
LockRegistryLeaderInitiator.this.future =
464-
LockRegistryLeaderInitiator.this.executorService.submit(
464+
LockRegistryLeaderInitiator.this.taskExecutor.submit(
465465
() -> {
466466
// Give it a chance to elect some other leader.
467467
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);

spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/leader/LeaderInitiator.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919

2020
import java.util.concurrent.Callable;
21-
import java.util.concurrent.ExecutorService;
2221
import java.util.concurrent.Future;
2322
import java.util.concurrent.Semaphore;
2423
import java.util.concurrent.TimeUnit;
@@ -33,9 +32,8 @@
3332
import org.springframework.context.SmartLifecycle;
3433
import org.springframework.core.log.LogAccessor;
3534
import org.springframework.core.log.LogMessage;
35+
import org.springframework.core.task.AsyncTaskExecutor;
3636
import org.springframework.core.task.SimpleAsyncTaskExecutor;
37-
import org.springframework.core.task.TaskExecutor;
38-
import org.springframework.core.task.support.ExecutorServiceAdapter;
3937
import org.springframework.integration.leader.Candidate;
4038
import org.springframework.integration.leader.Context;
4139
import org.springframework.integration.leader.DefaultCandidate;
@@ -75,8 +73,7 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
7573
/**
7674
* Executor service for running leadership daemon.
7775
*/
78-
private ExecutorService executorService =
79-
new ExecutorServiceAdapter(new SimpleAsyncTaskExecutor("Hazelcast-leadership-"));
76+
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("Hazelcast-leadership-");
8077

8178
private long heartBeatMillis = LockRegistryLeaderInitiator.DEFAULT_HEART_BEAT_TIME;
8279

@@ -94,7 +91,7 @@ public class LeaderInitiator implements SmartLifecycle, DisposableBean, Applicat
9491
private int phase;
9592

9693
/**
97-
* Future returned by submitting an {@link LeaderSelector} to {@link #executorService}.
94+
* Future returned by submitting an {@link LeaderSelector} to {@link #taskExecutor}.
9895
* This is used to cancel leadership.
9996
*/
10097
private volatile Future<Void> future;
@@ -126,12 +123,13 @@ public LeaderInitiator(HazelcastInstance client, Candidate candidate) {
126123
}
127124

128125
/**
129-
* Set a {@link TaskExecutor} for running leadership daemon.
130-
* @param taskExecutor the {@link TaskExecutor} to use.
126+
* Set a {@link AsyncTaskExecutor} for running leadership daemon.
127+
* @param taskExecutor the {@link AsyncTaskExecutor} to use.
131128
* @since 6.2
132129
*/
133-
public void setTaskExecutor(TaskExecutor taskExecutor) {
134-
this.executorService = new ExecutorServiceAdapter(taskExecutor);
130+
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
131+
Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null.");
132+
this.taskExecutor = taskExecutor;
135133
}
136134

137135
/**
@@ -214,7 +212,7 @@ public synchronized void start() {
214212
if (!this.running) {
215213
this.leaderSelector = new LeaderSelector();
216214
this.running = true;
217-
this.future = this.executorService.submit(this.leaderSelector);
215+
this.future = this.taskExecutor.submit(this.leaderSelector);
218216
}
219217
}
220218

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232

3333
import org.springframework.context.SmartLifecycle;
3434
import org.springframework.core.log.LogAccessor;
35+
import org.springframework.core.task.AsyncTaskExecutor;
3536
import org.springframework.core.task.SimpleAsyncTaskExecutor;
36-
import org.springframework.core.task.TaskExecutor;
37-
import org.springframework.core.task.support.ExecutorServiceAdapter;
37+
import org.springframework.core.task.support.TaskExecutorAdapter;
3838
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
3939
import org.springframework.integration.util.UUIDConverter;
4040
import org.springframework.lang.Nullable;
@@ -74,8 +74,7 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
7474

7575
private final String tablePrefix;
7676

77-
private ExecutorService executor =
78-
new ExecutorServiceAdapter(new SimpleAsyncTaskExecutor("postgres-channel-message-table-subscriber-"));
77+
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("postgres-channel-message-table-subscriber-");
7978

8079
private CountDownLatch latch = new CountDownLatch(0);
8180

@@ -109,21 +108,21 @@ public PostgresChannelMessageTableSubscriber(PgConnectionSupplier connectionSupp
109108
* listening for notifications as a blocking operation which will permanently block a thread of this executor
110109
* while running.
111110
* @param executor The executor to use or {@code null} if an executor should be created by this class.
112-
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(TaskExecutor)}
111+
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)}
113112
*/
114113
@Deprecated(since = "6.2", forRemoval = true)
115114
public synchronized void setExecutor(ExecutorService executor) {
116-
Assert.notNull(executor, "An 'executor' must not be null.");
117-
this.executor = executor;
115+
setTaskExecutor(new TaskExecutorAdapter(executor));
118116
}
119117

120118
/**
121-
* Provide a managed {@link TaskExecutor} for Postgres listener daemon.
122-
* @param taskExecutor the {@link TaskExecutor} to use.
119+
* Provide a managed {@link AsyncTaskExecutor} for Postgres listener daemon.
120+
* @param taskExecutor the {@link AsyncTaskExecutor} to use.
123121
* @since 6.2
124122
*/
125-
public void setTaskExecutor(TaskExecutor taskExecutor) {
126-
this.executor = new ExecutorServiceAdapter(taskExecutor);
123+
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
124+
Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null.");
125+
this.taskExecutor = taskExecutor;
127126
}
128127

129128
/**
@@ -158,7 +157,7 @@ public synchronized void start() {
158157
this.latch = new CountDownLatch(1);
159158

160159
CountDownLatch startingLatch = new CountDownLatch(1);
161-
this.future = this.executor.submit(() -> {
160+
this.future = this.taskExecutor.submit(() -> {
162161
try {
163162
while (isActive()) {
164163
try {

0 commit comments

Comments
 (0)