Skip to content

A bunch of small changes #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 26, 2013
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ The project is test with:

It's not tested in real production situation.

TESTING
=======

Make sure you have the following configuration set in your mysql config file (usually my.cnf on development env):

log-bin=mysql-bin
server-id=1
binlog_do_db=pymysqlreplication_test

Limitations
=============

Expand Down
8 changes: 6 additions & 2 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@

from .packet import BinLogPacketWrapper
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT

from .event import NotImplementedEvent

class BinLogStreamReader(object):
"""Connect to replication stream and read event
"""

def __init__(self, connection_settings={}, resume_stream=False,
blocking=False, only_events=None, server_id=255,
log_file=None, log_pos=None):
log_file=None, log_pos=None, filter_non_implemented_events=True):
"""
Attributes:
resume_stream: Start for event from position or the latest event of
Expand All @@ -35,6 +35,7 @@ def __init__(self, connection_settings={}, resume_stream=False,
self.__resume_stream = resume_stream
self.__blocking = blocking
self.__only_events = only_events
self.__filter_non_implemented_events = filter_non_implemented_events
self.__server_id = server_id

#Store table meta information
Expand Down Expand Up @@ -132,6 +133,9 @@ def fetchone(self):
return binlog_event.event

def __filter_event(self, event):
if self.__filter_non_implemented_events and isinstance(event, NotImplementedEvent):
return True

if self.__only_events is not None:
for allowed_event in self.__only_events:
if isinstance(event, allowed_event):
Expand Down
2 changes: 1 addition & 1 deletion pymysqlreplication/constants/BINLOG.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
DELETE_ROWS_EVENT_V2 = 0x20
GTID_LOG_EVENT = 0x21
ANONYMOUS_GTID_LOG_EVENT = 0x22
PREVIOUS_GTIDS_LOG_EVENT = 0x23
PREVIOUS_GTIDS_LOG_EVENT = 0x23
2 changes: 1 addition & 1 deletion pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,4 @@ class NotImplementedEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection):
super(NotImplementedEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection)
self.packet.advance(event_size)
self.packet.advance(event_size)
6 changes: 6 additions & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class BinLogPacketWrapper(object):
constants.WRITE_ROWS_EVENT_V2: row_event.WriteRowsEvent,
constants.DELETE_ROWS_EVENT_V2: row_event.DeleteRowsEvent,
constants.TABLE_MAP_EVENT: row_event.TableMapEvent,
#5.6 GTID enabled replication events
constants.INTVAR_EVENT: event.NotImplementedEvent,
constants.GTID_LOG_EVENT: event.NotImplementedEvent,
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent

}

def __init__(self, from_packet, table_map, ctl_connection):
Expand Down
4 changes: 4 additions & 0 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def test_connection_stream_lost_event(self):
self.stream.fetchone()

event = self.stream.fetchone()

while (type(event) == NotImplementedEvent):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

event = self.stream.fetchone()

self.assertIsInstance(event, QueryEvent)
self.assertEqual(event.query, query)

Expand Down