diff --git a/elastic/client.go b/elastic/client.go index 8a8ed581..99e40878 100644 --- a/elastic/client.go +++ b/elastic/client.go @@ -2,16 +2,17 @@ package elastic import ( "bytes" + "crypto/tls" "encoding/json" "fmt" "io/ioutil" "net/http" "net/url" - "crypto/tls" "github.com/juju/errors" ) +// Client is the client to communicate with ES. // Although there are many Elasticsearch clients with Go, I still want to implement one by myself. // Because we only need some very simple usages. type Client struct { @@ -23,13 +24,15 @@ type Client struct { c *http.Client } +// ClientConfig is the configuration for the client. type ClientConfig struct { - Https bool + HTTPS bool Addr string User string Password string } +// NewClient creates the Client with configuration. func NewClient(conf *ClientConfig) *Client { c := new(Client) @@ -37,7 +40,7 @@ func NewClient(conf *ClientConfig) *Client { c.User = conf.User c.Password = conf.Password - if conf.Https { + if conf.HTTPS { c.Protocol = "https" tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -51,6 +54,7 @@ func NewClient(conf *ClientConfig) *Client { return c } +// ResponseItem is the ES item in the response. type ResponseItem struct { ID string `json:"_id"` Index string `json:"_index"` @@ -60,6 +64,7 @@ type ResponseItem struct { Source map[string]interface{} `json:"_source"` } +// Response is the ES response type Response struct { Code int ResponseItem @@ -73,12 +78,13 @@ const ( ActionIndex = "index" ) +// BulkRequest is used to send multiple requests in a batch. type BulkRequest struct { - Action string - Index string - Type string - ID string - Parent string + Action string + Index string + Type string + ID string + Parent string Pipeline string Data map[string]interface{} @@ -142,6 +148,7 @@ func (r *BulkRequest) bulk(buf *bytes.Buffer) error { return nil } +// BulkResponse is the response for the bulk request. 
type BulkResponse struct { Code int Took int `json:"took"` @@ -150,6 +157,7 @@ type BulkResponse struct { Items []map[string]*BulkResponseItem `json:"items"` } +// BulkResponseItem is the item in the bulk response. type BulkResponseItem struct { Index string `json:"_index"` Type string `json:"_type"` @@ -160,20 +168,23 @@ type BulkResponseItem struct { Found bool `json:"found"` } +// MappingResponse is the response for the mapping request. type MappingResponse struct { - Code int + Code int Mapping Mapping } +// Mapping represents ES mapping. type Mapping map[string]struct { Mappings map[string]struct { Properties map[string]struct { - Type string `json:"type"` - Fields interface{} `json:"fields"` + Type string `json:"type"` + Fields interface{} `json:"fields"` } `json:"properties"` } `json:"mappings"` } +// DoRequest sends a request with body to ES. func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) { req, err := http.NewRequest(method, url, body) req.Header.Add("Content-Type", "application/json") @@ -188,6 +199,7 @@ func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http return resp, err } +// Do sends the request with body to ES. func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) { bodyData, err := json.Marshal(body) if err != nil { @@ -221,6 +233,7 @@ func (c *Client) Do(method string, url string, body map[string]interface{}) (*Re return ret, errors.Trace(err) } +// DoBulk sends the bulk request to the ES. func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) { var buf bytes.Buffer @@ -252,18 +265,19 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) return ret, errors.Trace(err) } +// CreateMapping creates a ES mapping. 
func (c *Client) CreateMapping(index string, docType string, mapping map[string]interface{}) error { - reqUrl := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr, url.QueryEscape(index)) - r, err := c.Do("HEAD", reqUrl, nil) + r, err := c.Do("HEAD", reqURL, nil) if err != nil { return errors.Trace(err) } // if index doesn't exist, will get 404 not found, create index first if r.Code == http.StatusNotFound { - _, err = c.Do("PUT", reqUrl, nil) + _, err = c.Do("PUT", reqURL, nil) if err != nil { return errors.Trace(err) @@ -272,20 +286,21 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string] return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } - reqUrl = fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr, + reqURL = fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType)) - _, err = c.Do("POST", reqUrl, mapping) + _, err = c.Do("POST", reqURL, mapping) return errors.Trace(err) } -func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error){ - reqUrl := fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr, +// GetMapping gets the mapping. +func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error) { + reqURL := fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType)) buf := bytes.NewBuffer(nil) - resp, err := c.DoRequest("GET", reqUrl, buf) + resp, err := c.DoRequest("GET", reqURL, buf) if err != nil { return nil, errors.Trace(err) @@ -308,57 +323,60 @@ func (c *Client) GetMapping(index string, docType string) (*MappingResponse, err return ret, errors.Trace(err) } +// DeleteIndex deletes the index. 
func (c *Client) DeleteIndex(index string) error { - reqUrl := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr, url.QueryEscape(index)) - r, err := c.Do("DELETE", reqUrl, nil) + r, err := c.Do("DELETE", reqURL, nil) if err != nil { return errors.Trace(err) } if r.Code == http.StatusOK || r.Code == http.StatusNotFound { return nil - } else { - return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } + + return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } +// Get gets the item by id. func (c *Client) Get(index string, docType string, id string) (*Response, error) { - reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType), url.QueryEscape(id)) - return c.Do("GET", reqUrl, nil) + return c.Do("GET", reqURL, nil) } -// Can use Update to create or update the data +// Update creates or updates the data func (c *Client) Update(index string, docType string, id string, data map[string]interface{}) error { - reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType), url.QueryEscape(id)) - r, err := c.Do("PUT", reqUrl, data) + r, err := c.Do("PUT", reqURL, data) if err != nil { return errors.Trace(err) } if r.Code == http.StatusOK || r.Code == http.StatusCreated { return nil - } else { - return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } + + return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } +// Exists checks whether id exists or not. 
func (c *Client) Exists(index string, docType string, id string) (bool, error) { - reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType), url.QueryEscape(id)) - r, err := c.Do("HEAD", reqUrl, nil) + r, err := c.Do("HEAD", reqURL, nil) if err != nil { return false, err } @@ -366,42 +384,46 @@ func (c *Client) Exists(index string, docType string, id string) (bool, error) { return r.Code == http.StatusOK, nil } +// Delete deletes the item by id. func (c *Client) Delete(index string, docType string, id string) error { - reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType), url.QueryEscape(id)) - r, err := c.Do("DELETE", reqUrl, nil) + r, err := c.Do("DELETE", reqURL, nil) if err != nil { return errors.Trace(err) } if r.Code == http.StatusOK || r.Code == http.StatusNotFound { return nil - } else { - return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } + + return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } +// Bulk sends the bulk request. // only support parent in 'Bulk' related apis func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) { - reqUrl := fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr) + reqURL := fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr) - return c.DoBulk(reqUrl, items) + return c.DoBulk(reqURL, items) } +// IndexBulk sends the bulk request for index. func (c *Client) IndexBulk(index string, items []*BulkRequest) (*BulkResponse, error) { - reqUrl := fmt.Sprintf("%s://%s/%s/_bulk", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s/_bulk", c.Protocol, c.Addr, url.QueryEscape(index)) - return c.DoBulk(reqUrl, items) + return c.DoBulk(reqURL, items) } +// IndexTypeBulk sends the bulk request for index and doc type. 
func (c *Client) IndexTypeBulk(index string, docType string, items []*BulkRequest) (*BulkResponse, error) { - reqUrl := fmt.Sprintf("%s://%s/%s/%s/_bulk", c.Protocol, c.Addr, + reqURL := fmt.Sprintf("%s://%s/%s/%s/_bulk", c.Protocol, c.Addr, url.QueryEscape(index), url.QueryEscape(docType)) - return c.DoBulk(reqUrl, items) + return c.DoBulk(reqURL, items) } diff --git a/river/config.go b/river/config.go index d6b354b5..61050a90 100644 --- a/river/config.go +++ b/river/config.go @@ -8,11 +8,13 @@ import ( "github.com/juju/errors" ) +// SourceConfig is the configs for source type SourceConfig struct { Schema string `toml:"schema"` Tables []string `toml:"tables"` } +// Config is the configuration type Config struct { MyAddr string `toml:"my_addr"` MyUser string `toml:"my_user"` @@ -44,6 +46,7 @@ type Config struct { SkipNoPkTable bool `toml:"skip_no_pk_table"` } +// NewConfigWithFile creates a Config from file. func NewConfigWithFile(name string) (*Config, error) { data, err := ioutil.ReadFile(name) if err != nil { @@ -53,6 +56,7 @@ func NewConfigWithFile(name string) (*Config, error) { return NewConfig(string(data)) } +// NewConfig creates a Config from data. func NewConfig(data string) (*Config, error) { var c Config @@ -64,10 +68,12 @@ func NewConfig(data string) (*Config, error) { return &c, nil } +// TomlDuration supports time codec for TOML format. 
type TomlDuration struct { time.Duration } +// UnmarshalText implements TOML UnmarshalText func (d *TomlDuration) UnmarshalText(text []byte) error { var err error d.Duration, err = time.ParseDuration(string(text)) diff --git a/river/master.go b/river/master.go index fe10b577..d409d18c 100644 --- a/river/master.go +++ b/river/master.go @@ -87,8 +87,8 @@ func (m *masterInfo) Position() mysql.Position { defer m.RUnlock() return mysql.Position{ - m.Name, - m.Pos, + Name: m.Name, + Pos: m.Pos, } } diff --git a/river/river.go b/river/river.go index 6de73520..2ab8c0d1 100644 --- a/river/river.go +++ b/river/river.go @@ -13,9 +13,10 @@ import ( "gopkg.in/birkirb/loggers.v1/log" ) +// ErrRuleNotExist is the error if rule is not defined. var ErrRuleNotExist = errors.New("rule is not exist") -// In Elasticsearch, river is a pluggable service within Elasticsearch pulling data then indexing it into Elasticsearch. +// River is a pluggable service within Elasticsearch pulling data then indexing it into Elasticsearch. // We use this definition here too, although it may not run within Elasticsearch. // Maybe later I can implement a acutal river in Elasticsearch, but I must learn java. 
:-) type River struct { @@ -39,6 +40,7 @@ type River struct { syncCh chan interface{} } +// NewRiver creates the River from config func NewRiver(c *Config) (*River, error) { r := new(River) @@ -73,7 +75,7 @@ func NewRiver(c *Config) (*River, error) { cfg.Addr = r.c.ESAddr cfg.User = r.c.ESUser cfg.Password = r.c.ESPassword - cfg.Https = r.c.ESHttps + cfg.HTTPS = r.c.ESHttps r.es = elastic.NewClient(cfg) r.st = &stat{r: r} @@ -122,7 +124,7 @@ func (r *River) prepareCanal() error { } else { // many dbs, can only assign databases to dump keys := make([]string, 0, len(dbs)) - for key, _ := range dbs { + for key := range dbs { keys = append(keys, key) } @@ -271,9 +273,9 @@ func (r *River) prepareRule() error { if len(rule.TableInfo.PKColumns) == 0 { if !r.c.SkipNoPkTable { return errors.Errorf("%s.%s must have a PK for a column", rule.Schema, rule.Table) - } else { - log.Errorf("ignored table without a primary key: %s\n", rule.TableInfo.Name) } + + log.Errorf("ignored table without a primary key: %s\n", rule.TableInfo.Name) } else { rules[key] = rule } @@ -301,10 +303,12 @@ func (r *River) Run() error { return nil } +// Ctx returns the internal context for outside use. 
func (r *River) Ctx() context.Context { return r.ctx } +// Close closes the River func (r *River) Close() { log.Infof("closing river") diff --git a/river/river_extra_test.go b/river/river_extra_test.go index 74b892d1..a3a35494 100644 --- a/river/river_extra_test.go +++ b/river/river_extra_test.go @@ -34,10 +34,10 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) { s.testExecute(c, fmt.Sprintf(schema, "test_river_parent")) cfg := new(Config) - cfg.MyAddr = *my_addr + cfg.MyAddr = *myAddr cfg.MyUser = "root" cfg.MyPassword = "" - cfg.ESAddr = *es_addr + cfg.ESAddr = *esAddr cfg.ServerID = 1001 cfg.Flavor = "mysql" @@ -90,13 +90,13 @@ func (s *riverTestSuite) testElasticExtraExists(c *C, id string, parent string, index := "river" docType := "river_extra" - reqUrl := fmt.Sprintf("http://%s/%s/%s/%s?parent=%s", s.r.es.Addr, + reqURL := fmt.Sprintf("http://%s/%s/%s/%s?parent=%s", s.r.es.Addr, url.QueryEscape(index), url.QueryEscape(docType), url.QueryEscape(id), url.QueryEscape(parent)) - r, err := s.r.es.Do("HEAD", reqUrl, nil) + r, err := s.r.es.Do("HEAD", reqURL, nil) c.Assert(err, IsNil) if exist { diff --git a/river/river_test.go b/river/river_test.go index e7686dae..0e0bcaa3 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -13,8 +13,8 @@ import ( "github.com/siddontang/go-mysql/mysql" ) -var my_addr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr") -var es_addr = flag.String("es_addr", "127.0.0.1:9200", "Elasticsearch addr") +var myAddr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr") +var esAddr = flag.String("es_addr", "127.0.0.1:9200", "Elasticsearch addr") func Test(t *testing.T) { TestingT(t) @@ -29,7 +29,7 @@ var _ = Suite(&riverTestSuite{}) func (s *riverTestSuite) SetUpSuite(c *C) { var err error - s.c, err = client.Connect(*my_addr, "root", "", "test") + s.c, err = client.Connect(*myAddr, "root", "", "test") c.Assert(err, IsNil) s.testExecute(c, "SET SESSION binlog_format = 'ROW'") @@ -48,7 +48,7 @@ func (s 
*riverTestSuite) SetUpSuite(c *C) { PRIMARY KEY(id)) ENGINE=INNODB; ` - schema_json := ` + schemaJSON := ` CREATE TABLE IF NOT EXISTS %s ( id INT, info JSON, @@ -60,7 +60,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { s.testExecute(c, "DROP TABLE IF EXISTS test_for_json") s.testExecute(c, fmt.Sprintf(schema, "test_river")) s.testExecute(c, fmt.Sprintf(schema, "test_for_id")) - s.testExecute(c, fmt.Sprintf(schema_json, "test_for_json")) + s.testExecute(c, fmt.Sprintf(schemaJSON, "test_for_json")) for i := 0; i < 10; i++ { table := fmt.Sprintf("test_river_%04d", i) @@ -69,11 +69,11 @@ func (s *riverTestSuite) SetUpSuite(c *C) { } cfg := new(Config) - cfg.MyAddr = *my_addr + cfg.MyAddr = *myAddr cfg.MyUser = "root" cfg.MyPassword = "" cfg.MyCharset = "utf8" - cfg.ESAddr = *es_addr + cfg.ESAddr = *esAddr cfg.ServerID = 1001 cfg.Flavor = "mysql" diff --git a/river/rule.go b/river/rule.go index 828d35ad..fb7cccb7 100644 --- a/river/rule.go +++ b/river/rule.go @@ -6,6 +6,7 @@ import ( "github.com/siddontang/go-mysql/schema" ) +// Rule is the rule for how to sync data from MySQL to ES. // If you want to sync MySQL data into elasticsearch, you must set a rule to let use know how to do it. // The mapping rule may thi: schema + table <-> index + document type. // schema and table is for MySQL, index and document type is for Elasticsearch. @@ -69,6 +70,7 @@ func (r *Rule) prepare() error { return nil } +// CheckFilter checks whether the field needs to be filtered. func (r *Rule) CheckFilter(field string) bool { if r.Filter == nil { return true diff --git a/river/sync.go b/river/sync.go index 823350ee..ab026029 100644 --- a/river/sync.go +++ b/river/sync.go @@ -41,8 +41,8 @@ type eventHandler struct { func (h *eventHandler) OnRotate(e *replication.RotateEvent) error { pos := mysql.Position{ - string(e.NextLogName), - uint32(e.Position), + Name: string(e.NextLogName), + Pos: uint32(e.Position), } h.r.syncCh <- posSaver{pos, true}