Add feature to start stream from position #24

Merged
merged 3 commits into master on May 24, 2013

Conversation

lxyu
Contributor

@lxyu lxyu commented May 22, 2013

This pull request adds the feature to start the stream from a pre-set position.

@lxyu
Contributor Author

lxyu commented May 22, 2013

Just found a bug. I'll push later.

@liorsion
Collaborator

What are you trying to accomplish here?

Just adding a location doesn't help, because you need to extract it and save it as well. Also, you can only save the position after the event was processed successfully.

Internally I have a version of BinLogStreamReader that optionally receives a persistence interface (with two methods, load and save) that takes care of the last known location and resuming. If you guys think it's the right way to go, I can push it.

My sample persistence layer, btw, is a Redis layer that saves the last log position.

@lxyu
Contributor Author

lxyu commented May 22, 2013

Yeah, I also found it and just updated the code.

I am using a Redis layer to save the log_file and log_pos, so when anything crashes, it can restart from where it died.

The problem I'm trying to solve is:

"Do replication jobs without losing any SQL statements, even when the script crashes, MySQL crashes, MySQL restarts, etc."

The original code uses "SHOW MASTER STATUS" and will always start from the moment the script starts, so in the situations I mentioned above, it'll lose SQL statements.

So I have to record the log_file and log_pos in order to track how far the replication has got.

I also enabled "STOP_EVENT" for the reason given in the commit message:

when log_file and log_pos are passed manually, a MySQL restart will generate a stop event and cause an exception in the stream.

@bjoernhaeuser
Collaborator

Hey there,

the stream already supports starting at any position you provide by manually overriding the "private" properties for the file and the position.
As a first step I would make them publicly accessible (removing the two underscores).

Second, we need to think about how to persist the state the stream is currently in. Having an interface called on every event would be a solution. The user of the stream would have to provide an implementation which takes care of persisting the state somewhere (e.g. a file or Redis).

I also think we would need an implementation of STOP_EVENT to handle restarts gracefully.

I also needed this feature and ended up persisting the current state every x seconds by reading the properties myself and saving them to a file.

@noplay what do you think?

Regards
Björn

@julien-duponchelle
Owner

I think an interface for storing the position is the perfect solution. We can provide file- and Redis-based implementations (I'm also thinking about ZooKeeper and Apache Mesos for HA).

@bjoernhaeuser
Collaborator

When would this interface be called? After every event?

@lxyu
Contributor Author

lxyu commented May 22, 2013

About the interface, I have the same concern as @liorsion: the position can only be saved once the events are processed successfully, which should be outside the scope of this module.

So I think providing a read interface is enough.

@liorsion
Collaborator

In my implementation the persistence class is passed down all the way to the event, which has a new "mark_processed" method. This is called by the user of the BinLogStreamReader; it's not as clean or automatic, but only the user of the reader knows when an event was processed successfully.

@julien-duponchelle
Owner

Perhaps we can improve the iterator returning events: when the reader asks for the next event, we mark the previous one as processed. We have no reason to continue reading if an event hasn't been processed. We also need to call mark_processed for events not supported by the reader.
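A minimal sketch of that iterator idea (the names here are hypothetical, not the library's actual API): an event's position is persisted only when the consumer comes back for the next one, i.e. after it was handled. Note that if the consumer breaks out of the loop early, the last delivered event is never marked, which is exactly the concern about `break` raised later in this thread:

```python
class MarkOnNextIterator:
    """Wrap an event source yielding (log_pos, event) pairs; when the
    caller asks for the next event, the previous one is considered
    processed and its position is persisted. A crash while handling
    event N means N is replayed on restart (at-least-once delivery)."""

    def __init__(self, events, persist):
        self._events = iter(events)
        self._persist = persist       # called with each processed position
        self._last_pos = None

    def __iter__(self):
        return self

    def __next__(self):
        if self._last_pos is not None:
            self._persist(self._last_pos)
        # When the source is exhausted, StopIteration propagates here,
        # after the final delivered event has been marked processed.
        self._last_pos, event = next(self._events)
        return event
```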

@lxyu
Contributor Author

lxyu commented May 22, 2013

@noplay Good idea.

@julien-duponchelle
Owner

I have a concern about correctly supporting the case where the reader calls break inside the iterator. I don't know if we can catch that.

@bjoernhaeuser
Collaborator

Why not just call the interface before we pass the event to the stream-reader?

@liorsion
Collaborator

I thought about that, but that's exactly what I was trying to avoid. If we do it like that, there has to be another persistence level on the user's side of the lib: if an event was read and then the user's service crashed, it needs to be re-read and acted on, right?

I have a sample branch that shows how I thought about it: https://github.com/liorsion/python-mysql-replication/tree/add_persistance_layer

a basic class is something like:

class LogPersistanceLayer(object):

    def __init__(self):
        pass

    def save(self, log_num):
        pass

    def load(self):
        pass

a sample redis class is:

from LogPersistanceLayer import LogPersistanceLayer

class RedisPersistanceLayer(LogPersistanceLayer):
    PERSISTANCE_KEY = "RedisPersistanceLayer::COUNTER"

    def __init__(self, redis_client):
        super(RedisPersistanceLayer, self).__init__()
        self.__redisClient = redis_client

    def save(self, log_num):
        self.__redisClient.set(self.PERSISTANCE_KEY, log_num)

    def load(self):
        last_log_num = self.__redisClient.get(self.PERSISTANCE_KEY)
        if last_log_num and long(last_log_num) > 0:
            return long(last_log_num)
        return None

@bjoernhaeuser
Collaborator

What I wanted to say is that we could call the storage layer with the last event's position before we pass the current event down the road.

So if the handler of the event crashes or fails, the position of the last successful event will still be stored.

@bjoernhaeuser
Collaborator

An illustration, pseudo-code-style:

def read_from_stream(self):
    self.storage_layer.persist(current_position)

    packet = self.read_packet_from()
    event = self.read_event_from_packet()

    return event

@julien-duponchelle
Owner

The problem for me is: how do you manage the case where we quit the program? The next time, we will start from the last read event, which has already been processed.

Perhaps we can handle it in the close method.


@lxyu
Contributor Author

lxyu commented May 22, 2013

Again, I think the simplest solution is just to provide a simple read interface and let the user store the location himself.

It'll be enough for whoever needs the log position.

@bjoernhaeuser
Collaborator

Or we provide an interface which also supports setting the position, and just give the user the option to persist it somewhere. What do you think?

@liorsion
Collaborator

Letting the user persist is also a good option, I guess. Maybe stating it that way is even better, since it makes very clear that it's the user's responsibility and not half-and-half like in my example.

@lxyu
Contributor Author

lxyu commented May 23, 2013

I have updated my code; it now exposes log_pos and log_file on the stream object.

So far it works fine for me.
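With that change, a crash-safe consumer loop could look roughly like the sketch below. FakeStream is a hypothetical stand-in for BinLogStreamReader (which needs a running MySQL server); the point is only the ordering: save log_file/log_pos after an event is processed, and feed the saved pair back in on restart.

```python
class FakeStream:
    """Stand-in for BinLogStreamReader: fetchone() returns the next
    event and updates .log_file / .log_pos, as this PR exposes them."""

    def __init__(self, events, log_file="mysql-bin.000001"):
        self._events = iter(events)   # (log_pos, payload) pairs
        self.log_file = log_file
        self.log_pos = 0

    def fetchone(self):
        item = next(self._events, None)
        if item is None:
            return None
        self.log_pos, payload = item
        return payload

checkpoint = {}                       # in real use: Redis, a file, ...
stream = FakeStream([(120, "ev1"), (240, "ev2")])
while True:
    event = stream.fetchone()
    if event is None:
        break
    # ... process the event ...
    # Persist only after successful processing, so a crash replays
    # the in-flight event instead of losing it.
    checkpoint["log_file"] = stream.log_file
    checkpoint["log_pos"] = stream.log_pos
```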

@julien-duponchelle
Owner

Sounds good, I will test it later. But I agree that at this time a simple solution is enough.

@liorsion
Collaborator

Is it possible for you to add tests for the change?

@lxyu
Contributor Author

lxyu commented May 23, 2013

All right, I'll try it.

@liorsion
Collaborator

Cool, thanks. If you get in trouble I can help.

@lxyu
Contributor Author

lxyu commented May 23, 2013

Test added.

However, I found it tricky to write this:

for i in range(6):
    self.stream.fetchone()

And this:

# RotateEvent
self.stream.fetchone()
# FormatDescription
self.stream.fetchone()
# XidEvent
self.stream.fetchone()
# QueryEvent for the BEGIN
self.stream.fetchone()

because there's some "noise" among the events, such as RotateEvent, FormatDescriptionEvent, XidEvent, etc.

With this line, it'd be much cleaner:

only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent]

I didn't add it here because the other tests don't, but I suppose it may be good to do so.
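The effect of an only_events-style filter can be illustrated generically. The classes below are stand-ins for the library's event classes; the filter lets a test see only the row events it asserts on, instead of counting noise events by hand:

```python
# Hypothetical stand-ins for the library's binlog event classes.
class RotateEvent: pass
class FormatDescriptionEvent: pass
class WriteRowsEvent: pass

def filter_events(stream, allowed):
    # Yield only events whose type is in 'allowed', skipping the
    # "noise" events (RotateEvent, FormatDescriptionEvent, ...).
    for event in stream:
        if isinstance(event, tuple(allowed)):
            yield event
```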

@julien-duponchelle julien-duponchelle merged commit c59bb95 into julien-duponchelle:master May 24, 2013
@julien-duponchelle
Owner

Commit merged, thanks a lot!
