diff --git a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java new file mode 100644 index 00000000..05112666 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java @@ -0,0 +1,469 @@ +/* + * Copyright 2013 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeader; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; +import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; +import com.github.shyiko.mysql.binlog.network.AuthenticationException; +import com.github.shyiko.mysql.binlog.network.SocketFactory; +import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket; +import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket; +import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; +import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; +import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; + +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Abstract base class for MySQL replication stream client. + * + * @author Stanley Shyiko + */ +public abstract class AbstractBinaryLogClient implements BinaryLogClientMXBean { + protected final String hostname; + protected final int port; + protected final String schema; + protected final String username; + protected final String password; + private final Logger logger = Logger.getLogger(getClass().getName()); + private final Lock shutdownLock = new ReentrantLock(); + private long serverId = 65535; + private volatile String binlogFilename; + private volatile long binlogPosition; + private EventDeserializer eventDeserializer = new EventDeserializer(); + private SocketFactory socketFactory; + private PacketChannel channel; + private volatile boolean connected; + private boolean keepAlive = true; + private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1); + private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3); + private volatile ThreadPoolExecutor keepAliveThreadExecutor; + private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6); + + public AbstractBinaryLogClient(String username, int port, String hostname, String password, String schema) { + this.username = username; + this.port = port; + this.hostname = hostname; + this.password = password; + this.schema = schema; + } + + /** + * @return server id (65535 by default) + */ + public long getServerId() { + return serverId; + } + + /** + * @param serverId server id (in the range from 1 to 2^32 – 1). This value MUST be unique across whole replication + * group (that is, different from any other server id being used by any master or slave). Keep in mind that each + * binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a + * simplified slave and thus MUST also use a different server id. + */ + public void setServerId(long serverId) { + this.serverId = serverId; + } + + /** + * @return binary log filename, nullable. Note that this value is automatically tracked by the client and thus + * is subject to change (in response to {@link com.github.shyiko.mysql.binlog.event.EventType#ROTATE}, for example). + */ + public String getBinlogFilename() { + return binlogFilename; + } + + /** + * @param binlogFilename binary log filename (null indicates automatic resolution). + */ + public void setBinlogFilename(String binlogFilename) { + this.binlogFilename = binlogFilename; + } + + /** + * @return binary log position of the next event. Note that this value changes with each incoming event. + */ + public long getBinlogPosition() { + return binlogPosition; + } + + /** + * @param binlogPosition binary log position + */ + public void setBinlogPosition(long binlogPosition) { + this.binlogPosition = binlogPosition; + } + + /** + * @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default), + * false otherwise. + */ + public void setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + } + + /** + * @param keepAliveInterval "keep alive" interval in milliseconds. + */ + public void setKeepAliveInterval(long keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + /** + * @param keepAliveConnectTimeout "keep alive" connect interval in milliseconds. + */ + public void setKeepAliveConnectTimeout(long keepAliveConnectTimeout) { + this.keepAliveConnectTimeout = keepAliveConnectTimeout; + } + + /** + * @param eventDeserializer custom event deserializer + */ + public void setEventDeserializer(EventDeserializer eventDeserializer) { + if (eventDeserializer == null) { + throw new IllegalArgumentException("Event deserializer cannot be NULL"); + } + this.eventDeserializer = eventDeserializer; + } + + /** + * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()". + */ + public void setSocketFactory(SocketFactory socketFactory) { + this.socketFactory = socketFactory; + } + + /** + * Connect to the replication stream. Note that this method blocks until disconnected. + * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication + * @throws java.io.IOException if anything goes wrong while trying to connect + */ + public void connect() throws IOException { + if (connected) { + throw new IllegalStateException("BinaryLogClient is already connected"); + } + try { + try { + Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); + socket.connect(new InetSocketAddress(hostname, port)); + channel = new PacketChannel(socket); + if (channel.getInputStream().peek() == -1) { + throw new EOFException(); + } + } catch (IOException e) { + throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + + ". Please make sure it's running.", e); + } + GreetingPacket greetingPacket = new GreetingPacket(channel.read()); + AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password, + greetingPacket.getScramble()); + authenticateCommand.setCollation(greetingPacket.getServerCollation()); + channel.write(authenticateCommand); + byte[] authenticationResult = channel.read(); + if (authenticationResult[0] != (byte) 0x00 /* ok */) { + if (authenticationResult[0] == (byte) 0xFF /* error */) { + byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length); + throw new AuthenticationException(new ErrorPacket(bytes).getErrorMessage()); + } + throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")"); + } + if (binlogFilename == null) { + fetchBinlogFilenameAndPosition(); + } + ChecksumType checksumType = fetchBinlogChecksum(); + if (checksumType != ChecksumType.NONE) { + confirmSupportOfChecksum(checksumType); + } + channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition)); + } catch (IOException e) { + if (channel != null && channel.isOpen()) { + channel.close(); + } + throw e; + } + connected = true; + if (logger.isLoggable(Level.INFO)) { + logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition); + } + onConnect(); + if (keepAlive && !isKeepAliveThreadRunning()) { + spawnKeepAliveThread(); + } + listenForEventPackets(); + } + + private void spawnKeepAliveThread() { + keepAliveThreadExecutor = newSingleDaemonThreadExecutor("blc-keepalive-" + hostname + ":" + port); + keepAliveThreadExecutor.submit(new Runnable() { + @Override + public void run() { + while (true) { + try { + Thread.sleep(keepAliveInterval); + } catch (InterruptedException e) { + // expected in case of disconnect + } + shutdownLock.lock(); + try { + if (keepAliveThreadExecutor.isShutdown()) { + return; + } + try { + channel.write(new PingCommand()); + } catch (IOException e) { + if (logger.isLoggable(Level.INFO)) { + logger.info("Trying to restore lost connection to " + hostname + ":" + port); + } + try { + if (isConnected()) { + disconnectChannel(); + } + connect(keepAliveConnectTimeout); + } catch (Exception ce) { + if (logger.isLoggable(Level.WARNING)) { + logger.warning("Failed to restore connection to " + hostname + ":" + port + + ". Next attempt in " + keepAliveInterval + "ms"); + } + } + } + } finally { + shutdownLock.unlock(); + } + } + } + }); + } + + private ThreadPoolExecutor newSingleDaemonThreadExecutor(final String threadName) { + return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, threadName); + thread.setDaemon(true); + return thread; + } + }); + } + + protected boolean isKeepAliveThreadRunning() { + return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); + } + + /** + * @return true if client is connected, false otherwise + */ + public boolean isConnected() { + return connected; + } + + private void fetchBinlogFilenameAndPosition() throws IOException { + ResultSetRowPacket[] resultSet; + channel.write(new QueryCommand("show master status")); + resultSet = readResultSet(); + if (resultSet.length == 0) { + throw new IOException("Failed to determine binlog filename/position"); + } + ResultSetRowPacket resultSetRow = resultSet[0]; + binlogFilename = resultSetRow.getValue(0); + binlogPosition = Long.parseLong(resultSetRow.getValue(1)); + } + + private ChecksumType fetchBinlogChecksum() throws IOException { + channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); + ResultSetRowPacket[] resultSet = readResultSet(); + if (resultSet.length == 0) { + return ChecksumType.NONE; + } + return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); + } + + private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { + channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); + byte[] statementResult = channel.read(); + if (statementResult[0] == (byte) 0xFF /* error */) { + byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); + throw new IOException(new ErrorPacket(bytes).getErrorMessage()); + } + eventDeserializer.setChecksumType(checksumType); + } + + private void listenForEventPackets() throws IOException { + ByteArrayInputStream inputStream = channel.getInputStream(); + try { + while (inputStream.peek() != -1) { + int packetLength = inputStream.readInteger(3); + inputStream.skip(1); // 1 byte for sequence + int marker = inputStream.read(); + if (marker == 0xFF) { + ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1)); + throw new IOException(errorPacket.getErrorCode() + " - " + errorPacket.getErrorMessage()); + } + Event event; + try { + event = eventDeserializer.nextEvent(inputStream); + } catch (Exception e) { + if (isConnected()) { + onEventDeserializationFailure(e); + } + continue; + } + if (isConnected()) { + notifyEventListener(event); + updateClientBinlogFilenameAndPosition(event); + } + } + } catch (Exception e) { + if (isConnected()) { + onCommunicationFailure(e); + } + } finally { + if (isConnected()) { + disconnectChannel(); + } + } + } + + private void updateClientBinlogFilenameAndPosition(Event event) { + EventHeader eventHeader = event.getHeader(); + if (eventHeader.getEventType() == EventType.ROTATE) { + RotateEventData eventData = event.getData(); + if (eventData != null) { + binlogFilename = eventData.getBinlogFilename(); + binlogPosition = eventData.getBinlogPosition(); + } + } else + if (eventHeader instanceof EventHeaderV4) { + EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; + long nextBinlogPosition = trackableEventHeader.getNextPosition(); + if (nextBinlogPosition > 0) { + binlogPosition = nextBinlogPosition; + } + } + } + + private ResultSetRowPacket[] readResultSet() throws IOException { + List resultSet = new LinkedList(); + while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } + for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { + resultSet.add(new ResultSetRowPacket(bytes)); + } + return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); + } + + private void notifyEventListener(Event event) { + onEvent(event); + } + + /** + * Disconnect from the replication stream. + * Note that this does not cause binlogFilename/binlogPosition to be cleared out. + * As the result following {@link #connect()} resumes client from where it left off. + */ + public void disconnect() throws IOException { + shutdownLock.lock(); + try { + if (isKeepAliveThreadRunning()) { + keepAliveThreadExecutor.shutdownNow(); + } + disconnectChannel(); + } finally { + shutdownLock.unlock(); + } + if (isKeepAliveThreadRunning()) { + waitForKeepAliveThreadToBeTerminated(); + } + } + + private void waitForKeepAliveThreadToBeTerminated() { + boolean terminated = false; + try { + terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout, + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, e.getMessage()); + } + } + if (!terminated) { + throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " + + keepAliveThreadShutdownTimeout + "ms"); + } + } + + private void disconnectChannel() throws IOException { + try { + connected = false; + if (channel != null && channel.isOpen()) { + channel.close(); + } + } finally { + onDisconnect(); + } + } + + /** + * Invoked once for each {@link Event}, in the order they are processed. + */ + protected abstract void onEvent(Event event); + + /** + * Invoked when a connection is established. + */ + protected abstract void onConnect(); + + /** + * It's guarantied to be called before {@link #onDisconnect()}) in case of communication failure. + */ + protected abstract void onCommunicationFailure(Exception ex); + + /** + * Called in case of failed event deserialization. Note this type of error does NOT cause client to + * disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually. + */ + protected abstract void onEventDeserializationFailure(Exception ex); + + /** + * Called upon disconnect (regardless of the reason). + */ + protected abstract void onDisconnect(); + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 997b8d2a..660e69c2 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -16,43 +16,17 @@ package com.github.shyiko.mysql.binlog; import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventHeader; -import com.github.shyiko.mysql.binlog.event.EventHeaderV4; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.RotateEventData; -import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; -import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; -import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; -import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; -import com.github.shyiko.mysql.binlog.network.AuthenticationException; -import com.github.shyiko.mysql.binlog.network.SocketFactory; -import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket; -import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket; -import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; -import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; -import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; -import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; - -import java.io.EOFException; + import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -61,41 +35,13 @@ * * @author Stanley Shyiko */ -public class BinaryLogClient implements BinaryLogClientMXBean { +public class BinaryLogClient extends AbstractBinaryLogClient { private final Logger logger = Logger.getLogger(getClass().getName()); - - private final String hostname; - private final int port; - private final String schema; - private final String username; - private final String password; - - private long serverId = 65535; - private volatile String binlogFilename; - private volatile long binlogPosition; - - private EventDeserializer eventDeserializer = new EventDeserializer(); - private final List eventListeners = new LinkedList(); private final List lifecycleListeners = new LinkedList(); - - private SocketFactory socketFactory; - - private PacketChannel channel; - private volatile boolean connected; - private ThreadFactory threadFactory; - private boolean keepAlive = true; - private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1); - private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3); - - private volatile ThreadPoolExecutor keepAliveThreadExecutor; - private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6); - - private final Lock shutdownLock = new ReentrantLock(); - /** * Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password). * @see BinaryLogClient#BinaryLogClient(String, int, String, String, String) @@ -129,96 +75,7 @@ public BinaryLogClient(String hostname, int port, String username, String passwo * @param password password */ public BinaryLogClient(String hostname, int port, String schema, String username, String password) { - this.hostname = hostname; - this.port = port; - this.schema = schema; - this.username = username; - this.password = password; - } - - /** - * @return server id (65535 by default) - */ - public long getServerId() { - return serverId; - } - - /** - * @param serverId server id (in the range from 1 to 2^32 – 1). This value MUST be unique across whole replication - * group (that is, different from any other server id being used by any master or slave). Keep in mind that each - * binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a - * simplified slave and thus MUST also use a different server id. - */ - public void setServerId(long serverId) { - this.serverId = serverId; - } - - /** - * @return binary log filename, nullable. Note that this value is automatically tracked by the client and thus - * is subject to change (in response to {@link EventType#ROTATE}, for example). - */ - public String getBinlogFilename() { - return binlogFilename; - } - - /** - * @param binlogFilename binary log filename (null indicates automatic resolution). - */ - public void setBinlogFilename(String binlogFilename) { - this.binlogFilename = binlogFilename; - } - - /** - * @return binary log position of the next event. Note that this value changes with each incoming event. - */ - public long getBinlogPosition() { - return binlogPosition; - } - - /** - * @param binlogPosition binary log position - */ - public void setBinlogPosition(long binlogPosition) { - this.binlogPosition = binlogPosition; - } - - /** - * @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default), - * false otherwise. - */ - public void setKeepAlive(boolean keepAlive) { - this.keepAlive = keepAlive; - } - - /** - * @param keepAliveInterval "keep alive" interval in milliseconds. - */ - public void setKeepAliveInterval(long keepAliveInterval) { - this.keepAliveInterval = keepAliveInterval; - } - - /** - * @param keepAliveConnectTimeout "keep alive" connect interval in milliseconds. - */ - public void setKeepAliveConnectTimeout(long keepAliveConnectTimeout) { - this.keepAliveConnectTimeout = keepAliveConnectTimeout; - } - - /** - * @param eventDeserializer custom event deserializer - */ - public void setEventDeserializer(EventDeserializer eventDeserializer) { - if (eventDeserializer == null) { - throw new IllegalArgumentException("Event deserializer cannot be NULL"); - } - this.eventDeserializer = eventDeserializer; - } - - /** - * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()". - */ - public void setSocketFactory(SocketFactory socketFactory) { - this.socketFactory = socketFactory; + super(username, port, hostname, password, schema); } /** @@ -229,132 +86,10 @@ public void setThreadFactory(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } - /** - * Connect to the replication stream. Note that this method blocks until disconnected. - * @throws AuthenticationException in case of failed authentication - * @throws IOException if anything goes wrong while trying to connect - */ - public void connect() throws IOException { - if (connected) { - throw new IllegalStateException("BinaryLogClient is already connected"); - } - try { - try { - Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); - socket.connect(new InetSocketAddress(hostname, port)); - channel = new PacketChannel(socket); - if (channel.getInputStream().peek() == -1) { - throw new EOFException(); - } - } catch (IOException e) { - throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port + - ". Please make sure it's running.", e); - } - GreetingPacket greetingPacket = new GreetingPacket(channel.read()); - AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password, - greetingPacket.getScramble()); - authenticateCommand.setCollation(greetingPacket.getServerCollation()); - channel.write(authenticateCommand); - byte[] authenticationResult = channel.read(); - if (authenticationResult[0] != (byte) 0x00 /* ok */) { - if (authenticationResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length); - throw new AuthenticationException(new ErrorPacket(bytes).getErrorMessage()); - } - throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")"); - } - if (binlogFilename == null) { - fetchBinlogFilenameAndPosition(); - } - ChecksumType checksumType = fetchBinlogChecksum(); - if (checksumType != ChecksumType.NONE) { - confirmSupportOfChecksum(checksumType); - } - channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition)); - } catch (IOException e) { - if (channel != null && channel.isOpen()) { - channel.close(); - } - throw e; - } - connected = true; - if (logger.isLoggable(Level.INFO)) { - logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition); - } - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onConnect(this); - } - } - if (keepAlive && !isKeepAliveThreadRunning()) { - spawnKeepAliveThread(); - } - listenForEventPackets(); - } - - private void spawnKeepAliveThread() { - keepAliveThreadExecutor = newSingleDaemonThreadExecutor("blc-keepalive-" + hostname + ":" + port); - keepAliveThreadExecutor.submit(new Runnable() { - @Override - public void run() { - while (true) { - try { - Thread.sleep(keepAliveInterval); - } catch (InterruptedException e) { - // expected in case of disconnect - } - shutdownLock.lock(); - try { - if (keepAliveThreadExecutor.isShutdown()) { - return; - } - try { - channel.write(new PingCommand()); - } catch (IOException e) { - if (logger.isLoggable(Level.INFO)) { - logger.info("Trying to restore lost connection to " + hostname + ":" + port); - } - try { - if (isConnected()) { - disconnectChannel(); - } - connect(keepAliveConnectTimeout); - } catch (Exception ce) { - if (logger.isLoggable(Level.WARNING)) { - logger.warning("Failed to restore connection to " + hostname + ":" + port + - ". Next attempt in " + keepAliveInterval + "ms"); - } - } - } - } finally { - shutdownLock.unlock(); - } - } - } - }); - } - - private ThreadPoolExecutor newSingleDaemonThreadExecutor(final String threadName) { - return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), new ThreadFactory() { - - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, threadName); - thread.setDaemon(true); - return thread; - } - }); - } - - protected boolean isKeepAliveThreadRunning() { - return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown(); - } - /** * Connect to the replication stream in a separate thread. * @param timeoutInMilliseconds timeout in milliseconds - * @throws AuthenticationException in case of failed authentication + * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication * @throws IOException if anything goes wrong while trying to connect * @throws TimeoutException if client wasn't able to connect in the requested period of time */ @@ -399,115 +134,6 @@ public void run() { } } - /** - * @return true if client is connected, false otherwise - */ - public boolean isConnected() { - return connected; - } - - private void fetchBinlogFilenameAndPosition() throws IOException { - ResultSetRowPacket[] resultSet; - channel.write(new QueryCommand("show master status")); - resultSet = readResultSet(); - if (resultSet.length == 0) { - throw new IOException("Failed to determine binlog filename/position"); - } - ResultSetRowPacket resultSetRow = resultSet[0]; - binlogFilename = resultSetRow.getValue(0); - binlogPosition = Long.parseLong(resultSetRow.getValue(1)); - } - - private ChecksumType fetchBinlogChecksum() throws IOException { - channel.write(new QueryCommand("show global variables like 'binlog_checksum'")); - ResultSetRowPacket[] resultSet = readResultSet(); - if (resultSet.length == 0) { - return ChecksumType.NONE; - } - return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase()); - } - - private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { - channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); - byte[] statementResult = channel.read(); - if (statementResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); - throw new IOException(new ErrorPacket(bytes).getErrorMessage()); - } - eventDeserializer.setChecksumType(checksumType); - } - - private void listenForEventPackets() throws IOException { - ByteArrayInputStream inputStream = channel.getInputStream(); - try { - while (inputStream.peek() != -1) { - int packetLength = inputStream.readInteger(3); - inputStream.skip(1); // 1 byte for sequence - int marker = inputStream.read(); - if (marker == 0xFF) { - ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1)); - throw new IOException(errorPacket.getErrorCode() + " - " + errorPacket.getErrorMessage()); - } - Event event; - try { - event = eventDeserializer.nextEvent(inputStream); - } catch (Exception e) { - if (isConnected()) { - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onEventDeserializationFailure(this, e); - } - } - } - continue; - } - if (isConnected()) { - notifyEventListeners(event); - updateClientBinlogFilenameAndPosition(event); - } - } - } catch (Exception e) { - if (isConnected()) { - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onCommunicationFailure(this, e); - } - } - } - } finally { - if (isConnected()) { - disconnectChannel(); - } - } - } - - private void updateClientBinlogFilenameAndPosition(Event event) { - EventHeader eventHeader = event.getHeader(); - if (eventHeader.getEventType() == EventType.ROTATE) { - RotateEventData eventData = event.getData(); - if (eventData != null) { - binlogFilename = eventData.getBinlogFilename(); - binlogPosition = eventData.getBinlogPosition(); - } - } else - if (eventHeader instanceof EventHeaderV4) { - EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader; - long nextBinlogPosition = trackableEventHeader.getNextPosition(); - if (nextBinlogPosition > 0) { - binlogPosition = nextBinlogPosition; - } - } - } - - private ResultSetRowPacket[] readResultSet() throws IOException { - List resultSet = new LinkedList(); - while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } - for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { - resultSet.add(new ResultSetRowPacket(bytes)); - } - return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); - } - /** * @return registered event listeners */ @@ -549,7 +175,8 @@ public void unregisterEventListener(EventListener eventListener) { } } - private void notifyEventListeners(Event event) { + @Override + protected void onEvent(Event event) { synchronized (eventListeners) { for (EventListener eventListener : eventListeners) { try { @@ -604,53 +231,40 @@ public synchronized void unregisterLifecycleListener(LifecycleListener eventList } } - /** - * Disconnect from the replication stream. - * Note that this does not cause binlogFilename/binlogPosition to be cleared out. - * As the result following {@link #connect()} resumes client from where it left off. - */ - public void disconnect() throws IOException { - shutdownLock.lock(); - try { - if (isKeepAliveThreadRunning()) { - keepAliveThreadExecutor.shutdownNow(); + @Override + protected void onConnect() { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onConnect(this); } - disconnectChannel(); - } finally { - shutdownLock.unlock(); - } - if (isKeepAliveThreadRunning()) { - waitForKeepAliveThreadToBeTerminated(); } } - private void waitForKeepAliveThreadToBeTerminated() { - boolean terminated = false; - try { - terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout, - TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, e.getMessage()); + @Override + protected void onCommunicationFailure(Exception ex) { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onCommunicationFailure(this, ex); } } - if (!terminated) { - throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " + - keepAliveThreadShutdownTimeout + "ms"); - } + } - private void disconnectChannel() throws IOException { - try { - connected = false; - if (channel != null && channel.isOpen()) { - channel.close(); + @Override + protected void onEventDeserializationFailure(Exception ex) { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onEventDeserializationFailure(this, ex); } - } finally { - synchronized (lifecycleListeners) { - for (LifecycleListener lifecycleListener : lifecycleListeners) { - lifecycleListener.onDisconnect(this); - } + } + + } + + @Override + protected void onDisconnect() { + synchronized (lifecycleListeners) { + for (LifecycleListener lifecycleListener : lifecycleListeners) { + lifecycleListener.onDisconnect(this); } } } @@ -696,15 +310,19 @@ public interface LifecycleListener { */ public static abstract class AbstractLifecycleListener implements LifecycleListener { + @Override public void onConnect(BinaryLogClient client) { } + @Override public void onCommunicationFailure(BinaryLogClient client, Exception ex) { } + @Override public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { } + @Override public void onDisconnect(BinaryLogClient client) { } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index 62e3510d..a495ec71 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -201,6 +201,8 @@ public void testDeserializationOfDifferentColumnTypes() throws Exception { new java.sql.Date(generateTime(1989, 3, 21, 0, 0, 0, 0))}); assertEquals(writeAndCaptureRow("datetime", "'1989-03-21 01:02:03.000000'"), new Serializable[]{ new java.util.Date(generateTime(1989, 3, 21, 1, 2, 3, 0))}); + // FIXME: '1989-03-18 01:02:03.000000' fails in the America/Los_Angeles timezone (Daylight Saving Time) + // '1989-05-18 01:02:03.000000' passes assertEquals(writeAndCaptureRow("timestamp", "'1989-03-18 01:02:03.000000'"), new Serializable[]{ new java.sql.Timestamp(generateTime(1989, 3, 18, 1, 2, 3, 0))}); assertEquals(writeAndCaptureRow("time", "'1:2:3.000000'"), new Serializable[]{ @@ -558,8 +560,10 @@ public void execute(Statement statement) throws SQLException { // fixing connection timezone to the "client's one" TimeZone currentTimeZone = TimeZone.getDefault(); int offset = currentTimeZone.getRawOffset() + currentTimeZone.getDSTSavings(); - String timeZoneAsAString = String.format("%s%02d:%02d", offset >= 0 ? "+" : "-", offset / 3600000, - (offset / 60000) % 60); + String timeZoneAsAString = String.format("%s%02d:%02d", + offset >= 0 ? "+" : "-", + Math.abs(offset / 3600000), + (offset / 60000) % 60); statement.execute("SET time_zone = '" + timeZoneAsAString + "'"); } });