Skip to content

Commit f0df38a

Browse files
authored
fix: (*RowsEvent).handleUnsigned() panic (#856)
* fix: (*RowsEvent).handleUnsigned() panic * add unit test: TestRowsEvent_handleUnsigned() * import require
1 parent 2199dfb commit f0df38a

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

canal/rows.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ func (r *RowsEvent) handleUnsigned() {
5353

5454
for i := 0; i < len(r.Rows); i++ {
5555
for _, columnIdx := range r.Table.UnsignedColumns {
56+
// When Canal.delay is big, after call Canal.StartFromGTID(),
57+
// we will get the newest table schema (for example, after DDL "alter table add column xxx unsigned..."),
58+
// but the binlog data can be very old (before DDL "alter table add column xxx unsigned..."),
59+
// results in max(columnIdx) >= len(r.Rows[i]), then r.Rows[i][columnIdx] panic.
60+
if columnIdx >= len(r.Rows[i]) {
61+
continue
62+
}
5663
switch value := r.Rows[i][columnIdx].(type) {
5764
case int8:
5865
r.Rows[i][columnIdx] = uint8(value)

canal/rows_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package canal
2+
3+
import (
4+
"testing"
5+
6+
"github.com/go-mysql-org/go-mysql/replication"
7+
"github.com/go-mysql-org/go-mysql/schema"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestRowsEvent_handleUnsigned(t *testing.T) {
12+
type fields struct {
13+
Table *schema.Table
14+
Action string
15+
Rows [][]interface{}
16+
Header *replication.EventHeader
17+
}
18+
tests := []struct {
19+
name string
20+
fields fields
21+
wantRows [][]interface{}
22+
}{
23+
{
24+
name: "rows_event_handle_unsigned",
25+
fields: fields{
26+
Table: &schema.Table{
27+
// columns 1,3,5,7,9 should be converted from signed to unsigned,
28+
// column 10 is out of range and should be ignored, don't panic.
29+
UnsignedColumns: []int{1, 3, 5, 7, 9, 10},
30+
},
31+
Rows: [][]interface{}{{
32+
int8(8), int8(8),
33+
int16(16), int16(16),
34+
int32(32), int32(32),
35+
int64(64), int64(64),
36+
int(128), int(128)},
37+
},
38+
},
39+
wantRows: [][]interface{}{{
40+
int8(8), uint8(8),
41+
int16(16), uint16(16),
42+
int32(32), uint32(32),
43+
int64(64), uint64(64),
44+
int(128), uint(128)},
45+
},
46+
},
47+
}
48+
for _, tt := range tests {
49+
t.Run(tt.name, func(t *testing.T) {
50+
r := &RowsEvent{
51+
Table: tt.fields.Table,
52+
Action: tt.fields.Action,
53+
Rows: tt.fields.Rows,
54+
Header: tt.fields.Header,
55+
}
56+
r.handleUnsigned()
57+
require.Equal(t, tt.fields.Rows, tt.wantRows)
58+
})
59+
}
60+
}

0 commit comments

Comments
 (0)