Skip to content

Commit c435689

Browse files
dt8269Dylan Terry
and
Dylan Terry
authored
Ensure ACKs are sent after backup (#921)
* This commit addresses an issue where acknowledgments (ACKs) were sometimes sent to the master before binlog events were fully written and fsynced to disk during backup operations. Sending ACKs prematurely in semi-synchronous replication could lead to data loss if the replica fails after sending the ACK but before persisting the event. Key changes: - Introduced an `EventHandler` interface with a `HandleEvent` method for processing binlog events. This allows custom event handling logic to be injected into the replication stream. - Added an `eventHandler` field to `BinlogSyncer` and provided a `SetEventHandler` method to assign an event handler. This enables `BinlogSyncer` to delegate event processing to the assigned handler. - Implemented `BackupEventHandler` which writes binlog events to disk and ensures that each event is fsynced before returning. This ensures data durability before ACKs are sent. - Modified the `onStream` method in `BinlogSyncer` to separate event parsing (`parseEvent`) from event handling and ACK sending (`handleEventAndACK`). This adheres to the single-responsibility principle and makes the code cleaner. - Moved state updates (e.g., updating `b.nextPos`) and GTID set handling from `parseEvent` to `handleEventAndACK` to avoid side effects during parsing. - Ensured that ACKs are sent only after the event has been fully processed and fsynced by sending the ACK in `handleEventAndACK` after event handling. * Refactor event handling by replacing SyncMode and EventHandleMode with SynchronousEventHandler. Simplify the event processing in BinlogSyncerConfig by introducing SynchronousEventHandler for synchronous event handling. Update StartBackup, StartBackupWithHandler, and associated tests to reflect these changes. * Add some comments and remember to remove SetEventHandler and the eventHandler attribute * Remove the timeout for synchronous backup, revert the timeout move to return the behavior to 30 days _between_ events, restore some comments, use struct instead of bool as recommended, add a note about SynchronousEventHandler and the parseEvent return values * Make sure to assign the timeout on the syncer so the backup doesn't fail * Make sure to add NewBackupHandler in order to expose the otherwise private handler outside the package --------- Co-authored-by: Dylan Terry <[email protected]>
1 parent a9f9c23 commit c435689

File tree

3 files changed

+276
-86
lines changed

3 files changed

+276
-86
lines changed

replication/backup.go

+124-49
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,31 @@ import (
55
"io"
66
"os"
77
"path"
8+
"sync"
89
"time"
910

1011
. "github.com/go-mysql-org/go-mysql/mysql"
1112
"github.com/pingcap/errors"
1213
)
1314

14-
// StartBackup: Like mysqlbinlog remote raw backup
15-
// Backup remote binlog from position (filename, offset) and write in backupDir
15+
// StartBackup starts the backup process for the binary log and writes to the backup directory.
1616
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
1717
err := os.MkdirAll(backupDir, 0755)
1818
if err != nil {
1919
return errors.Trace(err)
2020
}
21-
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
22-
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
23-
})
21+
if b.cfg.SynchronousEventHandler == nil {
22+
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
23+
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
24+
})
25+
} else {
26+
return b.StartSynchronousBackup(p, timeout)
27+
}
2428
}
2529

2630
// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
2731
// The process will continue until the timeout is reached or an error occurs.
32+
// This method should not be used together with SynchronousEventHandler.
2833
//
2934
// Parameters:
3035
// - p: The starting position in the binlog from which to begin the backup.
@@ -37,81 +42,151 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
3742
// a very long timeout here
3843
timeout = 30 * 3600 * 24 * time.Second
3944
}
45+
if b.cfg.SynchronousEventHandler != nil {
46+
return errors.New("StartBackupWithHandler cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackup instead.")
47+
}
4048

4149
// Force use raw mode
4250
b.parser.SetRawMode(true)
4351

52+
// Set up the backup event handler
53+
backupHandler := &BackupEventHandler{
54+
handler: handler,
55+
}
56+
4457
s, err := b.StartSync(p)
4558
if err != nil {
4659
return errors.Trace(err)
4760
}
4861

