Skip to content

Commit 51f0f00

Browse files
committed
GH-8642: Revise executors in the project
Fixes #8642 * Rework some `Executors.newSingleThreadExecutor()` to `ExecutorServiceAdapter(new SimpleAsyncTaskExecutor())` * Expose `TaskExecutor` setters; deprecate `ExecutorService`-based * Some other code clean up in the effected classes: `LogAccessor`, no `synchronized` in critical blocks * Give a meaningful prefix for default threads in the context of components, e.g. `SubscribableRedisChannel` - `getBeanName() + "-"`
1 parent b576748 commit 51f0f00

File tree

8 files changed

+132
-162
lines changed

8 files changed

+132
-162
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java

Lines changed: 47 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,24 @@
1818

1919
import java.util.concurrent.Callable;
2020
import java.util.concurrent.ExecutorService;
21-
import java.util.concurrent.Executors;
2221
import java.util.concurrent.Future;
2322
import java.util.concurrent.TimeUnit;
2423
import java.util.concurrent.locks.Lock;
2524

26-
import org.apache.commons.logging.Log;
27-
import org.apache.commons.logging.LogFactory;
28-
2925
import org.springframework.beans.factory.DisposableBean;
3026
import org.springframework.context.ApplicationEventPublisher;
3127
import org.springframework.context.ApplicationEventPublisherAware;
3228
import org.springframework.context.SmartLifecycle;
29+
import org.springframework.core.log.LogAccessor;
30+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
31+
import org.springframework.core.task.TaskExecutor;
32+
import org.springframework.core.task.support.ExecutorServiceAdapter;
3333
import org.springframework.integration.leader.Candidate;
3434
import org.springframework.integration.leader.Context;
3535
import org.springframework.integration.leader.DefaultCandidate;
3636
import org.springframework.integration.leader.event.DefaultLeaderEventPublisher;
3737
import org.springframework.integration.leader.event.LeaderEventPublisher;
3838
import org.springframework.integration.support.locks.LockRegistry;
39-
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
4039
import org.springframework.util.Assert;
4140

4241
/**
@@ -67,9 +66,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
6766

6867
public static final long DEFAULT_BUSY_WAIT_TIME = 50L;
6968

70-
private static final Log LOGGER = LogFactory.getLog(LockRegistryLeaderInitiator.class);
71-
72-
private final Object lifecycleMonitor = new Object();
69+
private static final LogAccessor LOGGER = new LogAccessor(LockRegistryLeaderInitiator.class);
7370

7471
/**
7572
* A lock registry. The locks it manages should be global (whatever that means for the
@@ -104,13 +101,7 @@ public String getRole() {
104101
* Executor service for running leadership daemon.
105102
*/
106103
private ExecutorService executorService =
107-
Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-"));
108-
109-
/**
110-
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
111-
* thus should not be shutdown when {@link #destroy()} is called.
112-
*/
113-
private boolean executorServiceExplicitlySet;
104+
new ExecutorServiceAdapter(new SimpleAsyncTaskExecutor("lock-leadership-"));
114105

115106
/**
116107
* Time in milliseconds to wait in between attempts to re-acquire the lock, once it is
@@ -192,10 +183,20 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
192183
* single thread Executor will be used.
193184
* @param executorService the executor service
194185
* @since 5.0.2
186+
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(TaskExecutor)}
195187
*/
188+
@Deprecated(since = "6.2", forRemoval = true)
196189
public void setExecutorService(ExecutorService executorService) {
197190
this.executorService = executorService;
198-
this.executorServiceExplicitlySet = true;
191+
}
192+
193+
/**
194+
* Set a {@link TaskExecutor} for running leadership daemon.
195+
* @param taskExecutor the {@link TaskExecutor} to use.
196+
* @since 6.2
197+
*/
198+
public void setTaskExecutor(TaskExecutor taskExecutor) {
199+
this.executorService = new ExecutorServiceAdapter(taskExecutor);
199200
}
200201

