|
5 | 5 | "io"
|
6 | 6 | "os"
|
7 | 7 | "path"
|
| 8 | + "sync" |
8 | 9 | "time"
|
9 | 10 |
|
10 | 11 | . "github.com/go-mysql-org/go-mysql/mysql"
|
@@ -41,77 +42,143 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
|
41 | 42 | // Force use raw mode
|
42 | 43 | b.parser.SetRawMode(true)
|
43 | 44 |
|
| 45 | + // Set up the backup event handler |
| 46 | + backupHandler := &BackupEventHandler{ |
| 47 | + handler: handler, |
| 48 | + } |
| 49 | + |
| 50 | + if b.cfg.SyncMode == SyncModeSync { |
| 51 | + // Set the event handler in BinlogSyncer for synchronous mode |
| 52 | + b.SetEventHandler(backupHandler) |
| 53 | + } |
| 54 | + |
44 | 55 | s, err := b.StartSync(p)
|
45 | 56 | if err != nil {
|
46 | 57 | return errors.Trace(err)
|
47 | 58 | }
|
48 | 59 |
|
49 |
| - var filename string |
50 |
| - var offset uint32 |
51 |
| - |
52 |
| - var w io.WriteCloser |
53 | 60 | defer func() {
|
54 |
| - var closeErr error |
55 |
| - if w != nil { |
56 |
| - closeErr = w.Close() |
57 |
| - } |
58 |
| - if retErr == nil { |
59 |
| - retErr = closeErr |
| 61 | + b.SetEventHandler(nil) // Reset the event handler |
| 62 | + if backupHandler.w != nil { |
| 63 | + closeErr := backupHandler.w.Close() |
| 64 | + if retErr == nil { |
| 65 | + retErr = closeErr |
| 66 | + } |
60 | 67 | }
|
61 | 68 | }()
|
62 | 69 |
|
63 |
| - for { |
64 |
| - ctx, cancel := context.WithTimeout(context.Background(), timeout) |
65 |
| - e, err := s.GetEvent(ctx) |
66 |
| - cancel() |
| 70 | + ctx, cancel := context.WithTimeout(context.Background(), timeout) |
| 71 | + defer cancel() |
67 | 72 |
|
68 |
| - if err == context.DeadlineExceeded { |
| 73 | + if b.cfg.SyncMode == SyncModeSync { |
| 74 | + // Synchronous mode: wait for completion or error |
| 75 | + select { |
| 76 | + case <-ctx.Done(): |
69 | 77 | return nil
|
70 |
| - } |
71 |
| - |
72 |
| - if err != nil { |
| 78 | + case <-b.ctx.Done(): |
| 79 | + return nil |
| 80 | + case err := <-s.ech: |
73 | 81 | return errors.Trace(err)
|
74 | 82 | }
|
| 83 | + } else { |
| 84 | + // Asynchronous mode: consume events from the streamer |
| 85 | + for { |
| 86 | + select { |
| 87 | + case <-ctx.Done(): |
| 88 | + return nil |
| 89 | + case <-b.ctx.Done(): |
| 90 | + return nil |
| 91 | + case err := <-s.ech: |
| 92 | + return errors.Trace(err) |
| 93 | + case e := <-s.ch: |
| 94 | + err = backupHandler.HandleEvent(e) |
| 95 | + if err != nil { |
| 96 | + return errors.Trace(err) |
| 97 | + } |
| 98 | + } |
| 99 | + } |
| 100 | + } |
| 101 | +} |
75 | 102 |
|
76 |
| - offset = e.Header.LogPos |
| 103 | +// BackupEventHandler handles writing events for backup |
| 104 | +type BackupEventHandler struct { |
| 105 | + handler func(binlogFilename string) (io.WriteCloser, error) |
| 106 | + w io.WriteCloser |
| 107 | + mutex sync.Mutex |
| 108 | + fsyncedChan chan struct{} |
| 109 | + eventCount int // eventCount used for testing |
77 | 110 |
|
78 |
| - if e.Header.EventType == ROTATE_EVENT { |
79 |
| - rotateEvent := e.Event.(*RotateEvent) |
80 |
| - filename = string(rotateEvent.NextLogName) |
| 111 | + filename string |
| 112 | +} |
81 | 113 |
|
82 |
| - if e.Header.Timestamp == 0 || offset == 0 { |
83 |
| - // fake rotate event |
84 |
| - continue |
85 |
| - } |
86 |
| - } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { |
87 |
| - // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new |
| 114 | +func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { |
| 115 | + h.mutex.Lock() |
| 116 | + defer h.mutex.Unlock() |
88 | 117 |
|
89 |
| - if w != nil { |
90 |
| - if err = w.Close(); err != nil { |
91 |
| - w = nil |
92 |
| - return errors.Trace(err) |
93 |
| - } |
94 |
| - } |
| 118 | + var err error |
95 | 119 |
|
96 |
| - if len(filename) == 0 { |
97 |
| - return errors.Errorf("empty binlog filename for FormateDescriptionEvent") |
98 |
| - } |
| 120 | + // Update the offset |
| 121 | + offset := e.Header.LogPos |
99 | 122 |
|
100 |
| - w, err = handler(filename) |
101 |
| - if err != nil { |
102 |
| - return errors.Trace(err) |
103 |
| - } |
| 123 | + if e.Header.EventType == ROTATE_EVENT { |
| 124 | + rotateEvent := e.Event.(*RotateEvent) |
| 125 | + h.filename = string(rotateEvent.NextLogName) |
104 | 126 |
|
105 |
| - // write binlog header fe'bin' |
106 |
| - if _, err = w.Write(BinLogFileHeader); err != nil { |
| 127 | + if e.Header.Timestamp == 0 || offset == 0 { |
| 128 | + // Fake rotate event, skip processing |
| 129 | + return nil |
| 130 | + } |
| 131 | + } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { |
| 132 | + // Close the current writer and open a new one |
| 133 | + if h.w != nil { |
| 134 | + if err = h.w.Close(); err != nil { |
| 135 | + h.w = nil |
107 | 136 | return errors.Trace(err)
|
108 | 137 | }
|
109 | 138 | }
|
110 | 139 |
|
111 |
| - if n, err := w.Write(e.RawData); err != nil { |
| 140 | + if len(h.filename) == 0 { |
| 141 | + return errors.Errorf("empty binlog filename for FormatDescriptionEvent") |
| 142 | + } |
| 143 | + |
| 144 | + h.w, err = h.handler(h.filename) |
| 145 | + if err != nil { |
| 146 | + return errors.Trace(err) |
| 147 | + } |
| 148 | + |
| 149 | + // Write binlog header fe'bin' |
| 150 | + _, err = h.w.Write(BinLogFileHeader) |
| 151 | + if err != nil { |
112 | 152 | return errors.Trace(err)
|
113 |
| - } else if n != len(e.RawData) { |
| 153 | + } |
| 154 | + } |
| 155 | + |
| 156 | + // Write raw event data to the current writer |
| 157 | + if h.w != nil { |
| 158 | + n, err := h.w.Write(e.RawData) |
| 159 | + if err != nil { |
| 160 | + return errors.Trace(err) |
| 161 | + } |
| 162 | + if n != len(e.RawData) { |
114 | 163 | return errors.Trace(io.ErrShortWrite)
|
115 | 164 | }
|
| 165 | + |
| 166 | + // Perform Sync if the writer supports it |
| 167 | + if f, ok := h.w.(*os.File); ok { |
| 168 | + if err := f.Sync(); err != nil { |
| 169 | + return errors.Trace(err) |
| 170 | + } |
| 171 | + // Signal that fsync has completed |
| 172 | + if h.fsyncedChan != nil { |
| 173 | + h.fsyncedChan <- struct{}{} |
| 174 | + } |
| 175 | + } |
| 176 | + } else { |
| 177 | + // If writer is nil and event is not FORMAT_DESCRIPTION_EVENT, we can't write |
| 178 | + // This should not happen if events are in expected order |
| 179 | + return errors.New("writer is not initialized") |
116 | 180 | }
|
| 181 | + |
| 182 | + h.eventCount++ |
| 183 | + return nil |
117 | 184 | }
|
0 commit comments