Skip to content

GH-8642: Revise executors in the project #8647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.integration.leader.Candidate;
import org.springframework.integration.leader.Context;
import org.springframework.integration.leader.DefaultCandidate;
import org.springframework.integration.leader.event.DefaultLeaderEventPublisher;
import org.springframework.integration.leader.event.LeaderEventPublisher;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -67,9 +66,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe

public static final long DEFAULT_BUSY_WAIT_TIME = 50L;

private static final Log LOGGER = LogFactory.getLog(LockRegistryLeaderInitiator.class);

private final Object lifecycleMonitor = new Object();
private static final LogAccessor LOGGER = new LogAccessor(LockRegistryLeaderInitiator.class);

/**
* A lock registry. The locks it manages should be global (whatever that means for the
Expand Down Expand Up @@ -103,14 +100,7 @@ public String getRole() {
/**
* Executor service for running leadership daemon.
*/
private ExecutorService executorService =
Executors.newSingleThreadExecutor(new CustomizableThreadFactory("lock-leadership-"));

/**
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
* thus should not be shutdown when {@link #destroy()} is called.
*/
private boolean executorServiceExplicitlySet;
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("lock-leadership-");

/**
* Time in milliseconds to wait in between attempts to re-acquire the lock, once it is
Expand Down Expand Up @@ -161,7 +151,7 @@ public String getRole() {

/**
* Future returned by submitting an {@link LeaderSelector} to
* {@link #executorService}. This is used to cancel leadership.
* {@link #taskExecutor}. This is used to cancel leadership.
*/
private volatile Future<?> future;

Expand Down Expand Up @@ -190,12 +180,23 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
/**
* Set the {@link ExecutorService}, where is not provided then a default of
* single thread Executor will be used.
* @param executorService the executor service
* @param taskExecutor the executor service
* @since 5.0.2
* @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)}
*/
@Deprecated(since = "6.2", forRemoval = true)
public void setTaskExecutor(ExecutorService taskExecutor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the name change (and param)? Need to deprecate the existing method.

setTaskExecutor(new TaskExecutorAdapter(taskExecutor));
}

/**
* Set a {@link AsyncTaskExecutor} for running leadership daemon.
* @param taskExecutor the {@link AsyncTaskExecutor} to use.
* @since 6.2
*/
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
this.executorServiceExplicitlySet = true;
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null.");
this.taskExecutor = taskExecutor;
}

public void setHeartBeatMillis(long heartBeatMillis) {
Expand Down Expand Up @@ -224,9 +225,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
*/
@Override
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.running;
}
return this.running;
}

