Skip to content

Commit d238419

Browse files
spid37siddontang
authored andcommitted
added zero date check and date parser (#291)
1 parent d9d0404 commit d238419

File tree

2 files changed

+45
-14
lines changed

2 files changed

+45
-14
lines changed

river/river_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515

1616
var myAddr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr")
1717
var esAddr = flag.String("es_addr", "127.0.0.1:9200", "Elasticsearch addr")
18+
var dateTimeStr = time.Now().Format(mysql.TimeFormat)
19+
var dateStr = time.Now().Format(mysqlDateFormat)
1820

1921
func Test(t *testing.T) {
2022
TestingT(t)
@@ -36,17 +38,18 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
3638

3739
schema := `
3840
CREATE TABLE IF NOT EXISTS %s (
39-
id INT,
40-
title VARCHAR(256),
41-
content VARCHAR(256),
42-
mylist VARCHAR(256),
43-
mydate INT(10),
44-
tenum ENUM("e1", "e2", "e3"),
45-
tset SET("a", "b", "c"),
46-
tbit BIT(1) default 1,
47-
tdatetime DATETIME DEFAULT NULL,
48-
ip INT UNSIGNED DEFAULT 0,
49-
PRIMARY KEY(id)) ENGINE=INNODB;
41+
id INT,
42+
title VARCHAR(256),
43+
content VARCHAR(256),
44+
mylist VARCHAR(256),
45+
mydate INT(10),
46+
tenum ENUM("e1", "e2", "e3"),
47+
tset SET("a", "b", "c"),
48+
tbit BIT(1) default 1,
49+
tdatetime DATETIME DEFAULT NULL,
50+
tdate DATE DEFAULT NULL,
51+
ip INT UNSIGNED DEFAULT 0,
52+
PRIMARY KEY(id)) ENGINE=INNODB;
5053
`
5154

5255
schemaJSON := `
@@ -223,8 +226,10 @@ func (s *riverTestSuite) testPrepareData(c *C) {
223226
s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c")
224227
}
225228

226-
datetime := time.Now().Format(mysql.TimeFormat)
227-
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime, mydate) VALUES (?, ?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", datetime, 1458131094)
229+
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime, mydate, tdate) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", dateTimeStr, 1458131094, dateStr)
230+
231+
s.testExecute(c, "SET sql_mode = '';") // clear sql_mode to allow empty dates
232+
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime, mydate, tdate) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", 20, "test empty datetime", "date test 20", "e1", "a,b", "0000-00-00 00:00:00", 0, "0000-00-00")
228233

229234
// test ip
230235
s.testExecute(c, "INSERT test_river (id, ip) VALUES (?, ?)", 17, 0)
@@ -248,6 +253,7 @@ func (s *riverTestSuite) testElasticMapping(c *C) *elastic.MappingResponse {
248253
c.Assert(err, IsNil)
249254

250255
c.Assert(r.Mapping[index].Mappings[docType].Properties["tdatetime"].Type, Equals, "date")
256+
c.Assert(r.Mapping[index].Mappings[docType].Properties["tdate"].Type, Equals, "date")
251257
c.Assert(r.Mapping[index].Mappings[docType].Properties["mydate"].Type, Equals, "date")
252258
return r
253259
}
@@ -359,6 +365,17 @@ func (s *riverTestSuite) TestRiver(c *C) {
359365
c.Assert(r.Source["es_title"], Equals, "hello")
360366
}
361367

368+
r = s.testElasticGet(c, "16")
369+
c.Assert(r.Found, IsTrue)
370+
tdt, _ := time.Parse(time.RFC3339, r.Source["tdatetime"].(string))
371+
c.Assert(tdt.Format(mysql.TimeFormat), Equals, dateTimeStr)
372+
c.Assert(r.Source["tdate"], Equals, dateStr)
373+
374+
r = s.testElasticGet(c, "20")
375+
c.Assert(r.Found, IsTrue)
376+
c.Assert(r.Source["tdate"], Equals, nil)
377+
c.Assert(r.Source["tdatetime"], Equals, nil)
378+
362379
// test ip
363380
r = s.testElasticGet(c, "17")
364381
c.Assert(r.Found, IsTrue)

river/sync.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
fieldTypeDate = "date"
3131
)
3232

33+
const mysqlDateFormat = "2006-01-02"
34+
3335
type posSaver struct {
3436
pos mysql.Position
3537
force bool
@@ -333,9 +335,21 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in
333335
case schema.TYPE_DATETIME, schema.TYPE_TIMESTAMP:
334336
switch v := value.(type) {
335337
case string:
336-
vt, _ := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local)
338+
vt, err := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local)
339+
if err != nil || vt.IsZero() { // failed to parse date or zero date
340+
return nil
341+
}
337342
return vt.Format(time.RFC3339)
338343
}
344+
case schema.TYPE_DATE:
345+
switch v := value.(type) {
346+
case string:
347+
vt, err := time.Parse(mysqlDateFormat, string(v))
348+
if err != nil || vt.IsZero() { // failed to parse date or zero date
349+
return nil
350+
}
351+
return vt.Format(mysqlDateFormat)
352+
}
339353
}
340354

341355
return value

0 commit comments

Comments
 (0)