From e8ffe9476e044768b2388980351fa4a367e17dc9 Mon Sep 17 00:00:00 2001 From: alkaaf Date: Tue, 8 Aug 2023 11:03:08 +0700 Subject: [PATCH] add throttle to prevent cpu load --- canal/canal.go | 2 ++ canal/config.go | 4 ++++ replication/binlogsyncer.go | 10 ++++++++++ 3 files changed, 16 insertions(+) diff --git a/canal/canal.go b/canal/canal.go index 6c07bb741..ab919433b 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -451,6 +451,8 @@ func (c *Canal) prepareSyncer() error { Logger: c.cfg.Logger, Dialer: c.cfg.Dialer, Localhost: c.cfg.Localhost, + ThrottleCap: c.cfg.ThrottleCap, + ThrottleDuration: c.cfg.ThrottleDuration, } if strings.Contains(c.cfg.Addr, "/") { diff --git a/canal/config.go b/canal/config.go index 29c58f66b..589d0cdce 100644 --- a/canal/config.go +++ b/canal/config.go @@ -88,6 +88,10 @@ type Config struct { // whether disable re-sync for broken connection DisableRetrySync bool `toml:"disable_retry_sync"` + // do throttle when the event channel is more than this value. 0 means no throttle + ThrottleCap int `toml:"throttle_cap"` + // delay read for this duration when throttle is triggered + ThrottleDuration time.Duration `toml:"throttle_duration"` // Set TLS config TLSConfig *tls.Config diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 9c70740fc..8ad0023f3 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -122,6 +122,11 @@ type BinlogSyncerConfig struct { RowsEventDecodeFunc func(*RowsEvent, []byte) error DiscardGTIDSet bool + + // do throttle when the event channel is more than this value. 0 means no throttle + ThrottleCap int + // delay read for this duration when throttle is triggered + ThrottleDuration time.Duration } // BinlogSyncer syncs binlog event from server. @@ -852,6 +857,11 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } needStop := false + // throttle the event, prevent cpu flood + if b.cfg.ThrottleCap > 0 && len(s.ch) >= b.cfg.ThrottleCap { + b.cfg.Logger.Infof(`throttling binlog read`) + time.Sleep(b.cfg.ThrottleDuration) + } select { case s.ch <- e: case <-b.ctx.Done():