-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathlocal.go
117 lines (98 loc) · 4.09 KB
/
local.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package canal
import (
"context"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/errors"
)
// BinlogFileDownloader downloads the binlog file for the given position and
// returns the local path to it. It is typically used to fetch binlog backups
// from an RDS-style service. A non-nil err means no local file is available.
type BinlogFileDownloader func(mysql.Position) (localBinFilePath string, err error)
// WithLocalBinlogDownloader registers the downloader used to fetch a binlog
// file to local storage (for example a binlog backup kept by an RDS service).
// It stores d on the Canal, replacing any previously registered downloader.
func (c *Canal) WithLocalBinlogDownloader(d BinlogFileDownloader) {
c.binFileDownloader = d
}
// adaptLocalBinFileStreamer wraps the streamer obtained from the master in a
// localBinFileAdapterStreamer bound to this Canal and its registered
// downloader. The incoming err is passed through untouched, and the adapter is
// returned non-nil even when err is set.
func (c *Canal) adaptLocalBinFileStreamer(remoteBinlogStreamer *replication.BinlogStreamer, err error) (*localBinFileAdapterStreamer, error) {
	adapter := new(localBinFileAdapterStreamer)
	adapter.canal = c
	adapter.binFileDownloader = c.binFileDownloader

	// The remote streamer is both the active streamer and the remembered
	// sync master streamer until a local binlog file takes over.
	adapter.BinlogStreamer = remoteBinlogStreamer
	adapter.syncMasterStreamer = remoteBinlogStreamer

	return adapter, err
}
// localBinFileAdapterStreamer wraps a BinlogStreamer so that syncing can
// continue from a downloaded binlog file when the master no longer serves the
// requested one, as can happen on cloud computing platforms that flush binlogs.
type localBinFileAdapterStreamer struct {
*replication.BinlogStreamer // the active streamer: either a local-file streamer or the sync master streamer
syncMasterStreamer *replication.BinlogStreamer // the streamer returned by the canal's startSyncer
canal *Canal // owning canal; GetEvent uses its syncer, master position and logger
binFileDownloader BinlogFileDownloader // optional; when nil, GetEvent only delegates to the active streamer
}
// GetEvent returns the next binlog event, switching between the sync master
// streamer and a streamer fed from a downloaded binlog file when needed.
//
// Without a registered downloader it simply delegates to the active streamer.
// Otherwise:
//   - a RotateEvent switches the active streamer back to the sync master
//     streamer, so the next call reads from the master again;
//   - replication.ErrNeedSyncAgain closes the syncer, rebuilds and restarts
//     it, and reads from the new master streamer; a failure to rebuild or
//     restart is returned to the caller;
//   - the master error "Could not find first log file name in binary log
//     index file" triggers a download for the canal's current position, and
//     events are then served from the downloaded file.
//
// Every other error is returned unchanged.
func (s *localBinFileAdapterStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) {
	if s.binFileDownloader == nil { // local binlog files are not supported
		return s.BinlogStreamer.GetEvent(ctx)
	}

	ev, err := s.BinlogStreamer.GetEvent(ctx)
	if err == nil {
		// A RotateEvent means the current file is finished: switch back to
		// the sync master streamer so the next read retries the master.
		if _, ok := ev.Event.(*replication.RotateEvent); ok {
			s.BinlogStreamer = s.syncMasterStreamer
		}
		return ev, nil
	}

	if err == replication.ErrNeedSyncAgain { // restart the master sync if the last syncer failed
		s.canal.syncer.Close()
		// Do not restart on top of a syncer that could not be rebuilt;
		// report the failure instead of silently discarding it.
		if prepErr := s.canal.prepareSyncer(); prepErr != nil {
			return nil, prepErr
		}

		newStreamer, startErr := s.canal.startSyncer()
		if startErr != nil {
			return nil, startErr
		}
		// Point both streamer slots at the new sync master streamer.
		s.BinlogStreamer = newStreamer
		s.syncMasterStreamer = newStreamer

		ev, err = newStreamer.GetEvent(ctx)
	}

	// Only the "could not find first log" error may fall back to a local
	// binlog file; everything else (including success) is returned as is.
	mysqlErr, ok := err.(*mysql.MyError)
	if !ok || mysqlErr.Code != mysql.ER_MASTER_FATAL_ERROR_READING_BINLOG ||
		mysqlErr.Message != "Could not find first log file name in binary log index file" {
		return ev, err
	}

	s.canal.cfg.Logger.Info("Could not find first log, try to download the local binlog for retry")

	// The local streamer needs the canal's current position to locate the
	// binlog file and the event to begin from.
	pos := s.canal.master.Position()
	newStreamer := s.newLocalBinFileStreamer(s.binFileDownloader, pos)
	s.BinlogStreamer = newStreamer

	return newStreamer.GetEvent(ctx)
}
// newLocalBinFileStreamer downloads the binlog file for position and returns a
// streamer that is fed, from a background goroutine, with the events parsed
// out of that file.
//
// If the download fails, the returned streamer is already closed with an error
// that carries the message "local binlog file not exist" and the downloader's
// error as its cause. Events are forwarded starting with the first event whose
// header LogPos equals position.Pos, or from the first event when position.Pos
// is 4. For every forwarded event the syncer's position/GTID is stored first;
// if that fails, the error is returned to the parser so no further events are
// forwarded, and the streamer is closed with the error ParseFile reports.
func (s *localBinFileAdapterStreamer) newLocalBinFileStreamer(download BinlogFileDownloader, position mysql.Position) *replication.BinlogStreamer {
	streamer := replication.NewBinlogStreamer()

	binFilePath, err := download(position)
	if err != nil {
		// Keep the established message but preserve the real cause.
		streamer.CloseWithError(errors.Annotate(err, "local binlog file not exist"))
		return streamer
	}

	go func(binFilePath string, streamer *replication.BinlogStreamer) {
		beginFromHere := false
		err := s.canal.syncer.GetBinlogParser().ParseFile(binFilePath, 0, func(be *replication.BinlogEvent) error {
			if position.Pos < 4 { // the first binlog position is 4; a smaller value means canal supplied a bad position
				return nil
			}
			if be.Header.LogPos == position.Pos || position.Pos == 4 { // reached the event to begin from
				beginFromHere = true
			}
			if !beginFromHere {
				return nil
			}
			if err := s.canal.syncer.StorePosAndGTID(be); err != nil {
				// Abort parsing instead of continuing to store and push
				// later events after a failure; the streamer is closed
				// once, below, with the error ParseFile returns.
				return err
			}
			streamer.PutEvent(be)
			return nil
		})
		if err != nil {
			streamer.CloseWithError(err)
		}
	}(binFilePath, streamer)

	return streamer
}