diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 61254fba0..72bc7ddd0 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -85,9 +85,17 @@ func (s *BinlogStreamer) closeWithError(err error) { } func NewBinlogStreamer() *BinlogStreamer { + return NewBinlogStreamerWithChanSize(10240) +} + +func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer { s := new(BinlogStreamer) - s.ch = make(chan *BinlogEvent, 10240) + if chanSize <= 0 { + chanSize = 10240 + } + + s.ch = make(chan *BinlogEvent, chanSize) s.ech = make(chan error, 4) return s diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 9c70740fc..a5ca96359 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -122,6 +122,8 @@ type BinlogSyncerConfig struct { RowsEventDecodeFunc func(*RowsEvent, []byte) error DiscardGTIDSet bool + + EventCacheCount int } // BinlogSyncer syncs binlog event from server. @@ -166,6 +168,9 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { dialer := &net.Dialer{} cfg.Dialer = dialer.DialContext } + if cfg.EventCacheCount == 0 { + cfg.EventCacheCount = 10240 + } // Clear the Password to avoid outputing it in log. pass := cfg.Password @@ -393,7 +398,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := NewBinlogStreamer() + s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheCount) b.wg.Add(1) go b.onStream(s)