201202
public void setHeartBeatMillis(long heartBeatMillis) {
@@ -224,9 +225,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
224225
*/
225226
@Override
226227
public boolean isRunning() {
227-
synchronized (this.lifecycleMonitor) {
228-
return this.running;
229-
}
228+
return this.running;
230229
}
231230

232231
@Override
@@ -287,43 +286,36 @@ public void setPublishFailedEvents(boolean publishFailedEvents) {
287286
* Start the registration of the {@link #candidate} for leader election.
288287
*/
289288
@Override
290-
public void start() {
289+
public synchronized void start() {
291290
if (this.leaderEventPublisher == null && this.applicationEventPublisher != null) {
292291
this.leaderEventPublisher = new DefaultLeaderEventPublisher(this.applicationEventPublisher);
293292
}
294-
synchronized (this.lifecycleMonitor) {
295-
if (!this.running) {
296-
this.leaderSelector = new LeaderSelector(buildLeaderPath());
297-
this.running = true;
298-
this.future = this.executorService.submit(this.leaderSelector);
299-
LOGGER.debug("Started LeaderInitiator");
300-
}
293+
if (!this.running) {
294+
this.leaderSelector = new LeaderSelector(buildLeaderPath());
295+
this.running = true;
296+
this.future = this.executorService.submit(this.leaderSelector);
297+
LOGGER.debug("Started LeaderInitiator");
301298
}
302299
}
303300

304301
@Override
305302
public void destroy() {
306303
stop();
307-
if (!this.executorServiceExplicitlySet) {
308-
this.executorService.shutdown();
309-
}
310304
}
311305

312306
/**
313307
* Stop the registration of the {@link #candidate} for leader election. If the
314308
* candidate is currently leader, its leadership will be revoked.
315309
*/
316310
@Override
317-
public void stop() {
318-
synchronized (this.lifecycleMonitor) {
319-
if (this.running) {
320-
this.running = false;
321-
if (this.future != null) {
322-
this.future.cancel(true);
323-
}
324-
this.future = null;
325-
LOGGER.debug("Stopped LeaderInitiator for " + getContext());
311+
public synchronized void stop() {
312+
if (this.running) {
313+
this.running = false;
314+
if (this.future != null) {
315+
this.future.cancel(true);
326316
}
317+
this.future = null;
318+
LOGGER.debug(() -> "Stopped LeaderInitiator for " + getContext());
327319
}
328320
}
329321

@@ -382,9 +374,9 @@ public Void call() {
382374
try {
383375
this.lock.unlock();
384376
}
385-
catch (Exception e) {
386-
LOGGER.debug("Could not unlock during stop for " + this.context
387-
+ " - treat as broken. Revoking...", e);
377+
catch (Exception ex) {
378+
LOGGER.debug(ex, () ->
379+
"Could not unlock during stop for " + this.context + " - treat as broken. Revoking...");
388380
}
389381
// We are stopping, therefore not leading anymore
390382
handleRevoked();
@@ -394,9 +386,7 @@ public Void call() {
394386
}
395387

396388
private void tryAcquireLock() throws InterruptedException {
397-
if (LOGGER.isDebugEnabled()) {
398-
LOGGER.debug("Acquiring the lock for " + this.context);
399-
}
389+
LOGGER.debug(() -> "Acquiring the lock for " + this.context);
400390
// We always try to acquire the lock, in case it expired
401391
boolean acquired =
402392
this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS);
@@ -436,8 +426,8 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
436426
this.lock.unlock();
437427
}
438428
catch (Exception e1) {
439-
LOGGER.debug("Could not unlock - treat as broken " + this.context +
440-
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
429+
LOGGER.debug(e1, () -> "Could not unlock - treat as broken " + this.context +
430+
". Revoking " + (isRunning() ? " and retrying..." : "..."));
441431

442432
}
443433
// The lock was broken and we are no longer leader
@@ -462,16 +452,14 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
462452
Thread.currentThread().interrupt();
463453
}
464454
}
465-
if (LOGGER.isDebugEnabled()) {
466-
LOGGER.debug("Error acquiring the lock for " + this.context +
467-
". " + (isRunning() ? "Retrying..." : ""), ex);
468-
}
455+
LOGGER.debug(ex, () ->
456+
"Error acquiring the lock for " + this.context + ". " + (isRunning() ? "Retrying..." : ""));
469457
}
470458
return false;
471459
}
472460

473461
private void restartSelectorBecauseOfError(Exception ex) {
474-
LOGGER.warn("Restarting LeaderSelector for " + this.context + " because of error.", ex);
462+
LOGGER.warn(ex, () -> "Restarting LeaderSelector for " + this.context + " because of error.");
475463
LockRegistryLeaderInitiator.this.future =
476464
LockRegistryLeaderInitiator.this.executorService.submit(
477465
() -> {
@@ -492,8 +480,8 @@ private void handleGranted() throws InterruptedException {
492480
LockRegistryLeaderInitiator.this.leaderEventPublisher.publishOnGranted(
493481
LockRegistryLeaderInitiator.this, this.context, this.lockKey);
494482
}
495-
catch (Exception e) {
496-
LOGGER.warn("Error publishing OnGranted event.", e);
483+
catch (Exception ex) {
484+
LOGGER.warn(ex, "Error publishing OnGranted event.");
497485
}
498486
}
499487
}
@@ -506,8 +494,8 @@ private void handleRevoked() {
506494
LockRegistryLeaderInitiator.this, this.context,
507495
LockRegistryLeaderInitiator.this.candidate.getRole());
508496
}
509-
catch (Exception e) {
510-
LOGGER.warn("Error publishing OnRevoked event.", e);
497+
catch (Exception ex) {
498+
LOGGER.warn(ex, "Error publishing OnRevoked event.");
511499
}
512500
}
513501
}
@@ -520,8 +508,8 @@ private void publishFailedToAcquire() {
520508
this.context,
521509
LockRegistryLeaderInitiator.this.candidate.getRole());
522510
}
523-
catch (Exception e) {
524-
LOGGER.warn("Error publishing OnFailedToAcquire event.", e);
511+
catch (Exception ex) {
512+
LOGGER.warn(ex, "Error publishing OnFailedToAcquire event.");
525513
}
526514
}
527515
}
@@ -543,9 +531,7 @@ public boolean isLeader() {
543531

544532
@Override
545533
public void yield() {
546-
if (LOGGER.isDebugEnabled()) {
547-
LOGGER.debug("Yielding leadership from " + this);
548-
}
534+
LOGGER.debug(() -> "Yielding leadership from " + this);
549535
LockRegistryLeaderInitiator.this.leaderSelector.yielding = true;
550536
}
551537

spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2022 the original author or authors.
2+
* Copyright 2012-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,16 +17,14 @@
1717
package org.springframework.integration.support.leader;
1818

1919
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.ExecutorService;
21-
import java.util.concurrent.Executors;
2220
import java.util.concurrent.TimeUnit;
2321
import java.util.concurrent.atomic.AtomicBoolean;
2422
import java.util.concurrent.atomic.AtomicReference;
2523
import java.util.concurrent.locks.Lock;
2624
import java.util.concurrent.locks.ReentrantLock;
2725

28-
import org.junit.Before;
29-
import org.junit.Test;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
3028

3129
import org.springframework.beans.DirectFieldAccessor;
3230
import org.springframework.core.task.SyncTaskExecutor;
@@ -37,7 +35,6 @@
3735
import org.springframework.integration.leader.event.LeaderEventPublisher;
3836
import org.springframework.integration.support.locks.DefaultLockRegistry;
3937
import org.springframework.integration.support.locks.LockRegistry;
40-
import org.springframework.integration.test.util.TestUtils;
4138

4239
import static org.assertj.core.api.Assertions.assertThat;
4340
import static org.mockito.ArgumentMatchers.any;
@@ -69,7 +66,7 @@ public class LockRegistryLeaderInitiatorTests {
6966
private final LockRegistryLeaderInitiator initiator =
7067
new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());
7168

72-
@Before
69+
@BeforeEach
7370
public void init() {
7471
this.initiator.setLeaderEventPublisher(new CountingPublisher(this.granted, this.revoked));
7572
}
@@ -297,29 +294,6 @@ public void testExceptionFromLock() throws Exception {
297294
another.stop();
298295
}
299296

300-
@Test
301-
public void shouldShutdownInternalExecutorService() {
302-
this.initiator.start();
303-
this.initiator.destroy();
304-
305-
ExecutorService executorService =
306-
TestUtils.getPropertyValue(this.initiator, "executorService", ExecutorService.class);
307-
308-
assertThat(executorService.isShutdown()).isTrue();
309-
}
310-
311-
@Test
312-
public void doNotShutdownProvidedExecutorService() {
313-
LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(this.registry);
314-
ExecutorService executorService = Executors.newSingleThreadExecutor();
315-
another.setExecutorService(executorService);
316-
317-
another.start();
318-
another.destroy();
319-
320-
assertThat(executorService.isShutdown()).isFalse();
321-
}
322-
323297
private static class CountingPublisher implements LeaderEventPublisher {
324298

325299
private final CountDownLatch granted;

0 commit comments

Comments
 (0)