Add feature to start stream from position #24
Conversation
Just found a bug. I'll push later.
What are you trying to accomplish here? Just adding a location doesn't help, because you need to extract it and save it as well. Also, you can only save the place after it was processed successfully. Internally I have a version of BinLogStreamReader that optionally receives a persistencer interface (with two methods: load and save) that takes care of the last known location and resuming; if you guys think it's the right way to go, I can push it. My sample persistencer, btw, is a Redis layer that saves the last log pos.
Yeah, I also found it and just updated the code. I am using a Redis layer to save the position. The problem I'm trying to solve is: "do replication jobs without losing any SQL statements, even when the script crashes, MySQL crashes, MySQL restarts, etc." The original code uses "SHOW MASTER STATUS" and will always start from the time when the script starts, so in the situations I mentioned it would lose SQL statements. So I have to record the position myself. I also enabled the "STOP_EVENT" for the reason given in the commit message.
Hey there, the stream currently already supports starting at any position you provide, by manually overriding the "private" properties for the file and the position. Second, we need to think about how to persist the state the stream is currently in. Having an interface called after every event would be a solution. The user of the stream has to provide an implementation which takes care of persisting the state somewhere (e.g. a file or Redis). I also think we would need an implementation of the STOP_EVENT to handle restarts gracefully. I also needed this feature and ended up persisting the current state every x seconds by reading the properties myself and saving them to a file. @noplay what do you think? Regards
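That file-based workaround can be sketched roughly like this (a hedged sketch: `save_position` and `load_position` are hypothetical helper names, and the reader is assumed to expose `log_file` and `log_pos` as attributes, as described above):

```python
import json

def save_position(stream, path):
    # Persist the reader's current binlog coordinates to a file.
    # Assumes the stream exposes log_file / log_pos attributes.
    with open(path, "w") as f:
        json.dump({"log_file": stream.log_file,
                   "log_pos": stream.log_pos}, f)

def load_position(path):
    # Return the saved coordinates, or None if there is no saved
    # state yet (in which case one would fall back to SHOW MASTER STATUS).
    try:
        with open(path) as f:
            return json.load(f)
    except (IOError, ValueError):
        return None
```

Calling `save_position` every x seconds from the consuming loop reproduces the workaround described above.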
I think an interface for storing the position is the perfect solution. We can provide file- and Redis-based implementations (I'm also thinking about ZooKeeper and Apache Mesos for HA).
When would this interface be called? After every event?
About the interface, I have the same concern as @liorsion: the position can only be saved when the events are processed successfully, which should be outside the scope of this module. So I think providing a read interface is enough.
In my implementation the class goes all the way down to the event, which has a new "mark_processed" method. This is called by the user of the BinLogStreamReader. It's not as clean or automatic, but only the user of the reader knows when an event was processed successfully.
Perhaps we can improve the iterator returning events: when the reader asks for the next event, we mark the previous event as processed. We have no reason to continue reading if the event hasn't been processed. We also need to call mark_processed for events not supported by the reader.
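A minimal sketch of that iterator idea (hedged: `fetch_event` and `mark_processed` are hypothetical callables standing in for the real reader internals, not the library's actual API):

```python
def stream_events(fetch_event, mark_processed):
    # Yield events one by one. Control only comes back after a yield
    # when the consumer asks for the next event, so reaching the line
    # after `yield` means the previous event can be marked processed.
    while True:
        event = fetch_event()
        if event is None:
            break
        yield event
        mark_processed(event)
```

Note that if the consumer breaks out of the loop, the last yielded event is never marked, which is exactly the concern about `break` raised below.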
@noplay Good idea.
I have a concern about correctly supporting the case where the reader calls break in the iterator. I don't know if we can catch it.
Why not just call the interface before we pass the event to the stream-reader?
I thought about that, but that's exactly what I was trying to avoid. If we do it like that, it means there has to be another persistency level at the user of the lib: if an event was read and then the user service crashed, it needs to be re-read and acted on, right? I have a sample branch that shows how I thought about it: https://github.com/liorsion/python-mysql-replication/tree/add_persistance_layer

A basic class is something like:

```python
class LogPersistanceLayer(object):
    # Interface with two methods, load and save, that takes care of
    # the last known location and resuming (signatures sketched here).
    def save(self, log_file, log_pos):
        raise NotImplementedError

    def load(self):
        raise NotImplementedError
```

A sample Redis class is:

```python
from LogPersistanceLayer import LogPersistanceLayer

class RedisPersistanceLayer(LogPersistanceLayer):
    # Redis-backed implementation that saves the last log position
    # (see the sample branch linked above for the full code).
    ...
```
What I wanted to say is that we could call the storage-layer with the last event position before we pass the current event down the road. So if the handler of the event crashes / fails, the value of the last successful event will be stored.
An illustration, pseudo-code-style:
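The illustration itself did not survive extraction; a rough reconstruction of the idea just described (all names here are hypothetical) might look like:

```python
def read_events(fetch_event, storage):
    # Before handing the current event to the consumer, persist the
    # position of the LAST event that was handed out. If the consumer
    # crashes on the current event, we restart just before it and the
    # event is re-read.
    last_pos = None
    while True:
        pos, event = fetch_event()
        if event is None:
            break
        if last_pos is not None:
            storage.save(last_pos)
        yield event
        last_pos = pos
```

Note that the final event's position is only saved on the next read, which is the restart problem discussed next in the thread.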
The problem for me is: how do you handle quitting the program? The next time, we will start from the last read event, which has already been processed. Perhaps in the close method. (Replying to bjoernhaeuser's message of Wednesday, 22 May 2013, 11:33.)
Again, I think the simplest solution is to just provide a simple read interface and let the user store the location himself. It'll be enough for whoever needs the log position.
Or we provide an interface which also supports setting the position, and leave the user the option to persist it somewhere. What do you think?
Letting the user persist it is also a good option, I guess. Maybe as a declaration it's even better, since it's very clear that it's the user's responsibility and not half-and-half like in my example.
I have updated my code; now it exposes log_pos and log_file in the stream object. So far it works fine for me.
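With log_pos and log_file exposed, the consumer can persist the position after each successfully processed event. A runnable sketch of that pattern (hedged: `FakeStream` stands in for the real BinLogStreamReader, which would need a MySQL server; `persist` and the event tuples are hypothetical):

```python
class FakeStream:
    # Stand-in for a binlog stream reader exposing log_file / log_pos.
    def __init__(self, events):
        self._events = events
        self.log_file = "mysql-bin.000001"
        self.log_pos = 4
    def __iter__(self):
        for pos, event in self._events:
            self.log_pos = pos   # reader advances its position per event
            yield event

saved = {}

def persist(log_file, log_pos):
    # User-side storage (could be a file, Redis, etc.).
    saved["log_file"], saved["log_pos"] = log_file, log_pos

stream = FakeStream([(120, "ev1"), (240, "ev2")])
for event in stream:
    # ... process the event here ...
    persist(stream.log_file, stream.log_pos)  # save only after success
```

On restart, the user reads the saved position back and starts the stream from there, which is exactly the feature this PR adds.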
Sounds good, I will test it later. But I agree that at this time a simple solution is enough.
Is it possible for you to add tests for the change?
All right, I'll try it.
Cool, thanks. If you get in trouble I can help.
Test added. Though I found it tricky to have to write this:

```python
for i in range(6):
    self.stream.fetchone()
```

and this:

```python
# RotateEvent
self.stream.fetchone()
# FormatDescriptionEvent
self.stream.fetchone()
# XidEvent
self.stream.fetchone()
# QueryEvent for the BEGIN
self.stream.fetchone()
```

because there is some "noise" among the events, such as RotateEvent, FormatDescriptionEvent, XidEvent, etc. With this line it would be much cleaner:

```python
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent]
```

I didn't add it here because the other tests don't, but I suppose it may be good to do so.
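As a toy model of what an only_events filter would do (this sketches only the filtering idea with dummy event classes; it is not the library's implementation):

```python
# Dummy stand-ins for the real binlog event classes.
class RotateEvent: pass
class FormatDescriptionEvent: pass
class XidEvent: pass
class WriteRowsEvent: pass

def filter_events(events, only_events):
    # Keep only events whose type appears in only_events, dropping the
    # RotateEvent / FormatDescriptionEvent / XidEvent "noise".
    return [e for e in events if type(e) in only_events]

noisy = [RotateEvent(), FormatDescriptionEvent(), XidEvent(), WriteRowsEvent()]
rows_only = filter_events(noisy, only_events=[WriteRowsEvent])
```

With such a filter, the test could fetch row events directly instead of counting the bookkeeping events by hand.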
Commit merged, thanks a lot!
This pull request adds the feature to start the stream from a pre-set position.