Skip to content

Commit dd15a3c

Browse files
author
Fu Wenhui
committed
changes from dtle go-mysql-org#477 by 790493303
1 parent 60acd9c commit dd15a3c

File tree

4 files changed

+23
-8
lines changed

4 files changed

+23
-8
lines changed

replication/binlogstreamer.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package replication
33
import (
44
"context"
55
"github.com/pingcap/errors"
6+
"github.com/opentracing/opentracing-go"
67
"github.com/siddontang/go-log/log"
78
"time"
89
)
@@ -28,6 +29,10 @@ func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) {
2829

2930
select {
3031
case c := <-s.ch:
32+
span := opentracing.StartSpan("send binlogEvent from go-mysql", opentracing.FollowsFrom(c.SpanContest))
33+
span.SetTag("send event from go mysql time ", time.Now().Unix())
34+
c.SpanContest = span.Context()
35+
span.Finish()
3136
return c, nil
3237
case s.err = <-s.ech:
3338
return nil, s.err
@@ -56,7 +61,7 @@ func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime ti
5661
}
5762
}
5863

59-
// DumpEvents dumps all left events
64+
// DumpEvents dumps all left eventsmax_payload
6065
func (s *BinlogStreamer) DumpEvents() []*BinlogEvent {
6166
count := len(s.ch)
6267
events := make([]*BinlogEvent, 0, count)

replication/binlogsyncer.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/pingcap/errors"
15+
"github.com/opentracing/opentracing-go"
1516
uuid "github.com/satori/go.uuid"
1617
"github.com/siddontang/go-log/log"
1718
"github.com/siddontang/go-mysql/client"
@@ -634,7 +635,10 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
634635
}()
635636

636637
for {
638+
span := opentracing.StartSpan("data source: get incremental data from ReadPacket()")
639+
span.SetTag("before get incremental data time:", time.Now().Unix())
637640
data, err := b.c.ReadPacket()
641+
span.SetTag("after get incremental data time:", time.Now().Unix())
638642
if err != nil {
639643
log.Error(err)
640644

@@ -671,7 +675,6 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
671675
// we connect the server and begin to re-sync again.
672676
continue
673677
}
674-
675678
//set read timeout
676679
if b.cfg.ReadTimeout > 0 {
677680
b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
@@ -682,7 +685,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
682685

683686
switch data[0] {
684687
case OK_HEADER:
685-
if err = b.parseEvent(s, data); err != nil {
688+
if err = b.parseEvent(span.Context(), s, data); err != nil {
686689
s.closeWithError(err)
687690
return
688691
}
@@ -701,13 +704,16 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
701704
log.Errorf("invalid stream header %c", data[0])
702705
continue
703706
}
707+
span.Finish()
704708
}
705709
}
706710

707-
func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
711+
func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *BinlogStreamer, data []byte) error {
708712
//skip OK byte, 0x00
709713
data = data[1:]
710-
714+
span := opentracing.GlobalTracer().StartSpan(" incremental data are conversion to BinlogEvent", opentracing.ChildOf(spanContext))
715+
span.SetTag("time", time.Now().Unix())
716+
defer span.Finish()
711717
needACK := false
712718
if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) {
713719
needACK = (data[1] == 0x01)
@@ -716,6 +722,8 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
716722
}
717723

718724
e, err := b.parser.Parse(data)
725+
e.SpanContest = span.Context()
726+
span.SetTag("tx timestap", e.Header.Timestamp)
719727
if err != nil {
720728
return errors.Trace(err)
721729
}

replication/event.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"unicode"
1111

1212
"github.com/pingcap/errors"
13+
"github.com/opentracing/opentracing-go"
1314
"github.com/satori/go.uuid"
1415
. "github.com/siddontang/go-mysql/mysql"
1516
)
@@ -26,8 +27,9 @@ type BinlogEvent struct {
2627
// raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists
2728
RawData []byte
2829

29-
Header *EventHeader
30-
Event Event
30+
Header *EventHeader
31+
Event Event
32+
SpanContest opentracing.SpanContext
3133
}
3234

3335
func (e *BinlogEvent) Dump(w io.Writer) {

replication/parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) {
324324
return nil, err
325325
}
326326

327-
return &BinlogEvent{RawData: rawData, Header: h, Event: e}, nil
327+
return &BinlogEvent{RawData: rawData, Header: h, Event: e, SpanContest: nil}, nil
328328
}
329329

330330
func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error {

0 commit comments

Comments
 (0)