@Override
Expand Down Expand Up @@ -287,43 +286,36 @@ public void setPublishFailedEvents(boolean publishFailedEvents) {
* Start the registration of the {@link #candidate} for leader election.
*/
@Override
public void start() {
public synchronized void start() {
if (this.leaderEventPublisher == null && this.applicationEventPublisher != null) {
this.leaderEventPublisher = new DefaultLeaderEventPublisher(this.applicationEventPublisher);
}
synchronized (this.lifecycleMonitor) {
if (!this.running) {
this.leaderSelector = new LeaderSelector(buildLeaderPath());
this.running = true;
this.future = this.executorService.submit(this.leaderSelector);
LOGGER.debug("Started LeaderInitiator");
}
if (!this.running) {
this.leaderSelector = new LeaderSelector(buildLeaderPath());
this.running = true;
this.future = this.taskExecutor.submit(this.leaderSelector);
LOGGER.debug("Started LeaderInitiator");
}
}

@Override
public void destroy() {
stop();
if (!this.executorServiceExplicitlySet) {
this.executorService.shutdown();
}
}

/**
* Stop the registration of the {@link #candidate} for leader election. If the
* candidate is currently leader, its leadership will be revoked.
*/
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (this.running) {
this.running = false;
if (this.future != null) {
this.future.cancel(true);
}
this.future = null;
LOGGER.debug("Stopped LeaderInitiator for " + getContext());
public synchronized void stop() {
if (this.running) {
this.running = false;
if (this.future != null) {
this.future.cancel(true);
}
this.future = null;
LOGGER.debug(() -> "Stopped LeaderInitiator for " + getContext());
}
}

Expand Down Expand Up @@ -382,9 +374,9 @@ public Void call() {
try {
this.lock.unlock();
}
catch (Exception e) {
LOGGER.debug("Could not unlock during stop for " + this.context
+ " - treat as broken. Revoking...", e);
catch (Exception ex) {
LOGGER.debug(ex, () ->
"Could not unlock during stop for " + this.context + " - treat as broken. Revoking...");
}
// We are stopping, therefore not leading anymore
handleRevoked();
Expand All @@ -394,9 +386,7 @@ public Void call() {
}

private void tryAcquireLock() throws InterruptedException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Acquiring the lock for " + this.context);
}
LOGGER.debug(() -> "Acquiring the lock for " + this.context);
// We always try to acquire the lock, in case it expired
boolean acquired =
this.lock.tryLock(LockRegistryLeaderInitiator.this.heartBeatMillis, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -436,8 +426,8 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
this.lock.unlock();
}
catch (Exception e1) {
LOGGER.debug("Could not unlock - treat as broken " + this.context +
". Revoking " + (isRunning() ? " and retrying..." : "..."), e1);
LOGGER.debug(e1, () -> "Could not unlock - treat as broken " + this.context +
". Revoking " + (isRunning() ? " and retrying..." : "..."));

}
// The lock was broken and we are no longer leader
Expand All @@ -462,18 +452,16 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
Thread.currentThread().interrupt();
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Error acquiring the lock for " + this.context +
". " + (isRunning() ? "Retrying..." : ""), ex);
}
LOGGER.debug(ex, () ->
"Error acquiring the lock for " + this.context + ". " + (isRunning() ? "Retrying..." : ""));
}
return false;
}

private void restartSelectorBecauseOfError(Exception ex) {
LOGGER.warn("Restarting LeaderSelector for " + this.context + " because of error.", ex);
LOGGER.warn(ex, () -> "Restarting LeaderSelector for " + this.context + " because of error.");
LockRegistryLeaderInitiator.this.future =
LockRegistryLeaderInitiator.this.executorService.submit(
LockRegistryLeaderInitiator.this.taskExecutor.submit(
() -> {
// Give it a chance to elect some other leader.
Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis);
Expand All @@ -492,8 +480,8 @@ private void handleGranted() throws InterruptedException {
LockRegistryLeaderInitiator.this.leaderEventPublisher.publishOnGranted(
LockRegistryLeaderInitiator.this, this.context, this.lockKey);
}
catch (Exception e) {
LOGGER.warn("Error publishing OnGranted event.", e);
catch (Exception ex) {
LOGGER.warn(ex, "Error publishing OnGranted event.");
}
}
}
Expand All @@ -506,8 +494,8 @@ private void handleRevoked() {
LockRegistryLeaderInitiator.this, this.context,
LockRegistryLeaderInitiator.this.candidate.getRole());
}
catch (Exception e) {
LOGGER.warn("Error publishing OnRevoked event.", e);
catch (Exception ex) {
LOGGER.warn(ex, "Error publishing OnRevoked event.");
}
}
}
Expand All @@ -520,8 +508,8 @@ private void publishFailedToAcquire() {
this.context,
LockRegistryLeaderInitiator.this.candidate.getRole());
}
catch (Exception e) {
LOGGER.warn("Error publishing OnFailedToAcquire event.", e);
catch (Exception ex) {
LOGGER.warn(ex, "Error publishing OnFailedToAcquire event.");
}
}
}
Expand All @@ -543,9 +531,7 @@ public boolean isLeader() {

@Override
public void yield() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Yielding leadership from " + this);
}
LOGGER.debug(() -> "Yielding leadership from " + this);
LockRegistryLeaderInitiator.this.leaderSelector.yielding = true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,16 +17,14 @@
package org.springframework.integration.support.leader;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.task.SyncTaskExecutor;
Expand All @@ -37,7 +35,6 @@
import org.springframework.integration.leader.event.LeaderEventPublisher;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.test.util.TestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -69,7 +66,7 @@ public class LockRegistryLeaderInitiatorTests {
private final LockRegistryLeaderInitiator initiator =
new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate());

@Before
@BeforeEach
public void init() {
this.initiator.setLeaderEventPublisher(new CountingPublisher(this.granted, this.revoked));
}
Expand Down Expand Up @@ -297,29 +294,6 @@ public void testExceptionFromLock() throws Exception {
another.stop();
}

@Test
public void shouldShutdownInternalExecutorService() {
this.initiator.start();
this.initiator.destroy();

ExecutorService executorService =
TestUtils.getPropertyValue(this.initiator, "executorService", ExecutorService.class);

assertThat(executorService.isShutdown()).isTrue();
}

@Test
public void doNotShutdownProvidedExecutorService() {
LockRegistryLeaderInitiator another = new LockRegistryLeaderInitiator(this.registry);
ExecutorService executorService = Executors.newSingleThreadExecutor();
another.setExecutorService(executorService);

another.start();
another.destroy();

assertThat(executorService.isShutdown()).isFalse();
}

private static class CountingPublisher implements LeaderEventPublisher {

private final CountDownLatch granted;
Expand Down
Loading