Skip to content

Commit 4db9bad

Browse files
authored
GH-8642: Revise executors in the project (#8647)
* GH-8642: Revise executors in the project Fixes #8642 * Rework some `Executors.newSingleThreadExecutor()` to `ExecutorServiceAdapter(new SimpleAsyncTaskExecutor())` * Expose `TaskExecutor` setters; deprecate `ExecutorService`-based * Some other code clean up in the effected classes: `LogAccessor`, no `synchronized` in critical blocks * Give a meaningful prefix for default threads in the context of components, e.g. `SubscribableRedisChannel` - `getBeanName() + "-"` * * Fix `PostgresChannelMessageTableSubscriberTests` for `PostgresSubscribableChannel` initialization to let it create its internal `Executor` * Use an `AsyncTaskExecutor` injection instead of `ExecutorServiceAdapter` wrapping * Fix `LockRegistryLeaderInitiatorTests` for `taskExecutor` injection * Bring back `LockRegistryLeaderInitiator.setExecutorService()` as an accident after property auto-renaming
1 parent aac2adb commit 4db9bad

File tree

9 files changed

+142
-174
lines changed

9 files changed

+142
-174
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

+51-65
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,24 @@
1818

1919
import java.util.concurrent.Callable;
2020
import java.util.concurrent.ExecutorService;
21-
import java.util.concurrent.Executors;
2221
import java.util.concurrent.Future;
2322
import java.util.concurrent.TimeUnit;
2423
import java.util.concurrent.locks.Lock;
2524

26-
import org.apache.commons.logging.Log;
27-
import org.apache.commons.logging.LogFactory;
28-
2925
import org.springframework.beans.factory.DisposableBean;
3026
import org.springframework.context.ApplicationEventPublisher;
3127
import org.springframework.context.ApplicationEventPublisherAware;
3228
import org.springframework.context.SmartLifecycle;
29+
import org.springframework.core.log.LogAccessor;
30+
import org.springframework.core.task.AsyncTaskExecutor;
31+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
32+
import org.springframework.core.task.support.TaskExecutorAdapter;
3333
import org.springframework.integration.leader.Candidate;
3434
import org.springframework.integration.leader.Context;
3535
import org.springframework.integration.leader.DefaultCandidate;
3636
import org.springframework.integration.leader.event.DefaultLeaderEventPublisher;
3737
import org.springframework.integration.leader.event.LeaderEventPublisher;
3838
import org.springframework.integration.support.locks.LockRegistry;
39-
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
4039
import org.springframework.util.Assert;
4140

4241
/**
@@ -67,9 +66,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
6766

6867
public static final long DEFAULT_BUSY_WAIT_TIME = 50L;
6968

70-
private static final Log LOGGER = LogFactory.getLog(LockRegistryLeaderInitiator.class);
71-
72-
private final Object lifecycleMonitor = new Object();
69+
private static final LogAccessor LOGGER = new LogAccessor(LockRegistryLeaderInitiator.class);
7370

7471
/**
7572
* A lock registry. The locks it manages should be global (whatever that means for the
@@ -103,14 +100,7 @@ public String getRole() {
103100
/**
104101
* Executor service for running leadership daemon.
105102
*/
106-
private ExecutorService executorService =
107-
Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-"));
108-
109-
/**
110-
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
111-
* thus should not be shutdown when {@link #destroy()} is called.
112-
*/
113-
private boolean executorServiceExplicitlySet;
103+
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("lock-leadership-");
114104

115105
/**
116106
* Time in milliseconds to wait in between attempts to re-acquire the lock, once it is
@@ -161,7 +151,7 @@ public String getRole() {
161151

162152
/**
163153
* Future returned by submitting an {@link LeaderSelector} to
164-
* {@link #executorService}. This is used to cancel leadership.
154+
* {@link #taskExecutor}. This is used to cancel leadership.
165155
*/
166156
private volatile Future<?> future;
167157

@@ -192,10 +182,21 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
192182
* single thread Executor will be used.
193183
* @param executorService the executor service
194184
* @since 5.0.2
185+
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)}
195186
*/
187+
@Deprecated(since = "6.2", forRemoval = true)
196188
public void setExecutorService(ExecutorService executorService) {
197-
this.executorService = executorService;
198-
this.executorServiceExplicitlySet = true;
189+
setTaskExecutor(new TaskExecutorAdapter(executorService));
190+
}
191+
192+
/**
193+
* Set a {@link AsyncTaskExecutor} for running leadership daemon.
194+
* @param taskExecutor the {@link AsyncTaskExecutor} to use.
195+
* @since 6.2
196+
*/
197+
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
198+
Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null.");
199+
this.taskExecutor = taskExecutor;
199200
}
200201