49-
var filename string
50-
var offset uint32
51-
52-
var w io.WriteCloser
5362
defer func() {
54-
var closeErr error
55-
if w != nil {
56-
closeErr = w.Close()
57-
}
58-
if retErr == nil {
59-
retErr = closeErr
63+
if backupHandler.w != nil {
64+
closeErr := backupHandler.w.Close()
65+
if retErr == nil {
66+
retErr = closeErr
67+
}
6068
}
6169
}()
6270

6371
for {
6472
ctx, cancel := context.WithTimeout(context.Background(), timeout)
65-
e, err := s.GetEvent(ctx)
66-
cancel()
73+
defer cancel()
6774

68-
if err == context.DeadlineExceeded {
75+
select {
76+
case <-ctx.Done():
6977
return nil
70-
}
71-
72-
if err != nil {
78+
case <-b.ctx.Done():
79+
return nil
80+
case err := <-s.ech:
7381
return errors.Trace(err)
82+
case e := <-s.ch:
83+
err = backupHandler.HandleEvent(e)
84+
if err != nil {
85+
return errors.Trace(err)
86+
}
7487
}
88+
}
89+
}
7590

76-
offset = e.Header.LogPos
91+
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
92+
func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error {
93+
if b.cfg.SynchronousEventHandler == nil {
94+
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
95+
}
7796

78-
if e.Header.EventType == ROTATE_EVENT {
79-
rotateEvent := e.Event.(*RotateEvent)
80-
filename = string(rotateEvent.NextLogName)
97+
s, err := b.StartSync(p)
98+
if err != nil {
99+
return errors.Trace(err)
100+
}
81101

82-
if e.Header.Timestamp == 0 || offset == 0 {
83-
// fake rotate event
84-
continue
85-
}
86-
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
87-
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new
88-
89-
if w != nil {
90-
if err = w.Close(); err != nil {
91-
w = nil
92-
return errors.Trace(err)
93-
}
94-
}
102+
var ctx context.Context
103+
var cancel context.CancelFunc
95104

96-
if len(filename) == 0 {
97-
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
98-
}
105+
if timeout > 0 {
106+
ctx, cancel = context.WithTimeout(context.Background(), timeout)
107+
defer cancel()
108+
} else {
109+
ctx = context.Background()
110+
}
99111

100-
w, err = handler(filename)
101-
if err != nil {
102-
return errors.Trace(err)
103-
}
112+
select {
113+
case <-ctx.Done():
114+
// The timeout has been reached
115+
return nil
116+
case <-b.ctx.Done():
117+
// The BinlogSyncer has been closed
118+
return nil
119+
case err := <-s.ech:
120+
// An error occurred during streaming
121+
return errors.Trace(err)
122+
}
123+
}
124+
125+
// BackupEventHandler handles writing events for backup
126+
type BackupEventHandler struct {
127+
handler func(binlogFilename string) (io.WriteCloser, error)
128+
w io.WriteCloser
129+
mutex sync.Mutex
130+
131+
filename string
132+
}
133+
134+
func NewBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *BackupEventHandler {
135+
return &BackupEventHandler{
136+
handler: handlerFunction,
137+
}
138+
}
139+
140+
// HandleEvent processes a single event for the backup.
141+
func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
142+
h.mutex.Lock()
143+
defer h.mutex.Unlock()
144+
145+
var err error
146+
offset := e.Header.LogPos
104147

105-
// write binlog header fe'bin'
106-
if _, err = w.Write(BinLogFileHeader); err != nil {
148+
if e.Header.EventType == ROTATE_EVENT {
149+
rotateEvent := e.Event.(*RotateEvent)
150+
h.filename = string(rotateEvent.NextLogName)
151+
if e.Header.Timestamp == 0 || offset == 0 {
152+
// fake rotate event
153+
return nil
154+
}
155+
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
156+
if h.w != nil {
157+
if err = h.w.Close(); err != nil {
158+
h.w = nil
107159
return errors.Trace(err)
108160
}
109161
}
110162

111-
if n, err := w.Write(e.RawData); err != nil {
163+
if len(h.filename) == 0 {
164+
return errors.Errorf("empty binlog filename for FormatDescriptionEvent")
165+
}
166+
167+
h.w, err = h.handler(h.filename)
168+
if err != nil {
112169
return errors.Trace(err)
113-
} else if n != len(e.RawData) {
170+
}
171+
172+
// Write binlog header 0xfebin
173+
_, err = h.w.Write(BinLogFileHeader)
174+
if err != nil {
175+
return errors.Trace(err)
176+
}
177+
}
178+
179+
if h.w != nil {
180+
n, err := h.w.Write(e.RawData)
181+
if err != nil {
182+
return errors.Trace(err)
183+
}
184+
if n != len(e.RawData) {
114185
return errors.Trace(io.ErrShortWrite)
115186
}
187+
} else {
188+
return errors.New("writer is not initialized")
116189
}
190+
191+
return nil
117192
}

replication/backup_test.go

+72-2
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ package replication
22

33
import (
44
"context"
5+
"io"
56
"os"
7+
"path"
68
"time"
79

810
"github.com/stretchr/testify/require"
911

1012
"github.com/go-mysql-org/go-mysql/mysql"
1113
)
1214

15+
// TestStartBackupEndInGivenTime tests the backup process completes within a given time.
1316
func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
1417
t.setupTest(mysql.MySQLFlavor)
1518

@@ -30,12 +33,12 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
3033
os.RemoveAll(binlogDir)
3134
timeout := 2 * time.Second
3235

33-
done := make(chan bool)
36+
done := make(chan struct{})
3437

3538
go func() {
3639
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
3740
require.NoError(t.T(), err)
38-
done <- true
41+
close(done)
3942
}()
4043
failTimeout := 5 * timeout
4144
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
@@ -47,3 +50,70 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
4750
t.T().Fatal("time out error")
4851
}
4952
}
53+
54+
// TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation.
55+
func (t *testSyncerSuite) TestAsyncBackup() {
56+
testBackup(t, false) // false indicates asynchronous mode
57+
}
58+
59+
// TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation.
60+
func (t *testSyncerSuite) TestSyncBackup() {
61+
testBackup(t, true) // true indicates synchronous mode
62+
}
63+
64+
// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
65+
func testBackup(t *testSyncerSuite, isSynchronous bool) {
66+
t.setupTest(mysql.MySQLFlavor)
67+
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled
68+
69+
binlogDir := "./var"
70+
os.RemoveAll(binlogDir)
71+
timeout := 3 * time.Second
72+
73+
if isSynchronous {
74+
// Set up a BackupEventHandler for synchronous mode
75+
backupHandler := NewBackupEventHandler(
76+
func(filename string) (io.WriteCloser, error) {
77+
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
78+
},
79+
)
80+
t.b.cfg.SynchronousEventHandler = backupHandler
81+
} else {
82+
// Ensure SynchronousEventHandler is nil for asynchronous mode
83+
t.b.cfg.SynchronousEventHandler = nil
84+
}
85+
86+
done := make(chan bool)
87+
88+
// Start the backup process in a goroutine
89+
go func() {
90+
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
91+
require.NoError(t.T(), err)
92+
done <- true
93+
}()
94+
95+
failTimeout := 2 * timeout
96+
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
97+
defer cancel()
98+
99+
// Wait for the backup to complete or timeout
100+
select {
101+
case <-done:
102+
// Check if binlog files are written to the specified directory
103+
files, err := os.ReadDir(binlogDir)
104+
require.NoError(t.T(), err, "Failed to read binlog directory")
105+
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
106+
mode := modeLabel(isSynchronous)
107+
t.T().Logf("Backup completed successfully in %s mode with %d binlog file(s).", mode, len(files))
108+
case <-ctx.Done():
109+
mode := modeLabel(isSynchronous)
110+
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
111+
}
112+
}
113+
114+
func modeLabel(isSynchronous bool) string {
115+
if isSynchronous {
116+
return "synchronous"
117+
}
118+
return "asynchronous"
119+
}

0 commit comments

Comments
 (0)