diff --git a/replication/backup.go b/replication/backup.go index f1d7dd2..4056787 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -50,7 +50,7 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, b.parser.SetRawMode(true) // Set up the backup event handler - backupHandler := &BackupEventHandler{ + backupHandler := &backupEventHandler{ handler: handler, } @@ -69,21 +69,19 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, }() for { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(b.ctx, timeout) defer cancel() - select { - case <-ctx.Done(): + e, err := s.GetEvent(ctx) + if err == context.DeadlineExceeded { return nil - case <-b.ctx.Done(): - return nil - case err := <-s.ech: + } + if err != nil { + return errors.Trace(err) + } + err = backupHandler.HandleEvent(e) + if err != nil { return errors.Trace(err) - case e := <-s.ch: - err = backupHandler.HandleEvent(e) - if err != nil { - return errors.Trace(err) - } } } } @@ -122,8 +120,8 @@ func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) } } -// BackupEventHandler handles writing events for backup -type BackupEventHandler struct { +// backupEventHandler handles writing events for backup +type backupEventHandler struct { handler func(binlogFilename string) (io.WriteCloser, error) w io.WriteCloser mutex sync.Mutex @@ -131,14 +129,14 @@ type BackupEventHandler struct { filename string } -func NewBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *BackupEventHandler { - return &BackupEventHandler{ +func newBackupEventHandler(handlerFunction func(filename string) (io.WriteCloser, error)) *backupEventHandler { + return &backupEventHandler{ handler: handlerFunction, } } // HandleEvent processes a single event for the backup. -func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error { +func (h *backupEventHandler) HandleEvent(e *BinlogEvent) error { h.mutex.Lock() defer h.mutex.Unlock() diff --git a/replication/backup_test.go b/replication/backup_test.go index 769f61e..a5990f0 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -71,8 +71,8 @@ func testBackup(t *testSyncerSuite, isSynchronous bool) { timeout := 3 * time.Second if isSynchronous { - // Set up a BackupEventHandler for synchronous mode - backupHandler := NewBackupEventHandler( + // Set up a backupEventHandler for synchronous mode + backupHandler := newBackupEventHandler( func(filename string) (io.WriteCloser, error) { return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644) },