Skip to content

Commit 5a63210

Browse files
shyikoashleycoxley
authored andcommitted
Revert "Atomic processing of create/remove of keepalive thread"
This reverts commit 4680cb8 (caused shyiko#321).
1 parent c19b1ad commit 5a63210

File tree

1 file changed

+42
-58
lines changed

1 file changed

+42
-58
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 42 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ public X509Certificate[] getAcceptedIssuers() {
167167
private volatile ExecutorService keepAliveThreadExecutor;
168168

169169
private final Lock connectLock = new ReentrantLock();
170-
private final Lock keepAliveThreadExecutorLock = new ReentrantLock();
171170

172171
/**
173172
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
@@ -768,51 +767,46 @@ public Thread newThread(Runnable runnable) {
768767
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
769768
}
770769
});
771-
try {
772-
keepAliveThreadExecutorLock.lock();
773-
threadExecutor.submit(new Runnable() {
774-
@Override
775-
public void run() {
776-
while (!threadExecutor.isShutdown()) {
770+
threadExecutor.submit(new Runnable() {
771+
@Override
772+
public void run() {
773+
while (!threadExecutor.isShutdown()) {
774+
try {
775+
Thread.sleep(keepAliveInterval);
776+
} catch (InterruptedException e) {
777+
// expected in case of disconnect
778+
}
779+
if (threadExecutor.isShutdown()) {
780+
return;
781+
}
782+
boolean connectionLost = false;
783+
if (heartbeatInterval > 0) {
784+
connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
785+
} else {
777786
try {
778-
Thread.sleep(keepAliveInterval);
779-
} catch (InterruptedException e) {
780-
// expected in case of disconnect
787+
channel.write(new PingCommand());
788+
} catch (IOException e) {
789+
connectionLost = true;
781790
}
782-
if (threadExecutor.isShutdown()) {
783-
return;
784-
}
785-
boolean connectionLost = false;
786-
if (heartbeatInterval > 0) {
787-
connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
788-
} else {
789-
try {
790-
channel.write(new PingCommand());
791-
} catch (IOException e) {
792-
connectionLost = true;
793-
}
791+
}
792+
if (connectionLost) {
793+
if (logger.isLoggable(Level.INFO)) {
794+
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
794795
}
795-
if (connectionLost) {
796-
if (logger.isLoggable(Level.INFO)) {
797-
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
798-
}
799-
try {
800-
terminateConnect();
801-
connect(connectTimeout);
802-
} catch (Exception ce) {
803-
if (logger.isLoggable(Level.WARNING)) {
804-
logger.warning("Failed to restore connection to " + hostname + ":" + port +
805-
". Next attempt in " + keepAliveInterval + "ms");
806-
}
796+
try {
797+
terminateConnect();
798+
connect(connectTimeout);
799+
} catch (Exception ce) {
800+
if (logger.isLoggable(Level.WARNING)) {
801+
logger.warning("Failed to restore connection to " + hostname + ":" + port +
802+
". Next attempt in " + keepAliveInterval + "ms");
807803
}
808804
}
809805
}
810806
}
811-
});
812-
keepAliveThreadExecutor = threadExecutor;
813-
} finally {
814-
keepAliveThreadExecutorLock.unlock();
815-
}
807+
}
808+
});
809+
keepAliveThreadExecutor = threadExecutor;
816810
}
817811

818812
private Thread newNamedThread(Runnable runnable, String threadName) {
@@ -822,12 +816,7 @@ private Thread newNamedThread(Runnable runnable, String threadName) {
822816
}
823817

824818
boolean isKeepAliveThreadRunning() {
825-
try {
826-
keepAliveThreadExecutorLock.lock();
827-
return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
828-
} finally {
829-
keepAliveThreadExecutorLock.unlock();
830-
}
819+
return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
831820
}
832821

833822
/**
@@ -1178,19 +1167,14 @@ public void disconnect() throws IOException {
11781167
}
11791168

11801169
private void terminateKeepAliveThread() {
1181-
try {
1182-
keepAliveThreadExecutorLock.lock();
1183-
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
1184-
if (keepAliveThreadExecutor == null) {
1185-
return;
1186-
}
1187-
keepAliveThreadExecutor.shutdownNow();
1188-
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
1189-
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
1190-
// ignore
1191-
}
1192-
} finally {
1193-
keepAliveThreadExecutorLock.unlock();
1170+
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
1171+
if (keepAliveThreadExecutor == null) {
1172+
return;
1173+
}
1174+
keepAliveThreadExecutor.shutdownNow();
1175+
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
1176+
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
1177+
// ignore
11941178
}
11951179
}
11961180

0 commit comments

Comments
 (0)