Skip to content

Commit 25c8dcc

Browse files
committed
Merge pull request go-mysql-org#32 from adriacidre/master
Map mysql fields to elasticsearch arrays
2 parents 6f01ccb + 792902b commit 25c8dcc

File tree

4 files changed

+107
-16
lines changed

4 files changed

+107
-16
lines changed

README.md

+28-2
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,38 @@ index = "t"
6565
type = "t"
6666
parent = "parent_id"
6767
68-
[rule.field]
69-
title = "my_title"
68+
[[rule.fields]]
69+
mysql = "title"
70+
elastic = "my_title"
7071
```
7172

7273
In the example above, we will use a new index and type both named "t" instead of default "t1", and use "my_title" instead of field name "title".
7374

75+
## Rule field types
76+
77+
In order to map a mysql column on different elasticsearch types you can define the field type as follows:
78+
79+
```
80+
[[rule]]
81+
schema = "test"
82+
table = "t1"
83+
index = "t"
84+
type = "t"
85+
parent = "parent_id"
86+
87+
[rule.field]
88+
// This will map column title to elastic search my_title
89+
title="my_title"
90+
91+
// This will map column title to elastic search my_title and use array type
92+
title="my_title,list"
93+
94+
// This will map column title to elastic search title and use array type
95+
title=",list"
96+
```
97+
98+
Modifier "list" will translates a mysql string field like "a,b,c" on an elastic array type '{"a", "b", "c"}' this is specially useful if you need to use those fields on filtering on elasticsearch.
99+
74100
## Wildcard table
75101

76102
go-mysql-elasticsearch only allows you determind which table to be synced, but sometimes, if you split a big table into multi sub tables, like 1024, table_0000, table_0001, ... table_1023, it is very hard to write rules for every table.

etc/river.toml

+9-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ type = "river"
4040

4141
# title is MySQL test_river field name, es_title is the customized name in Elasticsearch
4242
[rule.field]
43-
title = "es_title"
43+
# This will map column title to elastic search my_title
44+
title="es_title"
45+
# This will map column tags to elastic search my_tags and use array type
46+
tags="my_tags,list"
47+
# This will map column keywords to elastic search keywords and use array type
48+
keywords=",list"
4449

4550
# wildcard table rule, the wildcard table must be in source tables
4651
[[rule]]
@@ -50,7 +55,8 @@ index = "river"
5055
type = "river"
5156

5257
# title is MySQL test_river field name, es_title is the customized name in Elasticsearch
53-
[rule.field]
54-
title = "es_title"
58+
[[rule.fields]]
59+
mysql = "title"
60+
elastic = "es_title"
5561

5662

river/river_test.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
3737
id INT,
3838
title VARCHAR(256),
3939
content VARCHAR(256),
40+
mylist VARCHAR(256),
4041
tenum ENUM("e1", "e2", "e3"),
4142
tset SET("a", "b", "c"),
4243
PRIMARY KEY(id)) ENGINE=INNODB;
@@ -74,13 +75,16 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
7475
Table: "test_river",
7576
Index: "river",
7677
Type: "river",
77-
FieldMapping: map[string]string{"title": "es_title"}},
78+
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
79+
},
7880

7981
&Rule{Schema: "test",
8082
Table: "test_river_[0-9]{4}",
8183
Index: "river",
8284
Type: "river",
83-
FieldMapping: map[string]string{"title": "es_title"}}}
85+
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
86+
},
87+
}
8488

8589
s.r, err = NewRiver(cfg)
8690
c.Assert(err, IsNil)
@@ -123,6 +127,7 @@ parent = "pid"
123127
124128
[rule.field]
125129
title = "es_title"
130+
mylist = "es_mylist,list"
126131
127132
[[rule]]
128133
schema = "test"
@@ -132,6 +137,7 @@ type = "river"
132137
133138
[rule.field]
134139
title = "es_title"
140+
mylist = "es_mylist,list"
135141
136142
`
137143

@@ -196,7 +202,7 @@ func (s *riverTestSuite) TestRiver(c *C) {
196202
c.Assert(r.Source["es_title"], Equals, "abc")
197203
}
198204

