Skip to content

fix golint #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 64 additions & 42 deletions elastic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package elastic

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"crypto/tls"

"github.com/juju/errors"
)

// Client is the client to communicate with ES.
// Although there are many Elasticsearch clients with Go, I still want to implement one by myself.
// Because we only need some very simple usages.
type Client struct {
Expand All @@ -23,21 +24,23 @@ type Client struct {
c *http.Client
}

// ClientConfig is the configuration for the client.
type ClientConfig struct {
Https bool
HTTPS bool
Addr string
User string
Password string
}

// NewClient creates the Cient with configuration.
func NewClient(conf *ClientConfig) *Client {
c := new(Client)

c.Addr = conf.Addr
c.User = conf.User
c.Password = conf.Password

if conf.Https {
if conf.HTTPS {
c.Protocol = "https"
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
Expand All @@ -51,6 +54,7 @@ func NewClient(conf *ClientConfig) *Client {
return c
}

// ResponseItem is the ES item in the response.
type ResponseItem struct {
ID string `json:"_id"`
Index string `json:"_index"`
Expand All @@ -60,6 +64,7 @@ type ResponseItem struct {
Source map[string]interface{} `json:"_source"`
}

// Response is the ES response
type Response struct {
Code int
ResponseItem
Expand All @@ -73,12 +78,13 @@ const (
ActionIndex = "index"
)

// BulkRequest is used to send multi request in batch.
type BulkRequest struct {
Action string
Index string
Type string
ID string
Parent string
Action string
Index string
Type string
ID string
Parent string
Pipeline string

Data map[string]interface{}
Expand Down Expand Up @@ -142,6 +148,7 @@ func (r *BulkRequest) bulk(buf *bytes.Buffer) error {
return nil
}

// BulkResponse is the response for the bulk request.
type BulkResponse struct {
Code int
Took int `json:"took"`
Expand All @@ -150,6 +157,7 @@ type BulkResponse struct {
Items []map[string]*BulkResponseItem `json:"items"`
}

// BulkResponseItem is the item in the bulk response.
type BulkResponseItem struct {
Index string `json:"_index"`
Type string `json:"_type"`
Expand All @@ -160,20 +168,23 @@ type BulkResponseItem struct {
Found bool `json:"found"`
}

// MappingResponse is the response for the mapping request.
type MappingResponse struct {
Code int
Code int
Mapping Mapping
}

// Mapping represents ES mapping.
type Mapping map[string]struct {
Mappings map[string]struct {
Properties map[string]struct {
Type string `json:"type"`
Fields interface{} `json:"fields"`
Type string `json:"type"`
Fields interface{} `json:"fields"`
} `json:"properties"`
} `json:"mappings"`
}

// DoRequest sends a request with body to ES.
func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) {
req, err := http.NewRequest(method, url, body)
req.Header.Add("Content-Type", "application/json")
Expand All @@ -188,6 +199,7 @@ func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http
return resp, err
}

// Do sends the request with body to ES.
func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) {
bodyData, err := json.Marshal(body)
if err != nil {
Expand Down Expand Up @@ -221,6 +233,7 @@ func (c *Client) Do(method string, url string, body map[string]interface{}) (*Re
return ret, errors.Trace(err)
}

// DoBulk sends the bulk request to the ES.
func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) {
var buf bytes.Buffer

Expand Down Expand Up @@ -252,18 +265,19 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error)
return ret, errors.Trace(err)
}

// CreateMapping creates a ES mapping.
func (c *Client) CreateMapping(index string, docType string, mapping map[string]interface{}) error {
reqUrl := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr,
url.QueryEscape(index))

r, err := c.Do("HEAD", reqUrl, nil)
r, err := c.Do("HEAD", reqURL, nil)
if err != nil {
return errors.Trace(err)
}

// if index doesn't exist, will get 404 not found, create index first
if r.Code == http.StatusNotFound {
_, err = c.Do("PUT", reqUrl, nil)
_, err = c.Do("PUT", reqURL, nil)

if err != nil {
return errors.Trace(err)
Expand All @@ -272,20 +286,21 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string]
return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

reqUrl = fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr,
reqURL = fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType))

_, err = c.Do("POST", reqUrl, mapping)
_, err = c.Do("POST", reqURL, mapping)
return errors.Trace(err)
}

func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error){
reqUrl := fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr,
// GetMapping gets the mapping.
func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error) {
reqURL := fmt.Sprintf("%s://%s/%s/%s/_mapping", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType))
buf := bytes.NewBuffer(nil)
resp, err := c.DoRequest("GET", reqUrl, buf)
resp, err := c.DoRequest("GET", reqURL, buf)

if err != nil {
return nil, errors.Trace(err)
Expand All @@ -308,100 +323,107 @@ func (c *Client) GetMapping(index string, docType string) (*MappingResponse, err
return ret, errors.Trace(err)
}

// DeleteIndex deletes the index.
func (c *Client) DeleteIndex(index string) error {
reqUrl := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s", c.Protocol, c.Addr,
url.QueryEscape(index))

r, err := c.Do("DELETE", reqUrl, nil)
r, err := c.Do("DELETE", reqURL, nil)
if err != nil {
return errors.Trace(err)
}

if r.Code == http.StatusOK || r.Code == http.StatusNotFound {
return nil
} else {
return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

// Get gets the item by id.
func (c *Client) Get(index string, docType string, id string) (*Response, error) {
reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType),
url.QueryEscape(id))

return c.Do("GET", reqUrl, nil)
return c.Do("GET", reqURL, nil)
}

// Can use Update to create or update the data
// Update creates or updates the data
func (c *Client) Update(index string, docType string, id string, data map[string]interface{}) error {
reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType),
url.QueryEscape(id))

r, err := c.Do("PUT", reqUrl, data)
r, err := c.Do("PUT", reqURL, data)
if err != nil {
return errors.Trace(err)
}

if r.Code == http.StatusOK || r.Code == http.StatusCreated {
return nil
} else {
return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

// Exists checks whether id exists or not.
func (c *Client) Exists(index string, docType string, id string) (bool, error) {
reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType),
url.QueryEscape(id))

r, err := c.Do("HEAD", reqUrl, nil)
r, err := c.Do("HEAD", reqURL, nil)
if err != nil {
return false, err
}

return r.Code == http.StatusOK, nil
}

// Delete deletes the item by id.
func (c *Client) Delete(index string, docType string, id string) error {
reqUrl := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s/%s/%s", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType),
url.QueryEscape(id))

r, err := c.Do("DELETE", reqUrl, nil)
r, err := c.Do("DELETE", reqURL, nil)
if err != nil {
return errors.Trace(err)
}

if r.Code == http.StatusOK || r.Code == http.StatusNotFound {
return nil
} else {
return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

// Bulk sends the bulk request.
// only support parent in 'Bulk' related apis
func (c *Client) Bulk(items []*BulkRequest) (*BulkResponse, error) {
reqUrl := fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr)
reqURL := fmt.Sprintf("%s://%s/_bulk", c.Protocol, c.Addr)

return c.DoBulk(reqUrl, items)
return c.DoBulk(reqURL, items)
}

// IndexBulk sends the bulk request for index.
func (c *Client) IndexBulk(index string, items []*BulkRequest) (*BulkResponse, error) {
reqUrl := fmt.Sprintf("%s://%s/%s/_bulk", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s/_bulk", c.Protocol, c.Addr,
url.QueryEscape(index))

return c.DoBulk(reqUrl, items)
return c.DoBulk(reqURL, items)
}

// IndexTypeBulk sends the bulk request for index and doc type.
func (c *Client) IndexTypeBulk(index string, docType string, items []*BulkRequest) (*BulkResponse, error) {
reqUrl := fmt.Sprintf("%s://%s/%s/%s/_bulk", c.Protocol, c.Addr,
reqURL := fmt.Sprintf("%s://%s/%s/%s/_bulk", c.Protocol, c.Addr,
url.QueryEscape(index),
url.QueryEscape(docType))

return c.DoBulk(reqUrl, items)
return c.DoBulk(reqURL, items)
}
6 changes: 6 additions & 0 deletions river/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"github.com/juju/errors"
)

// SourceConfig is the configs for source
type SourceConfig struct {
Schema string `toml:"schema"`
Tables []string `toml:"tables"`
}

// Config is the configuration
type Config struct {
MyAddr string `toml:"my_addr"`
MyUser string `toml:"my_user"`
Expand Down Expand Up @@ -44,6 +46,7 @@ type Config struct {
SkipNoPkTable bool `toml:"skip_no_pk_table"`
}

// NewConfigWithFile creates a Config from file.
func NewConfigWithFile(name string) (*Config, error) {
data, err := ioutil.ReadFile(name)
if err != nil {
Expand All @@ -53,6 +56,7 @@ func NewConfigWithFile(name string) (*Config, error) {
return NewConfig(string(data))
}

// NewConfig creates a Config from data.
func NewConfig(data string) (*Config, error) {
var c Config

Expand All @@ -64,10 +68,12 @@ func NewConfig(data string) (*Config, error) {
return &c, nil
}

// TomlDuration supports time codec for TOML format.
type TomlDuration struct {
time.Duration
}

// UnmarshalText implementes TOML UnmarshalText
func (d *TomlDuration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
Expand Down
4 changes: 2 additions & 2 deletions river/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (m *masterInfo) Position() mysql.Position {
defer m.RUnlock()

return mysql.Position{
m.Name,
m.Pos,
Name: m.Name,
Pos: m.Pos,
}
}

Expand Down
Loading