diff --git a/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java
new file mode 100644
index 00000000..05112666
--- /dev/null
+++ b/src/main/java/com/github/shyiko/mysql/binlog/AbstractBinaryLogClient.java
@@ -0,0 +1,469 @@
+/*
+ * Copyright 2013 Stanley Shyiko
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.shyiko.mysql.binlog;
+
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeader;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
+import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
+import com.github.shyiko.mysql.binlog.network.AuthenticationException;
+import com.github.shyiko.mysql.binlog.network.SocketFactory;
+import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
+import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
+import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
+import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
+import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
+import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Abstract base class for MySQL replication stream client.
+ *
+ * @author Stanley Shyiko
+ */
+public abstract class AbstractBinaryLogClient implements BinaryLogClientMXBean {
+ protected final String hostname;
+ protected final int port;
+ protected final String schema;
+ protected final String username;
+ protected final String password;
+ private final Logger logger = Logger.getLogger(getClass().getName());
+ private final Lock shutdownLock = new ReentrantLock();
+ private long serverId = 65535;
+ private volatile String binlogFilename;
+ private volatile long binlogPosition;
+ private EventDeserializer eventDeserializer = new EventDeserializer();
+ private SocketFactory socketFactory;
+ private PacketChannel channel;
+ private volatile boolean connected;
+ private boolean keepAlive = true;
+ private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1);
+ private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3);
+ private volatile ThreadPoolExecutor keepAliveThreadExecutor;
+ private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6);
+
+ public AbstractBinaryLogClient(String username, int port, String hostname, String password, String schema) {
+ this.username = username;
+ this.port = port;
+ this.hostname = hostname;
+ this.password = password;
+ this.schema = schema;
+ }
+
+ /**
+ * @return server id (65535 by default)
+ */
+ public long getServerId() {
+ return serverId;
+ }
+
+ /**
+ * @param serverId server id (in the range from 1 to 2^32 – 1). This value MUST be unique across whole replication
+ * group (that is, different from any other server id being used by any master or slave). Keep in mind that each
+ * binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a
+ * simplified slave and thus MUST also use a different server id.
+ */
+ public void setServerId(long serverId) {
+ this.serverId = serverId;
+ }
+
+ /**
+ * @return binary log filename, nullable. Note that this value is automatically tracked by the client and thus
+ * is subject to change (in response to {@link com.github.shyiko.mysql.binlog.event.EventType#ROTATE}, for example).
+ */
+ public String getBinlogFilename() {
+ return binlogFilename;
+ }
+
+ /**
+ * @param binlogFilename binary log filename (null indicates automatic resolution).
+ */
+ public void setBinlogFilename(String binlogFilename) {
+ this.binlogFilename = binlogFilename;
+ }
+
+ /**
+ * @return binary log position of the next event. Note that this value changes with each incoming event.
+ */
+ public long getBinlogPosition() {
+ return binlogPosition;
+ }
+
+ /**
+ * @param binlogPosition binary log position
+ */
+ public void setBinlogPosition(long binlogPosition) {
+ this.binlogPosition = binlogPosition;
+ }
+
+ /**
+ * @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default),
+ * false otherwise.
+ */
+ public void setKeepAlive(boolean keepAlive) {
+ this.keepAlive = keepAlive;
+ }
+
+ /**
+ * @param keepAliveInterval "keep alive" interval in milliseconds.
+ */
+ public void setKeepAliveInterval(long keepAliveInterval) {
+ this.keepAliveInterval = keepAliveInterval;
+ }
+
+ /**
+ * @param keepAliveConnectTimeout "keep alive" connect interval in milliseconds.
+ */
+ public void setKeepAliveConnectTimeout(long keepAliveConnectTimeout) {
+ this.keepAliveConnectTimeout = keepAliveConnectTimeout;
+ }
+
+ /**
+ * @param eventDeserializer custom event deserializer
+ */
+ public void setEventDeserializer(EventDeserializer eventDeserializer) {
+ if (eventDeserializer == null) {
+ throw new IllegalArgumentException("Event deserializer cannot be NULL");
+ }
+ this.eventDeserializer = eventDeserializer;
+ }
+
+ /**
+ * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()".
+ */
+ public void setSocketFactory(SocketFactory socketFactory) {
+ this.socketFactory = socketFactory;
+ }
+
+ /**
+ * Connect to the replication stream. Note that this method blocks until disconnected.
+ * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication
+ * @throws java.io.IOException if anything goes wrong while trying to connect
+ */
+ public void connect() throws IOException {
+ if (connected) {
+ throw new IllegalStateException("BinaryLogClient is already connected");
+ }
+ try {
+ try {
+ Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
+ socket.connect(new InetSocketAddress(hostname, port));
+ channel = new PacketChannel(socket);
+ if (channel.getInputStream().peek() == -1) {
+ throw new EOFException();
+ }
+ } catch (IOException e) {
+ throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
+ ". Please make sure it's running.", e);
+ }
+ GreetingPacket greetingPacket = new GreetingPacket(channel.read());
+ AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password,
+ greetingPacket.getScramble());
+ authenticateCommand.setCollation(greetingPacket.getServerCollation());
+ channel.write(authenticateCommand);
+ byte[] authenticationResult = channel.read();
+ if (authenticationResult[0] != (byte) 0x00 /* ok */) {
+ if (authenticationResult[0] == (byte) 0xFF /* error */) {
+ byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length);
+ throw new AuthenticationException(new ErrorPacket(bytes).getErrorMessage());
+ }
+ throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")");
+ }
+ if (binlogFilename == null) {
+ fetchBinlogFilenameAndPosition();
+ }
+ ChecksumType checksumType = fetchBinlogChecksum();
+ if (checksumType != ChecksumType.NONE) {
+ confirmSupportOfChecksum(checksumType);
+ }
+ channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition));
+ } catch (IOException e) {
+ if (channel != null && channel.isOpen()) {
+ channel.close();
+ }
+ throw e;
+ }
+ connected = true;
+ if (logger.isLoggable(Level.INFO)) {
+ logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition);
+ }
+ onConnect();
+ if (keepAlive && !isKeepAliveThreadRunning()) {
+ spawnKeepAliveThread();
+ }
+ listenForEventPackets();
+ }
+
+ private void spawnKeepAliveThread() {
+ keepAliveThreadExecutor = newSingleDaemonThreadExecutor("blc-keepalive-" + hostname + ":" + port);
+ keepAliveThreadExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(keepAliveInterval);
+ } catch (InterruptedException e) {
+ // expected in case of disconnect
+ }
+ shutdownLock.lock();
+ try {
+ if (keepAliveThreadExecutor.isShutdown()) {
+ return;
+ }
+ try {
+ channel.write(new PingCommand());
+ } catch (IOException e) {
+ if (logger.isLoggable(Level.INFO)) {
+ logger.info("Trying to restore lost connection to " + hostname + ":" + port);
+ }
+ try {
+ if (isConnected()) {
+ disconnectChannel();
+ }
+ connect(keepAliveConnectTimeout);
+ } catch (Exception ce) {
+ if (logger.isLoggable(Level.WARNING)) {
+ logger.warning("Failed to restore connection to " + hostname + ":" + port +
+ ". Next attempt in " + keepAliveInterval + "ms");
+ }
+ }
+ }
+ } finally {
+ shutdownLock.unlock();
+ }
+ }
+ }
+ });
+ }
+
+ private ThreadPoolExecutor newSingleDaemonThreadExecutor(final String threadName) {
+ return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue(), new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, threadName);
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
+
+ protected boolean isKeepAliveThreadRunning() {
+ return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
+ }
+
+ /**
+ * @return true if client is connected, false otherwise
+ */
+ public boolean isConnected() {
+ return connected;
+ }
+
+ private void fetchBinlogFilenameAndPosition() throws IOException {
+ ResultSetRowPacket[] resultSet;
+ channel.write(new QueryCommand("show master status"));
+ resultSet = readResultSet();
+ if (resultSet.length == 0) {
+ throw new IOException("Failed to determine binlog filename/position");
+ }
+ ResultSetRowPacket resultSetRow = resultSet[0];
+ binlogFilename = resultSetRow.getValue(0);
+ binlogPosition = Long.parseLong(resultSetRow.getValue(1));
+ }
+
+ private ChecksumType fetchBinlogChecksum() throws IOException {
+ channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));
+ ResultSetRowPacket[] resultSet = readResultSet();
+ if (resultSet.length == 0) {
+ return ChecksumType.NONE;
+ }
+ return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase());
+ }
+
+ private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException {
+ channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum"));
+ byte[] statementResult = channel.read();
+ if (statementResult[0] == (byte) 0xFF /* error */) {
+ byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length);
+ throw new IOException(new ErrorPacket(bytes).getErrorMessage());
+ }
+ eventDeserializer.setChecksumType(checksumType);
+ }
+
+ private void listenForEventPackets() throws IOException {
+ ByteArrayInputStream inputStream = channel.getInputStream();
+ try {
+ while (inputStream.peek() != -1) {
+ int packetLength = inputStream.readInteger(3);
+ inputStream.skip(1); // 1 byte for sequence
+ int marker = inputStream.read();
+ if (marker == 0xFF) {
+ ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
+ throw new IOException(errorPacket.getErrorCode() + " - " + errorPacket.getErrorMessage());
+ }
+ Event event;
+ try {
+ event = eventDeserializer.nextEvent(inputStream);
+ } catch (Exception e) {
+ if (isConnected()) {
+ onEventDeserializationFailure(e);
+ }
+ continue;
+ }
+ if (isConnected()) {
+ notifyEventListener(event);
+ updateClientBinlogFilenameAndPosition(event);
+ }
+ }
+ } catch (Exception e) {
+ if (isConnected()) {
+ onCommunicationFailure(e);
+ }
+ } finally {
+ if (isConnected()) {
+ disconnectChannel();
+ }
+ }
+ }
+
+ private void updateClientBinlogFilenameAndPosition(Event event) {
+ EventHeader eventHeader = event.getHeader();
+ if (eventHeader.getEventType() == EventType.ROTATE) {
+ RotateEventData eventData = event.getData();
+ if (eventData != null) {
+ binlogFilename = eventData.getBinlogFilename();
+ binlogPosition = eventData.getBinlogPosition();
+ }
+ } else
+ if (eventHeader instanceof EventHeaderV4) {
+ EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
+ long nextBinlogPosition = trackableEventHeader.getNextPosition();
+ if (nextBinlogPosition > 0) {
+ binlogPosition = nextBinlogPosition;
+ }
+ }
+ }
+
+ private ResultSetRowPacket[] readResultSet() throws IOException {
+ List resultSet = new LinkedList();
+ while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ }
+ for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) {
+ resultSet.add(new ResultSetRowPacket(bytes));
+ }
+ return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]);
+ }
+
+ private void notifyEventListener(Event event) {
+ onEvent(event);
+ }
+
+ /**
+ * Disconnect from the replication stream.
+ * Note that this does not cause binlogFilename/binlogPosition to be cleared out.
+ * As the result following {@link #connect()} resumes client from where it left off.
+ */
+ public void disconnect() throws IOException {
+ shutdownLock.lock();
+ try {
+ if (isKeepAliveThreadRunning()) {
+ keepAliveThreadExecutor.shutdownNow();
+ }
+ disconnectChannel();
+ } finally {
+ shutdownLock.unlock();
+ }
+ if (isKeepAliveThreadRunning()) {
+ waitForKeepAliveThreadToBeTerminated();
+ }
+ }
+
+ private void waitForKeepAliveThreadToBeTerminated() {
+ boolean terminated = false;
+ try {
+ terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout,
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ if (logger.isLoggable(Level.WARNING)) {
+ logger.log(Level.WARNING, e.getMessage());
+ }
+ }
+ if (!terminated) {
+ throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " +
+ keepAliveThreadShutdownTimeout + "ms");
+ }
+ }
+
+ private void disconnectChannel() throws IOException {
+ try {
+ connected = false;
+ if (channel != null && channel.isOpen()) {
+ channel.close();
+ }
+ } finally {
+ onDisconnect();
+ }
+ }
+
+ /**
+ * Invoked once for each {@link Event}, in the order they are processed.
+ */
+ protected abstract void onEvent(Event event);
+
+ /**
+ * Invoked when a connection is established.
+ */
+ protected abstract void onConnect();
+
+ /**
+ * It's guarantied to be called before {@link #onDisconnect()}) in case of communication failure.
+ */
+ protected abstract void onCommunicationFailure(Exception ex);
+
+ /**
+ * Called in case of failed event deserialization. Note this type of error does NOT cause client to
+ * disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually.
+ */
+ protected abstract void onEventDeserializationFailure(Exception ex);
+
+ /**
+ * Called upon disconnect (regardless of the reason).
+ */
+ protected abstract void onDisconnect();
+
+}
diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
index 997b8d2a..660e69c2 100644
--- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
+++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
@@ -16,43 +16,17 @@
package com.github.shyiko.mysql.binlog;
import com.github.shyiko.mysql.binlog.event.Event;
-import com.github.shyiko.mysql.binlog.event.EventHeader;
-import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
-import com.github.shyiko.mysql.binlog.event.EventType;
-import com.github.shyiko.mysql.binlog.event.RotateEventData;
-import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
-import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
-import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
-import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
-import com.github.shyiko.mysql.binlog.network.AuthenticationException;
-import com.github.shyiko.mysql.binlog.network.SocketFactory;
-import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
-import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
-import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
-import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
-import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
-import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
-import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
-import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
-
-import java.io.EOFException;
+
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -61,41 +35,13 @@
*
* @author Stanley Shyiko
*/
-public class BinaryLogClient implements BinaryLogClientMXBean {
+public class BinaryLogClient extends AbstractBinaryLogClient {
private final Logger logger = Logger.getLogger(getClass().getName());
-
- private final String hostname;
- private final int port;
- private final String schema;
- private final String username;
- private final String password;
-
- private long serverId = 65535;
- private volatile String binlogFilename;
- private volatile long binlogPosition;
-
- private EventDeserializer eventDeserializer = new EventDeserializer();
-
private final List eventListeners = new LinkedList();
private final List lifecycleListeners = new LinkedList();
-
- private SocketFactory socketFactory;
-
- private PacketChannel channel;
- private volatile boolean connected;
-
private ThreadFactory threadFactory;
- private boolean keepAlive = true;
- private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1);
- private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3);
-
- private volatile ThreadPoolExecutor keepAliveThreadExecutor;
- private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6);
-
- private final Lock shutdownLock = new ReentrantLock();
-
/**
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
* @see BinaryLogClient#BinaryLogClient(String, int, String, String, String)
@@ -129,96 +75,7 @@ public BinaryLogClient(String hostname, int port, String username, String passwo
* @param password password
*/
public BinaryLogClient(String hostname, int port, String schema, String username, String password) {
- this.hostname = hostname;
- this.port = port;
- this.schema = schema;
- this.username = username;
- this.password = password;
- }
-
- /**
- * @return server id (65535 by default)
- */
- public long getServerId() {
- return serverId;
- }
-
- /**
- * @param serverId server id (in the range from 1 to 2^32 – 1). This value MUST be unique across whole replication
- * group (that is, different from any other server id being used by any master or slave). Keep in mind that each
- * binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a
- * simplified slave and thus MUST also use a different server id.
- */
- public void setServerId(long serverId) {
- this.serverId = serverId;
- }
-
- /**
- * @return binary log filename, nullable. Note that this value is automatically tracked by the client and thus
- * is subject to change (in response to {@link EventType#ROTATE}, for example).
- */
- public String getBinlogFilename() {
- return binlogFilename;
- }
-
- /**
- * @param binlogFilename binary log filename (null indicates automatic resolution).
- */
- public void setBinlogFilename(String binlogFilename) {
- this.binlogFilename = binlogFilename;
- }
-
- /**
- * @return binary log position of the next event. Note that this value changes with each incoming event.
- */
- public long getBinlogPosition() {
- return binlogPosition;
- }
-
- /**
- * @param binlogPosition binary log position
- */
- public void setBinlogPosition(long binlogPosition) {
- this.binlogPosition = binlogPosition;
- }
-
- /**
- * @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default),
- * false otherwise.
- */
- public void setKeepAlive(boolean keepAlive) {
- this.keepAlive = keepAlive;
- }
-
- /**
- * @param keepAliveInterval "keep alive" interval in milliseconds.
- */
- public void setKeepAliveInterval(long keepAliveInterval) {
- this.keepAliveInterval = keepAliveInterval;
- }
-
- /**
- * @param keepAliveConnectTimeout "keep alive" connect interval in milliseconds.
- */
- public void setKeepAliveConnectTimeout(long keepAliveConnectTimeout) {
- this.keepAliveConnectTimeout = keepAliveConnectTimeout;
- }
-
- /**
- * @param eventDeserializer custom event deserializer
- */
- public void setEventDeserializer(EventDeserializer eventDeserializer) {
- if (eventDeserializer == null) {
- throw new IllegalArgumentException("Event deserializer cannot be NULL");
- }
- this.eventDeserializer = eventDeserializer;
- }
-
- /**
- * @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()".
- */
- public void setSocketFactory(SocketFactory socketFactory) {
- this.socketFactory = socketFactory;
+ super(username, port, hostname, password, schema);
}
/**
@@ -229,132 +86,10 @@ public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
- /**
- * Connect to the replication stream. Note that this method blocks until disconnected.
- * @throws AuthenticationException in case of failed authentication
- * @throws IOException if anything goes wrong while trying to connect
- */
- public void connect() throws IOException {
- if (connected) {
- throw new IllegalStateException("BinaryLogClient is already connected");
- }
- try {
- try {
- Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
- socket.connect(new InetSocketAddress(hostname, port));
- channel = new PacketChannel(socket);
- if (channel.getInputStream().peek() == -1) {
- throw new EOFException();
- }
- } catch (IOException e) {
- throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
- ". Please make sure it's running.", e);
- }
- GreetingPacket greetingPacket = new GreetingPacket(channel.read());
- AuthenticateCommand authenticateCommand = new AuthenticateCommand(schema, username, password,
- greetingPacket.getScramble());
- authenticateCommand.setCollation(greetingPacket.getServerCollation());
- channel.write(authenticateCommand);
- byte[] authenticationResult = channel.read();
- if (authenticationResult[0] != (byte) 0x00 /* ok */) {
- if (authenticationResult[0] == (byte) 0xFF /* error */) {
- byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length);
- throw new AuthenticationException(new ErrorPacket(bytes).getErrorMessage());
- }
- throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")");
- }
- if (binlogFilename == null) {
- fetchBinlogFilenameAndPosition();
- }
- ChecksumType checksumType = fetchBinlogChecksum();
- if (checksumType != ChecksumType.NONE) {
- confirmSupportOfChecksum(checksumType);
- }
- channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition));
- } catch (IOException e) {
- if (channel != null && channel.isOpen()) {
- channel.close();
- }
- throw e;
- }
- connected = true;
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition);
- }
- synchronized (lifecycleListeners) {
- for (LifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onConnect(this);
- }
- }
- if (keepAlive && !isKeepAliveThreadRunning()) {
- spawnKeepAliveThread();
- }
- listenForEventPackets();
- }
-
- private void spawnKeepAliveThread() {
- keepAliveThreadExecutor = newSingleDaemonThreadExecutor("blc-keepalive-" + hostname + ":" + port);
- keepAliveThreadExecutor.submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(keepAliveInterval);
- } catch (InterruptedException e) {
- // expected in case of disconnect
- }
- shutdownLock.lock();
- try {
- if (keepAliveThreadExecutor.isShutdown()) {
- return;
- }
- try {
- channel.write(new PingCommand());
- } catch (IOException e) {
- if (logger.isLoggable(Level.INFO)) {
- logger.info("Trying to restore lost connection to " + hostname + ":" + port);
- }
- try {
- if (isConnected()) {
- disconnectChannel();
- }
- connect(keepAliveConnectTimeout);
- } catch (Exception ce) {
- if (logger.isLoggable(Level.WARNING)) {
- logger.warning("Failed to restore connection to " + hostname + ":" + port +
- ". Next attempt in " + keepAliveInterval + "ms");
- }
- }
- }
- } finally {
- shutdownLock.unlock();
- }
- }
- }
- });
- }
-
- private ThreadPoolExecutor newSingleDaemonThreadExecutor(final String threadName) {
- return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue(), new ThreadFactory() {
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, threadName);
- thread.setDaemon(true);
- return thread;
- }
- });
- }
-
- protected boolean isKeepAliveThreadRunning() {
- return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
- }
-
/**
* Connect to the replication stream in a separate thread.
* @param timeoutInMilliseconds timeout in milliseconds
- * @throws AuthenticationException in case of failed authentication
+ * @throws com.github.shyiko.mysql.binlog.network.AuthenticationException in case of failed authentication
* @throws IOException if anything goes wrong while trying to connect
* @throws TimeoutException if client wasn't able to connect in the requested period of time
*/
@@ -399,115 +134,6 @@ public void run() {
}
}
- /**
- * @return true if client is connected, false otherwise
- */
- public boolean isConnected() {
- return connected;
- }
-
- private void fetchBinlogFilenameAndPosition() throws IOException {
- ResultSetRowPacket[] resultSet;
- channel.write(new QueryCommand("show master status"));
- resultSet = readResultSet();
- if (resultSet.length == 0) {
- throw new IOException("Failed to determine binlog filename/position");
- }
- ResultSetRowPacket resultSetRow = resultSet[0];
- binlogFilename = resultSetRow.getValue(0);
- binlogPosition = Long.parseLong(resultSetRow.getValue(1));
- }
-
- private ChecksumType fetchBinlogChecksum() throws IOException {
- channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));
- ResultSetRowPacket[] resultSet = readResultSet();
- if (resultSet.length == 0) {
- return ChecksumType.NONE;
- }
- return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase());
- }
-
- private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException {
- channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum"));
- byte[] statementResult = channel.read();
- if (statementResult[0] == (byte) 0xFF /* error */) {
- byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length);
- throw new IOException(new ErrorPacket(bytes).getErrorMessage());
- }
- eventDeserializer.setChecksumType(checksumType);
- }
-
- private void listenForEventPackets() throws IOException {
- ByteArrayInputStream inputStream = channel.getInputStream();
- try {
- while (inputStream.peek() != -1) {
- int packetLength = inputStream.readInteger(3);
- inputStream.skip(1); // 1 byte for sequence
- int marker = inputStream.read();
- if (marker == 0xFF) {
- ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
- throw new IOException(errorPacket.getErrorCode() + " - " + errorPacket.getErrorMessage());
- }
- Event event;
- try {
- event = eventDeserializer.nextEvent(inputStream);
- } catch (Exception e) {
- if (isConnected()) {
- synchronized (lifecycleListeners) {
- for (LifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onEventDeserializationFailure(this, e);
- }
- }
- }
- continue;
- }
- if (isConnected()) {
- notifyEventListeners(event);
- updateClientBinlogFilenameAndPosition(event);
- }
- }
- } catch (Exception e) {
- if (isConnected()) {
- synchronized (lifecycleListeners) {
- for (LifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onCommunicationFailure(this, e);
- }
- }
- }
- } finally {
- if (isConnected()) {
- disconnectChannel();
- }
- }
- }
-
- private void updateClientBinlogFilenameAndPosition(Event event) {
- EventHeader eventHeader = event.getHeader();
- if (eventHeader.getEventType() == EventType.ROTATE) {
- RotateEventData eventData = event.getData();
- if (eventData != null) {
- binlogFilename = eventData.getBinlogFilename();
- binlogPosition = eventData.getBinlogPosition();
- }
- } else
- if (eventHeader instanceof EventHeaderV4) {
- EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
- long nextBinlogPosition = trackableEventHeader.getNextPosition();
- if (nextBinlogPosition > 0) {
- binlogPosition = nextBinlogPosition;
- }
- }
- }
-
- private ResultSetRowPacket[] readResultSet() throws IOException {
- List resultSet = new LinkedList();
- while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ }
- for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) {
- resultSet.add(new ResultSetRowPacket(bytes));
- }
- return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]);
- }
-
/**
* @return registered event listeners
*/
@@ -549,7 +175,8 @@ public void unregisterEventListener(EventListener eventListener) {
}
}
- private void notifyEventListeners(Event event) {
+ @Override
+ protected void onEvent(Event event) {
synchronized (eventListeners) {
for (EventListener eventListener : eventListeners) {
try {
@@ -604,53 +231,40 @@ public synchronized void unregisterLifecycleListener(LifecycleListener eventList
}
}
- /**
- * Disconnect from the replication stream.
- * Note that this does not cause binlogFilename/binlogPosition to be cleared out.
- * As the result following {@link #connect()} resumes client from where it left off.
- */
- public void disconnect() throws IOException {
- shutdownLock.lock();
- try {
- if (isKeepAliveThreadRunning()) {
- keepAliveThreadExecutor.shutdownNow();
+ @Override
+ protected void onConnect() {
+ synchronized (lifecycleListeners) {
+ for (LifecycleListener lifecycleListener : lifecycleListeners) {
+ lifecycleListener.onConnect(this);
}
- disconnectChannel();
- } finally {
- shutdownLock.unlock();
- }
- if (isKeepAliveThreadRunning()) {
- waitForKeepAliveThreadToBeTerminated();
}
}
- private void waitForKeepAliveThreadToBeTerminated() {
- boolean terminated = false;
- try {
- terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout,
- TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- if (logger.isLoggable(Level.WARNING)) {
- logger.log(Level.WARNING, e.getMessage());
+ @Override
+ protected void onCommunicationFailure(Exception ex) {
+ synchronized (lifecycleListeners) {
+ for (LifecycleListener lifecycleListener : lifecycleListeners) {
+ lifecycleListener.onCommunicationFailure(this, ex);
}
}
- if (!terminated) {
- throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " +
- keepAliveThreadShutdownTimeout + "ms");
- }
+
}
- private void disconnectChannel() throws IOException {
- try {
- connected = false;
- if (channel != null && channel.isOpen()) {
- channel.close();
+ @Override
+ protected void onEventDeserializationFailure(Exception ex) {
+ synchronized (lifecycleListeners) {
+ for (LifecycleListener lifecycleListener : lifecycleListeners) {
+ lifecycleListener.onEventDeserializationFailure(this, ex);
}
- } finally {
- synchronized (lifecycleListeners) {
- for (LifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onDisconnect(this);
- }
+ }
+
+ }
+
+ @Override
+ protected void onDisconnect() {
+ synchronized (lifecycleListeners) {
+ for (LifecycleListener lifecycleListener : lifecycleListeners) {
+ lifecycleListener.onDisconnect(this);
}
}
}
@@ -696,15 +310,19 @@ public interface LifecycleListener {
*/
public static abstract class AbstractLifecycleListener implements LifecycleListener {
+ @Override
public void onConnect(BinaryLogClient client) {
}
+ @Override
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
}
+ @Override
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
}
+ @Override
public void onDisconnect(BinaryLogClient client) {
}
diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java
index 62e3510d..a495ec71 100644
--- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java
+++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java
@@ -201,6 +201,8 @@ public void testDeserializationOfDifferentColumnTypes() throws Exception {
new java.sql.Date(generateTime(1989, 3, 21, 0, 0, 0, 0))});
assertEquals(writeAndCaptureRow("datetime", "'1989-03-21 01:02:03.000000'"), new Serializable[]{
new java.util.Date(generateTime(1989, 3, 21, 1, 2, 3, 0))});
+ // FIXME: '1989-03-18 01:02:03.000000' fails in the America/Los_Angeles timezone (Daylight Saving Time)
+ // '1989-05-18 01:02:03.000000' passes
assertEquals(writeAndCaptureRow("timestamp", "'1989-03-18 01:02:03.000000'"), new Serializable[]{
new java.sql.Timestamp(generateTime(1989, 3, 18, 1, 2, 3, 0))});
assertEquals(writeAndCaptureRow("time", "'1:2:3.000000'"), new Serializable[]{
@@ -558,8 +560,10 @@ public void execute(Statement statement) throws SQLException {
// fixing connection timezone to the "client's one"
TimeZone currentTimeZone = TimeZone.getDefault();
int offset = currentTimeZone.getRawOffset() + currentTimeZone.getDSTSavings();
- String timeZoneAsAString = String.format("%s%02d:%02d", offset >= 0 ? "+" : "-", offset / 3600000,
- (offset / 60000) % 60);
+ String timeZoneAsAString = String.format("%s%02d:%02d",
+ offset >= 0 ? "+" : "-",
+ Math.abs(offset / 3600000),
+ (offset / 60000) % 60);
statement.execute("SET time_zone = '" + timeZoneAsAString + "'");
}
});