From e59006e7b4566cdf5365bbef6ba212847ec4e253 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Thu, 4 Jun 2020 23:48:13 -0700 Subject: [PATCH 1/6] Revert "Atomic processing of create/remove of keepalive thread" This reverts commit 4680cb82 (caused https://github.com/shyiko/mysql-binlog-connector-java/issues/321). --- .../shyiko/mysql/binlog/BinaryLogClient.java | 100 ++++++++---------- 1 file changed, 42 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index c395aa7b..440c83d0 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -164,7 +164,6 @@ public X509Certificate[] getAcceptedIssuers() { private volatile ExecutorService keepAliveThreadExecutor; private final Lock connectLock = new ReentrantLock(); - private final Lock keepAliveThreadExecutorLock = new ReentrantLock(); /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). @@ -771,51 +770,46 @@ public Thread newThread(Runnable runnable) { return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port); } }); - try { - keepAliveThreadExecutorLock.lock(); - threadExecutor.submit(new Runnable() { - @Override - public void run() { - while (!threadExecutor.isShutdown()) { + threadExecutor.submit(new Runnable() { + @Override + public void run() { + while (!threadExecutor.isShutdown()) { + try { + Thread.sleep(keepAliveInterval); + } catch (InterruptedException e) { + // expected in case of disconnect + } + if (threadExecutor.isShutdown()) { + return; + } + boolean connectionLost = false; + if (heartbeatInterval > 0) { + connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval; + } else { try { - Thread.sleep(keepAliveInterval); - } catch (InterruptedException e) { - // expected in case of disconnect + channel.write(new PingCommand()); + } catch (IOException e) { + connectionLost = true; } - if (threadExecutor.isShutdown()) { - return; - } - boolean connectionLost = false; - if (heartbeatInterval > 0) { - connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval; - } else { - try { - channel.write(new PingCommand()); - } catch (IOException e) { - connectionLost = true; - } + } + if (connectionLost) { + if (logger.isLoggable(Level.INFO)) { + logger.info("Trying to restore lost connection to " + hostname + ":" + port); } - if (connectionLost) { - if (logger.isLoggable(Level.INFO)) { - logger.info("Trying to restore lost connection to " + hostname + ":" + port); - } - try { - terminateConnect(); - connect(connectTimeout); - } catch (Exception ce) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("Failed to restore connection to " + hostname + ":" + port + - ". Next attempt in " + keepAliveInterval + "ms"); - } + try { + terminateConnect(); + connect(connectTimeout); + } catch (Exception ce) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Failed to restore connection to " + hostname + ":" + port + + ". Next attempt in " + keepAliveInterval + "ms"); } } } } - }); - keepAliveThreadExecutor = threadExecutor; - } finally { - keepAliveThreadExecutorLock.unlock(); - } + } + }); + keepAliveThreadExecutor = threadExecutor; } private Thread newNamedThread(Runnable runnable, String threadName) { @@ -825,12 +819,7 @@ private Thread newNamedThread(Runnable runnable, String threadName) { } boolean isKeepAliveThreadRunning() { - try { - keepAliveThreadExecutorLock.lock(); - return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); - } finally { - keepAliveThreadExecutorLock.unlock(); - } + return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); } /** @@ -1180,19 +1169,14 @@ public void disconnect() throws IOException { } private void terminateKeepAliveThread() { - try { - keepAliveThreadExecutorLock.lock(); - ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor; - if (keepAliveThreadExecutor == null) { - return; - } - keepAliveThreadExecutor.shutdownNow(); - while (!awaitTerminationInterruptibly(keepAliveThreadExecutor, - Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - // ignore - } - } finally { - keepAliveThreadExecutorLock.unlock(); + ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor; + if (keepAliveThreadExecutor == null) { + return; + } + keepAliveThreadExecutor.shutdownNow(); + while (!awaitTerminationInterruptibly(keepAliveThreadExecutor, + Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + // ignore } } From c54b4c18ca1c5299792a4be32fa0296a49259362 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Mon, 8 Jun 2020 03:17:37 -0700 Subject: [PATCH 2/6] Fixed potential deadlock when keepAlive is on (#321) --- .../shyiko/mysql/binlog/BinaryLogClient.java | 335 ++++++++++-------- 1 file changed, 195 insertions(+), 140 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 440c83d0..939b9b0c 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -63,7 +63,6 @@ import java.net.Socket; import java.net.SocketException; import java.security.GeneralSecurityException; -import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Collections; @@ -98,12 +97,10 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { new X509TrustManager() { @Override - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) - throws CertificateException { } + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) { } @Override - public void checkServerTrusted(X509Certificate[] x509Certificates, String s) - throws CertificateException { } + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) { } @Override public X509Certificate[] getAcceptedIssuers() { @@ -164,6 +161,7 @@ public X509Certificate[] getAcceptedIssuers() { private volatile ExecutorService keepAliveThreadExecutor; private final Lock connectLock = new ReentrantLock(); + private volatile CountDownLatch connectLatch; /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). @@ -489,113 +487,143 @@ public void setThreadFactory(ThreadFactory threadFactory) { * @throws IOException if anything goes wrong while trying to connect */ public void connect() throws IOException { - if (!connectLock.tryLock()) { - throw new IllegalStateException("BinaryLogClient is already connected"); - } - boolean notifyWhenDisconnected = false; + connectWithTimeout(connectTimeout); + } + + private void connectWithTimeout(final long connectTimeout) throws IOException { + CountDownLatch latch = new CountDownLatch(1); + boolean connected = false; try { - Callable cancelDisconnect = null; + PacketChannel localChannel; + connectLock.lock(); try { - try { - long start = System.currentTimeMillis(); - channel = openChannel(); - if (connectTimeout > 0 && !isKeepAliveThreadRunning()) { - cancelDisconnect = scheduleDisconnectIn(connectTimeout - - (System.currentTimeMillis() - start)); - } - if (channel.getInputStream().peek() == -1) { - throw new EOFException(); - } - } catch (IOException e) { - throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + - ". Please make sure it's running.", e); + if (connectLatch != null) { + throw new IllegalStateException("BinaryLogClient is already connected"); } - GreetingPacket greetingPacket = receiveGreeting(); - authenticate(greetingPacket); - connectionId = greetingPacket.getThreadId(); - if ("".equals(binlogFilename)) { - synchronized (gtidSetAccessLock) { - if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { - gtidSet = new GtidSet(fetchGtidPurged()); - } - } + connectLatch = latch; + localChannel = openChannelToBinaryLogStream(connectTimeout); + channel = localChannel; + if (keepAlive && !isKeepAliveThreadRunning()) { + keepAliveThreadExecutor = spawnKeepAliveThread(connectTimeout); } - if (binlogFilename == null) { - fetchBinlogFilenameAndPosition(); + } finally { + connectLock.unlock(); + } + connected = true; + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onConnect(this); + } + ensureEventDeserializerHasRequiredEDDs(); + listenForEventPackets(localChannel); + } finally { + connectLock.lock(); + try { + latch.countDown(); + if (latch == connectLatch) { + connectLatch = null; } - if (binlogPosition < 4) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4); - } - binlogPosition = 4; + } finally { + connectLock.unlock(); + } + if (connected) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onDisconnect(this); } - ChecksumType checksumType = fetchBinlogChecksum(); - if (checksumType != ChecksumType.NONE) { - confirmSupportOfChecksum(checksumType); + } + } + } + + private PacketChannel openChannelToBinaryLogStream(final long connectTimeout) throws IOException { + PacketChannel channel = null; + Callable cancelCloseChannel = null; + try { + try { + long start = System.currentTimeMillis(); + channel = openChannel(connectTimeout); + if (connectTimeout > 0 && !isKeepAliveThreadRunning()) { + cancelCloseChannel = scheduleCloseChannel(channel, connectTimeout - + (System.currentTimeMillis() - start)); } - if (heartbeatInterval > 0) { - enableHeartbeat(); + if (channel.getInputStream().peek() == -1) { + throw new EOFException(); } - gtid = null; - tx = false; - requestBinaryLogStream(); } catch (IOException e) { - disconnectChannel(); - throw e; - } finally { - if (cancelDisconnect != null) { - try { - cancelDisconnect.call(); - } catch (Exception e) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("\"" + e.getMessage() + - "\" was thrown while canceling scheduled disconnect call"); - } - } - } + throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + + ". Please make sure it's running.", e); } - connected = true; - notifyWhenDisconnected = true; - if (logger.isLoggable(Level.INFO)) { - String position; + GreetingPacket greetingPacket = receiveGreeting(channel); + authenticate(channel, greetingPacket); + connectionId = greetingPacket.getThreadId(); + if ("".equals(binlogFilename)) { synchronized (gtidSetAccessLock) { - position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition; + if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) { + gtidSet = new GtidSet(fetchGtidPurged(channel)); + } } - logger.info("Connected to " + hostname + ":" + port + " at " + position + - " (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")"); } - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onConnect(this); + if (binlogFilename == null) { + fetchBinlogFilenameAndPosition(channel); } - if (keepAlive && !isKeepAliveThreadRunning()) { - spawnKeepAliveThread(); - } - ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); - synchronized (gtidSetAccessLock) { - if (gtidSet != null) { - ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); - ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class); + if (binlogPosition < 4) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4); } + binlogPosition = 4; + } + ChecksumType checksumType = fetchBinlogChecksum(channel); + if (checksumType != ChecksumType.NONE) { + confirmSupportOfChecksum(channel, checksumType); + } + if (heartbeatInterval > 0) { + enableHeartbeat(channel); } - listenForEventPackets(); + gtid = null; + tx = false; + requestBinaryLogStream(channel); + } catch (IOException e) { + closeChannel(channel); + throw e; } finally { - connectLock.unlock(); - if (notifyWhenDisconnected) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onDisconnect(this); + if (cancelCloseChannel != null) { + try { + cancelCloseChannel.call(); + } catch (Exception e) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("\"" + e.getMessage() + + "\" was thrown while canceling scheduled disconnect call"); + } } } } + connected = true; + if (logger.isLoggable(Level.INFO)) { + String position; + synchronized (gtidSetAccessLock) { + position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition; + } + logger.info("Connected to " + hostname + ":" + port + " at " + position + + " (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")"); + } + return channel; } - private PacketChannel openChannel() throws IOException { + private void ensureEventDeserializerHasRequiredEDDs() { + ensureEventDataDeserializerIfPresent(EventType.ROTATE, RotateEventDataDeserializer.class); + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + ensureEventDataDeserializerIfPresent(EventType.GTID, GtidEventDataDeserializer.class); + ensureEventDataDeserializerIfPresent(EventType.QUERY, QueryEventDataDeserializer.class); + } + } + } + + private PacketChannel openChannel(final long connectTimeout) throws IOException { Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); socket.connect(new InetSocketAddress(hostname, port), (int) connectTimeout); return new PacketChannel(socket); } - private Callable scheduleDisconnectIn(final long timeout) { - final BinaryLogClient self = this; + private Callable scheduleCloseChannel(final PacketChannel channel, final long timeout) { final CountDownLatch connectLatch = new CountDownLatch(1); final Thread thread = newNamedThread(new Runnable() { @Override @@ -613,7 +641,7 @@ public void run() { "Forcing disconnect."); } try { - self.disconnectChannel(); + closeChannel(channel); } catch (IOException e) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, e.getMessage()); @@ -623,9 +651,9 @@ public void run() { } }, "blc-disconnect-" + hostname + ":" + port); thread.start(); - return new Callable() { + return new Callable() { - public Object call() throws Exception { + public Void call() throws Exception { connectLatch.countDown(); thread.join(); return null; @@ -633,7 +661,7 @@ public Object call() throws Exception { }; } - private GreetingPacket receiveGreeting() throws IOException { + private GreetingPacket receiveGreeting(final PacketChannel channel) throws IOException { byte[] initialHandshakePacket = channel.read(); if (initialHandshakePacket[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length); @@ -644,7 +672,7 @@ private GreetingPacket receiveGreeting() throws IOException { return new GreetingPacket(initialHandshakePacket); } - private void enableHeartbeat() throws IOException { + private void enableHeartbeat(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000)); byte[] statementResult = channel.read(); if (statementResult[0] == (byte) 0xFF /* error */) { @@ -655,7 +683,7 @@ private void enableHeartbeat() throws IOException { } } - private void requestBinaryLogStream() throws IOException { + private void requestBinaryLogStream(final PacketChannel channel) throws IOException { long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178 Command dumpBinaryLogCommand; synchronized (gtidSetAccessLock) { @@ -671,12 +699,12 @@ private void requestBinaryLogStream() throws IOException { channel.write(dumpBinaryLogCommand); } - private void ensureEventDataDeserializer(EventType eventType, - Class eventDataDeserializerClass) { - EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); + private void ensureEventDataDeserializerIfPresent(EventType eventType, + Class> eventDataDeserializerClass) { + EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); if (eventDataDeserializer.getClass() != eventDataDeserializerClass && eventDataDeserializer.getClass() != EventDataWrapper.Deserializer.class) { - EventDataDeserializer internalEventDataDeserializer; + EventDataDeserializer internalEventDataDeserializer; try { internalEventDataDeserializer = eventDataDeserializerClass.newInstance(); } catch (Exception e) { @@ -688,7 +716,7 @@ private void ensureEventDataDeserializer(EventType eventType, } } - private void authenticate(GreetingPacket greetingPacket) throws IOException { + private void authenticate(final PacketChannel channel, GreetingPacket greetingPacket) throws IOException { int collation = greetingPacket.getServerCollation(); int packetNumber = 1; @@ -726,20 +754,22 @@ private void authenticate(GreetingPacket greetingPacket) throws IOException { throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } else if (authenticationResult[0] == (byte) 0xFE) { - switchAuthentication(authenticationResult, usingSSLSocket); + switchAuthentication(channel, authenticationResult, usingSSLSocket); } else { throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")"); } } } - private void switchAuthentication(byte[] authenticationResult, boolean usingSSLSocket) throws IOException { + private void switchAuthentication(final PacketChannel channel, byte[] authenticationResult, boolean usingSSLSocket) + throws IOException { /* Azure-MySQL likes to tell us to switch authentication methods, even though we haven't advertised that we support any. It uses this for some-odd reason to send the real password scramble. */ ByteArrayInputStream buffer = new ByteArrayInputStream(authenticationResult); + //noinspection ResultOfMethodCallIgnored buffer.read(1); String authName = buffer.readZeroTerminatedString(); @@ -761,7 +791,7 @@ private void switchAuthentication(byte[] authenticationResult, boolean usingSSLS } } - private void spawnKeepAliveThread() { + private ExecutorService spawnKeepAliveThread(final long connectTimeout) { final ExecutorService threadExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { @@ -773,8 +803,11 @@ public Thread newThread(Runnable runnable) { threadExecutor.submit(new Runnable() { @Override public void run() { + connectLock.lock(); // wait for connect() to finish initialization sequence + connectLock.unlock(); while (!threadExecutor.isShutdown()) { try { + //noinspection BusyWait Thread.sleep(keepAliveInterval); } catch (InterruptedException e) { // expected in case of disconnect @@ -809,7 +842,7 @@ public void run() { } } }); - keepAliveThreadExecutor = threadExecutor; + return threadExecutor; } private Thread newNamedThread(Runnable runnable, String threadName) { @@ -845,8 +878,7 @@ public void onConnect(BinaryLogClient client) { @Override public void run() { try { - setConnectTimeout(timeout); - connect(); + connectWithTimeout(timeout); } catch (IOException e) { exceptionReference.set(e); countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout" @@ -868,10 +900,16 @@ public void run() { } if (!started) { try { + // NOTE: we don't call disconnect here and so if client is able to connect right after timeout expires - + // keep-alive thread may be left running. terminateConnect(); - } finally { - throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms"); + } catch (IOException e) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("\"" + e.getMessage() + + "\" was thrown while terminating connection due to timeout"); + } } + throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms"); } } @@ -879,22 +917,21 @@ public void run() { * @return true if client is connected, false otherwise */ public boolean isConnected() { - return connected; + return connectLatch != null; } - private String fetchGtidPurged() throws IOException { + private String fetchGtidPurged(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("show global variables like 'gtid_purged'")); - ResultSetRowPacket[] resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length != 0) { return resultSet[0].getValue(1).toUpperCase(); } return ""; } - private void fetchBinlogFilenameAndPosition() throws IOException { - ResultSetRowPacket[] resultSet; + private void fetchBinlogFilenameAndPosition(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("show master status")); - resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length == 0) { throw new IOException("Failed to determine binlog filename/position"); } @@ -903,16 +940,16 @@ private void fetchBinlogFilenameAndPosition() throws IOException { binlogPosition = Long.parseLong(resultSetRow.getValue(1)); } - private ChecksumType fetchBinlogChecksum() throws IOException { + private ChecksumType fetchBinlogChecksum(final PacketChannel channel) throws IOException { channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); - ResultSetRowPacket[] resultSet = readResultSet(); + ResultSetRowPacket[] resultSet = readResultSet(channel); if (resultSet.length == 0) { return ChecksumType.NONE; } return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); } - private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { + private void confirmSupportOfChecksum(final PacketChannel channel, ChecksumType checksumType) throws IOException { channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); byte[] statementResult = channel.read(); if (statementResult[0] == (byte) 0xFF /* error */) { @@ -924,12 +961,13 @@ private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOExcept eventDeserializer.setChecksumType(checksumType); } - private void listenForEventPackets() throws IOException { + private void listenForEventPackets(final PacketChannel channel) throws IOException { ByteArrayInputStream inputStream = channel.getInputStream(); boolean completeShutdown = false; try { while (inputStream.peek() != -1) { int packetLength = inputStream.readInteger(3); + //noinspection ResultOfMethodCallIgnored inputStream.skip(1); // 1 byte for sequence int marker = inputStream.read(); if (marker == 0xFF) { @@ -954,14 +992,14 @@ private void listenForEventPackets() throws IOException { if (cause instanceof EOFException || cause instanceof SocketException) { throw e; } - if (isConnected()) { + if (connected) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onEventDeserializationFailure(this, e); } } continue; } - if (isConnected()) { + if (connected) { eventLastSeen = System.currentTimeMillis(); updateGtidSet(event); notifyEventListeners(event); @@ -969,17 +1007,17 @@ private void listenForEventPackets() throws IOException { } } } catch (Exception e) { - if (isConnected()) { + if (connected) { for (LifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onCommunicationFailure(this, e); } } } finally { - if (isConnected()) { + if (connected) { if (completeShutdown) { disconnect(); // initiate complete shutdown sequence (which includes keep alive thread) } else { - disconnectChannel(); + closeChannel(channel); } } } @@ -990,6 +1028,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac int chunkLength; do { chunkLength = inputStream.readInteger(3); + //noinspection ResultOfMethodCallIgnored inputStream.skip(1); // 1 byte for sequence result = Arrays.copyOf(result, result.length + chunkLength); inputStream.fill(result, result.length - chunkLength, chunkLength); @@ -1061,7 +1100,7 @@ private void commitGtid() { } } - private ResultSetRowPacket[] readResultSet() throws IOException { + private ResultSetRowPacket[] readResultSet(final PacketChannel channel) throws IOException { List resultSet = new LinkedList(); byte[] statementResult = channel.read(); if (statementResult[0] == (byte) 0xFF /* error */) { @@ -1074,7 +1113,7 @@ private ResultSetRowPacket[] readResultSet() throws IOException { for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { resultSet.add(new ResultSetRowPacket(bytes)); } - return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); + return resultSet.toArray(new ResultSetRowPacket[0]); } /** @@ -1160,27 +1199,32 @@ public void unregisterLifecycleListener(LifecycleListener eventListener) { /** * Disconnect from the replication stream. - * Note that this does not cause binlogFilename/binlogPosition to be cleared out. - * As the result following {@link #connect()} resumes client from where it left off. + * Note that this does not reset binlogFilename/binlogPosition. Calling {@link #connect()} or + * {@link #connect(long)}} again resumes client from where it left off. */ public void disconnect() throws IOException { - terminateKeepAliveThread(); - terminateConnect(); + connectLock.lock(); + ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor; + PacketChannel channel = this.channel; + CountDownLatch connectLatch = this.connectLatch; + connectLock.unlock(); + + terminateKeepAliveThread(keepAliveThreadExecutor); + closeChannel(channel); + waitForConnectToTerminate(connectLatch); } - private void terminateKeepAliveThread() { - ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor; - if (keepAliveThreadExecutor == null) { + private void terminateKeepAliveThread(final ExecutorService threadExecutor) { + if (threadExecutor == null) { return; } - keepAliveThreadExecutor.shutdownNow(); - while (!awaitTerminationInterruptibly(keepAliveThreadExecutor, - Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - // ignore - } + threadExecutor.shutdownNow(); + while (!awaitTerminationInterruptibly(threadExecutor, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { /* retry */ } } - private static boolean awaitTerminationInterruptibly(ExecutorService executorService, long timeout, TimeUnit unit) { + @SuppressWarnings("SameParameterValue") + private static boolean awaitTerminationInterruptibly(final ExecutorService executorService, + final long timeout, final TimeUnit unit) { try { return executorService.awaitTermination(timeout, unit); } catch (InterruptedException e) { @@ -1189,21 +1233,32 @@ private static boolean awaitTerminationInterruptibly(ExecutorService executorSer } private void terminateConnect() throws IOException { - do { - disconnectChannel(); - } while (!tryLockInterruptibly(connectLock, 1000, TimeUnit.MILLISECONDS)); + connectLock.lock(); + PacketChannel channel = this.channel; + CountDownLatch connectLatch = this.connectLatch; connectLock.unlock(); + + closeChannel(channel); + waitForConnectToTerminate(connectLatch); + } + + private void waitForConnectToTerminate(final CountDownLatch connectLatch) { + if (connectLatch != null) { + while (!awaitInterruptibly(connectLatch, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { /* retry */ } + } } - private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit) { + @SuppressWarnings("SameParameterValue") + private static boolean awaitInterruptibly(final CountDownLatch countDownLatch, + final long time, final TimeUnit unit) { try { - return lock.tryLock(time, unit); + return countDownLatch.await(time, unit); } catch (InterruptedException e) { return false; } } - private void disconnectChannel() throws IOException { + private void closeChannel(final PacketChannel channel) throws IOException { connected = false; if (channel != null && channel.isOpen()) { channel.close(); From 5dde1e66159c919933b6b5cbf8c377a12ea88cd8 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Mon, 8 Jun 2020 03:17:58 -0700 Subject: [PATCH 3/6] Fix testMySQL8TableMetadata --- .../shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 71dfb524..9896e504 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -994,6 +994,7 @@ public void execute(Statement statement) throws SQLException { @Test public void testMySQL8TableMetadata() throws Exception { + master.execute("drop table if exists test_metameta"); master.execute("create table test_metameta ( " + "a date, b date, c date, d date, e date, f date, g date, " + "h date, i date, j int)"); From 2ef7a92636298787a017c1060d5ee53be82b1a1a Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Mon, 8 Jun 2020 03:24:55 -0700 Subject: [PATCH 4/6] Add 0.21.0 release notes --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd3c07ac..b397c516 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## [0.21.0](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.20.1...0.21.0) - 2020-06-08 + +### Fixed +- Potential deadlock when keepAlive is on ([#321](https://github.com/shyiko/mysql-binlog-connector-java/issues/321)). + +### Changed +- `BinaryLogClient.LifecycleListener::onConnect()` order relative to keepAlive thread `start()`. +Calling `disconnect()` inside `onConnect()` is now guaranteed to terminate keepAlive thread ([#213](https://github.com/shyiko/mysql-binlog-connector-java/pull/213), +[260](https://github.com/shyiko/mysql-binlog-connector-java/pull/260)). +A side effect of this change is that throwing RuntimeException inside `onConnect()` will no longer prevent keepAlive thread from starting. + ## [0.20.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.20.0...0.20.1) - 2019-05-12 ### Added From 01b3e8ecc43a397c8395ca4f9b439ca0d5a0656a Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Mon, 8 Jun 2020 03:26:28 -0700 Subject: [PATCH 5/6] Bump version to 0.21.0 in the README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 595feb03..351880fb 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Get the latest JAR(s) from [here](http://search.maven.org/#search%7Cga%7C1%7Cg%3 com.github.shyiko mysql-binlog-connector-java - 0.18.1 + 0.21.0 ``` From dd710a5466381faa57442977b24fceff56a0820e Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Mon, 8 Jun 2020 03:27:48 -0700 Subject: [PATCH 6/6] Add shyiko/mysql-binlog-connector-java deprecation notice --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 351880fb..93ec1a73 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # mysql-binlog-connector-java [![Build Status](https://travis-ci.org/shyiko/mysql-binlog-connector-java.svg?branch=master)](https://travis-ci.org/shyiko/mysql-binlog-connector-java) [![Coverage Status](https://coveralls.io/repos/shyiko/mysql-binlog-connector-java/badge.svg?branch=master)](https://coveralls.io/r/shyiko/mysql-binlog-connector-java?branch=master) [![Maven Central](https://img.shields.io/maven-central/v/com.github.shyiko/mysql-binlog-connector-java.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.github.shyiko%22%20AND%20a%3A%22mysql-binlog-connector-java%22) +## ATTENTION: This repository is no longer maintained. I recommend migrating to [osheroff/mysql-binlog-connector-java](https://github.com/osheroff/mysql-binlog-connector-java). + MySQL Binary Log connector. Initially project was started as a fork of [open-replicator](https://code.google.com/p/open-replicator),