201202
public void setHeartBeatMillis(long heartBeatMillis) {
@@ -224,9 +225,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
224225
*/
225226
@Override
226227
public boolean isRunning() {
227-
synchronized (this.lifecycleMonitor) {
228-
return this.running;
229-
}
228+
return this.running;
230229
}
231230

232231
@Override
@@ -287,43 +286,36 @@ public void setPublishFailedEvents(boolean publishFailedEvents) {
287286
* Start the registration of the {@link #candidate} for leader election.
288287
*/
289288
@Override
290-
public void start() {
289+
public synchronized void start() {
291290
if (this.leaderEventPublisher == null && this.applicationEventPublisher != null) {
292291
this.leaderEventPublisher = new DefaultLeaderEventPublisher(this.applicationEventPublisher);
293292
}
294-
synchronized (this.lifecycleMonitor) {
295-
if (!this.running) {
296-
this.leaderSelector = new LeaderSelector(buildLeaderPath());
297-
this.running = true;
298-
this.future = this.executorService.submit(this.leaderSelector);
299-
LOGGER.debug("Started LeaderInitiator");
300-
}
293+
if (!this.running) {
294+
this.leaderSelector = new LeaderSelector(buildLeaderPath());
295+
this.running = true;
296+
this.future = this.taskExecutor.submit(this.leaderSelector);
297+
LOGGER.debug("Started LeaderInitiator");
301298
}
302299
}
303300

304301
@Override
305302
public void destroy() {
306303
stop();
307-
if (!this.executorServiceExplicitlySet) {
308-
this.executorService.shutdown();
309-
}
310304
}
311305

312306
/**
313307
* Stop the registration of the {@link #candidate} for leader election. If the
314308
* candidate is currently leader, its leadership will be revoked.
315309
*/
316310
@Override
317-
public void stop() {
318-
synchronized (this.lifecycleMonitor) {
319-
if (this.running) {
320-
this.running = false;
321-
if (this.future != null) {
322-
this.future.cancel(true);
323-
}
324-
this.future = null;
325-
LOGGER.debug("Stopped LeaderInitiator for " + getContext());
311+
public synchronized void stop() {
312+
if (this.running) {
313+
this.running = false;
314+
if (this.future != null) {
315+
this.future.cancel(true);
326316
}
317+
this.future = null;
318+
LOGGER.debug(() -> "Stopped LeaderInitiator for " + getContext());
327319
}
328320
}
329321

@@ -382,9 +374,9 @@ public Void call() {
382374
try {
383375
this.lock.unlock();
384376
}
385-
catch (Exception e) {
386-
LOGGER.debug("Could not unlock during stop for " + this.context
387-
+ " - treat as broken. Revoking...", e);
377+
catch (Exception ex) {
378+
LOGGER.debug(ex, () ->
379+
"Could not unlock during stop for " + this.context + " - treat as broken. Revoking...");
388380
}
389381
// We are stopping, therefore not leading anymore
390382
handleRevoked();
@@ -394,9 +386,7 @@ public Void call() {
394386
}
395387

396388
private void tryAcquireLock() throws InterruptedException {
397-
if (LOGGER.isDebugEnabled()) {
398-
LOGGER.debug("Acquiring the lock for " + this.context);
399-
}
389+
LOGGER.debug(() -> "Acquiring the lock for " + this.context);
400390
// We always try to acquire the lock, in case it expired
401391
boolean acquired =
402392
this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS);
@@ -436,8 +426,8 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
436426
this.lock.unlock();
437427
}
438428
catch (Exception e1) {
439-
LOGGER.debug("Could not unlock - treat as broken " + this.context +
440-
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
429+
LOGGER.debug(e1, () -> "Could not unlock - treat as broken " + this.context +
430+
". Revoking " + (isRunning() ? " and retrying..." : "..."));
441431

442432
}
443433
// The lock was broken and we are no longer leader
@@ -462,18 +452,16 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
462452
Thread.currentThread().interrupt();
463453
}
464454
}
465-
if (LOGGER.isDebugEnabled()) {
466-
LOGGER.debug("Error acquiring the lock for " + this.context +
467-
". " + (isRunning() ? "Retrying..." : ""), ex);
468-
}
455+
LOGGER.debug(ex, () ->
456+
"Error acquiring the lock for " + this.context + ". " + (isRunning() ? "Retrying..." : ""));
469457
}
470458
return false;
471459
}
472460

473461
private void restartSelectorBecauseOfError(Exception ex) {
474-
LOGGER.warn("Restarting LeaderSelector for " + this.context + " because of error.", ex);
462+
LOGGER.warn(ex, () -> "Restarting LeaderSelector for " + this.context + " because of error.");
475463
LockRegistryLeaderInitiator.this.future =
476-
LockRegistryLeaderInitiator.this.executorService.submit(
464+
LockRegistryLeaderInitiator.this.taskExecutor.submit(
477465
() -> {
478466
// Give it a chance to elect some other leader.
479467
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
@@ -492,8 +480,8 @@ private void handleGranted() throws InterruptedException {
492480
LockRegistryLeaderInitiator.this.leaderEventPublisher.publishOnGranted(
493481
LockRegistryLeaderInitiator.this, this.context, this.lockKey);
494482
}
495-
catch (Exception e) {
496-
LOGGER.warn("Error publishing OnGranted event.", e);
483+
catch (Exception ex) {
484+
LOGGER.warn(ex, "Error publishing OnGranted event.");
497485
}
498486
}
499487
}
@@ -506,8 +494,8 @@ private void handleRevoked() {
506494
LockRegistryLeaderInitiator.this, this.context,
507495
LockRegistryLeaderInitiator.this.candidate.getRole());
508496
}
509-
catch (Exception e) {
510-
LOGGER.warn("Error publishing OnRevoked event.", e);
497+
catch (Exception ex) {
498+
LOGGER.warn(ex, "Error publishing OnRevoked event.");
511499
}
512500
}
513501
}
@@ -520,8 +508,8 @@ private void publishFailedToAcquire() {
520508
this.context,
521509
LockRegistryLeaderInitiator.this.candidate.getRole());
522510
}
523-
catch (Exception e) {
524-
LOGGER.warn("Error publishing OnFailedToAcquire event.", e);
511+
catch (Exception ex) {
512+
LOGGER.warn(ex, "Error publishing OnFailedToAcquire event.");
525513
}
526514
}
527515
}
@@ -543,9 +531,7 @@ public boolean isLeader() {
543531

544532
@Override
545533
public void yield() {
546-
if (LOGGER.isDebugEnabled()) {
547-
LOGGER.debug("Yielding leadership from " + this);
548-
}
534+
LOGGER.debug(() -> "Yielding leadership from " + this);
549535
LockRegistryLeaderInitiator.this.leaderSelector.yielding = true;
550536
}
551537

spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java

+7-34
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2022 the original author or authors.
2+
* Copyright 2012-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,27 +17,24 @@
1717
package org.springframework.integration.support.leader;
1818

1919
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.ExecutorService;
21-
import java.util.concurrent.Executors;
2220
import java.util.concurrent.TimeUnit;
2321
import java.util.concurrent.atomic.AtomicBoolean;
2422
import java.util.concurrent.atomic.AtomicReference;
2523
import java.util.concurrent.locks.Lock;
2624
import java.util.concurrent.locks.ReentrantLock;
2725

28-
import org.junit.Before;
29-
import org.junit.Test;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
3028

3129
import org.springframework.beans.DirectFieldAccessor;
3230
import org.springframework.core.task.SyncTaskExecutor;
33-
import org.springframework.core.task.support.ExecutorServiceAdapter;
31+
import org.springframework.core.task.support.TaskExecutorAdapter;
3432
import org.springframework.integration.leader.Context;
3533
import org.springframework.integration.leader.DefaultCandidate;
3634
import org.springframework.integration.leader.event.DefaultLeaderEventPublisher;
3735
import org.springframework.integration.leader.event.LeaderEventPublisher;
3836
import org.springframework.integration.support.locks.DefaultLockRegistry;
3937
import org.springframework.integration.support.locks.LockRegistry;
40-
import org.springframework.integration.test.util.TestUtils;
4138

4239
import static org.assertj.core.api.Assertions.assertThat;
4340
import static org.mockito.ArgumentMatchers.any;
@@ -69,7 +66,7 @@ public class LockRegistryLeaderInitiatorTests {
6966
private final LockRegistryLeaderInitiator initiator =
7067
new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());
7168

72-
@Before
69+
@BeforeEach
7370
public void init() {
7471
this.initiator.setLeaderEventPublisher(new CountingPublisher(this.granted, this.revoked));
7572
}
@@ -255,9 +252,8 @@ public void testGracefulLeaderSelectorExit() throws Exception {
255252
.given(lock)
256253
.tryLock(anyLong(), eq(TimeUnit.MILLISECONDS));
257254

258-
new DirectFieldAccessor(another).setPropertyValue("executorService",
259-
new ExecutorServiceAdapter(
260-
new SyncTaskExecutor()));
255+
new DirectFieldAccessor(another).setPropertyValue("taskExecutor",
256+
new TaskExecutorAdapter(new SyncTaskExecutor()));
261257

262258
another.start();
263259

@@ -297,29 +293,6 @@ public void testExceptionFromLock() throws Exception {
297293
another.stop();
298294
}
299295

300-
@Test
301-
public void shouldShutdownInternalExecutorService() {
302-
this.initiator.start();
303-
this.initiator.destroy();
304-
305-
ExecutorService executorService =
306-
TestUtils.getPropertyValue(this.initiator, "executorService", ExecutorService.class);
307-
308-
assertThat(executorService.isShutdown()).isTrue();
309-
}
310-
311-
@Test
312-
public void doNotShutdownProvidedExecutorService() {
313-
LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(this.registry);
314-
ExecutorService executorService = Executors.newSingleThreadExecutor();
315-
another.setExecutorService(executorService);
316-
317-
another.start();
318-
another.destroy();
319-
320-
assertThat(executorService.isShutdown()).isFalse();
321-
}
322-
323296
private static class CountingPublisher implements LeaderEventPublisher {
324297

325298
private final CountDownLatch granted;

0 commit comments

Comments
 (0)