199-
s.testExecute(c, "UPDATE test_river SET title = ?, tenum = ?, tset = ? WHERE id = ?", "second 2", "e3", "a,b,c", 2)
205+
s.testExecute(c, "UPDATE test_river SET title = ?, tenum = ?, tset = ?, mylist = ? WHERE id = ?", "second 2", "e3", "a,b,c", "a,b,c", 2)
200206
s.testExecute(c, "DELETE FROM test_river WHERE id = ?", 1)
201207
s.testExecute(c, "UPDATE test_river SET title = ?, id = ? WHERE id = ?", "second 30", 30, 3)
202208

@@ -221,6 +227,7 @@ func (s *riverTestSuite) TestRiver(c *C) {
221227
c.Assert(r.Source["es_title"], Equals, "second 2")
222228
c.Assert(r.Source["tenum"], Equals, "e3")
223229
c.Assert(r.Source["tset"], Equals, "a,b,c")
230+
c.Assert(r.Source["es_mylist"], DeepEquals, []interface{}{"a", "b", "c"})
224231

225232
r = s.testElasticGet(c, "4")
226233
c.Assert(r.Found, Equals, true)

river/sync.go

+60-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ const (
1818
syncUpdateDoc
1919
)
2020

21+
const (
22+
fieldTypeList = "list"
23+
)
24+
2125
type rowsEventHandler struct {
2226
r *River
2327
}
@@ -187,15 +191,46 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in
187191
return value
188192
}
189193

194+
func (r *River) getFieldParts(k string, v string) (string, string, string) {
195+
composedField := strings.Split(v, ",")
196+
197+
mysql := k
198+
elastic := composedField[0]
199+
fieldType := ""
200+
201+
if 0 == len(elastic) {
202+
elastic = mysql
203+
}
204+
if 2 == len(composedField) {
205+
fieldType = composedField[1]
206+
}
207+
208+
return mysql, elastic, fieldType
209+
}
210+
190211
func (r *River) makeInsertReqData(req *elastic.BulkRequest, rule *Rule, values []interface{}) {
191212
req.Data = make(map[string]interface{}, len(values))
192213
req.Action = elastic.ActionIndex
193214

194215
for i, c := range rule.TableInfo.Columns {
195-
if name, ok := rule.FieldMapping[c.Name]; ok {
196-
// has custom field mapping
197-
req.Data[name] = r.makeReqColumnData(&c, values[i])
198-
} else {
216+
mapped := false
217+
for k, v := range rule.FieldMapping {
218+
mysql, elastic, fieldType := r.getFieldParts(k, v)
219+
if mysql == c.Name {
220+
mapped = true
221+
v := r.makeReqColumnData(&c, values[i])
222+
if fieldType == fieldTypeList {
223+
if str, ok := v.(string); ok {
224+
req.Data[elastic] = strings.Split(str, ",")
225+
} else {
226+
req.Data[elastic] = v
227+
}
228+
} else {
229+
req.Data[elastic] = v
230+
}
231+
}
232+
}
233+
if mapped == false {
199234
req.Data[c.Name] = r.makeReqColumnData(&c, values[i])
200235
}
201236
}
@@ -209,16 +244,33 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule,
209244
req.Action = elastic.ActionUpdate
210245

211246
for i, c := range rule.TableInfo.Columns {
247+
mapped := false
212248
if reflect.DeepEqual(beforeValues[i], afterValues[i]) {
213249
//nothing changed
214250
continue
215251
}
216-
if name, ok := rule.FieldMapping[c.Name]; ok {
217-
// has custom field mapping
218-
req.Data[name] = r.makeReqColumnData(&c, afterValues[i])
219-
} else {
252+
for k, v := range rule.FieldMapping {
253+
mysql, elastic, fieldType := r.getFieldParts(k, v)
254+
if mysql == c.Name {
255+
mapped = true
256+
// has custom field mapping
257+
v := r.makeReqColumnData(&c, afterValues[i])
258+
str, ok := v.(string)
259+
if ok == false {
260+
req.Data[c.Name] = v
261+
} else {
262+
if fieldType == fieldTypeList {
263+
req.Data[elastic] = strings.Split(str, ",")
264+
} else {
265+
req.Data[elastic] = str
266+
}
267+
}
268+
}
269+
}
270+
if mapped == false {
220271
req.Data[c.Name] = r.makeReqColumnData(&c, afterValues[i])
221272
}
273+
222274
}
223275
}
224276

0 commit comments

Comments
 (0)