Skip to content

Add EventCacheCount as member of BinlogSyncerConfig to limit streamer's event channel size #830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,17 @@ func (s *BinlogStreamer) closeWithError(err error) {
}

func NewBinlogStreamer() *BinlogStreamer {
return NewBinlogStreamerWithChanSize(10240)
}

func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer {
s := new(BinlogStreamer)

s.ch = make(chan *BinlogEvent, 10240)
if chanSize <= 0 {
chanSize = 10240
}

s.ch = make(chan *BinlogEvent, chanSize)
s.ech = make(chan error, 4)

return s
Expand Down
7 changes: 6 additions & 1 deletion replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ type BinlogSyncerConfig struct {
RowsEventDecodeFunc func(*RowsEvent, []byte) error

DiscardGTIDSet bool

EventCacheSize int
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name "XXXSize" makes me think it's using the unit of memory, (EventCacheSize = 1024 means the cache will not exceed 1KB), maybe "EventCacheCount" is a better name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed! I'll work on the fixes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly limiting memory usage requires dynamically calculating the memory occupancy of the Event Cache during the streaming process, which might lead to a decrease in overall efficiency.
In scenarios where the network conditions are not too poor, even setting the EventCacheCount to a single digit won't have much worse performance compared to 10240, while at the same time, the memory of the Event Cache will be limited to the size of a few Events, which is enough to achieve a balance between efficiency and resource occupancy.

}

// BinlogSyncer syncs binlog event from server.
Expand Down Expand Up @@ -166,6 +168,9 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
dialer := &net.Dialer{}
cfg.Dialer = dialer.DialContext
}
if cfg.EventCacheSize == 0 {
cfg.EventCacheSize = 10240
}

// Clear the Password to avoid outputing it in log.
pass := cfg.Password
Expand Down Expand Up @@ -393,7 +398,7 @@ func (b *BinlogSyncer) prepare() error {
func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
b.running = true

s := NewBinlogStreamer()
s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheSize)

b.wg.Add(1)
go b.onStream(s)
Expand Down