@@ -2,16 +2,17 @@ package elastic
2
2
3
3
import (
4
4
"bytes"
5
+ "crypto/tls"
5
6
"encoding/json"
6
7
"fmt"
7
8
"io/ioutil"
8
9
"net/http"
9
10
"net/url"
10
- "crypto/tls"
11
11
12
12
"github.com/juju/errors"
13
13
)
14
14
15
+ // Client is the client to communicate with ES.
15
16
// Although there are many Elasticsearch clients with Go, I still want to implement one by myself.
16
17
// Because we only need some very simple usages.
17
18
type Client struct {
@@ -23,21 +24,23 @@ type Client struct {
23
24
c * http.Client
24
25
}
25
26
27
+ // ClientConfig is the configuration for the client.
26
28
type ClientConfig struct {
27
- Https bool
29
+ HTTPS bool
28
30
Addr string
29
31
User string
30
32
Password string
31
33
}
32
34
35
+ // NewClient creates the Cient with configuration.
33
36
func NewClient (conf * ClientConfig ) * Client {
34
37
c := new (Client )
35
38
36
39
c .Addr = conf .Addr
37
40
c .User = conf .User
38
41
c .Password = conf .Password
39
42
40
- if conf .Https {
43
+ if conf .HTTPS {
41
44
c .Protocol = "https"
42
45
tr := & http.Transport {
43
46
TLSClientConfig : & tls.Config {InsecureSkipVerify : true },
@@ -51,6 +54,7 @@ func NewClient(conf *ClientConfig) *Client {
51
54
return c
52
55
}
53
56
57
+ // ResponseItem is the ES item in the response.
54
58
type ResponseItem struct {
55
59
ID string `json:"_id"`
56
60
Index string `json:"_index"`
@@ -60,6 +64,7 @@ type ResponseItem struct {
60
64
Source map [string ]interface {} `json:"_source"`
61
65
}
62
66
67
+ // Response is the ES response
63
68
type Response struct {
64
69
Code int
65
70
ResponseItem
@@ -73,12 +78,13 @@ const (
73
78
ActionIndex = "index"
74
79
)
75
80
81
+ // BulkRequest is used to send multi request in batch.
76
82
type BulkRequest struct {
77
- Action string
78
- Index string
79
- Type string
80
- ID string
81
- Parent string
83
+ Action string
84
+ Index string
85
+ Type string
86
+ ID string
87
+ Parent string
82
88
Pipeline string
83
89
84
90
Data map [string ]interface {}
@@ -142,6 +148,7 @@ func (r *BulkRequest) bulk(buf *bytes.Buffer) error {
142
148
return nil
143
149
}
144
150
151
+ // BulkResponse is the response for the bulk request.
145
152
type BulkResponse struct {
146
153
Code int
147
154
Took int `json:"took"`
@@ -150,6 +157,7 @@ type BulkResponse struct {
150
157
Items []map [string ]* BulkResponseItem `json:"items"`
151
158
}
152
159
160
+ // BulkResponseItem is the item in the bulk response.
153
161
type BulkResponseItem struct {
154
162
Index string `json:"_index"`
155
163
Type string `json:"_type"`
@@ -160,20 +168,23 @@ type BulkResponseItem struct {
160
168
Found bool `json:"found"`
161
169
}
162
170
171
+ // MappingResponse is the response for the mapping request.
163
172
type MappingResponse struct {
164
- Code int
173
+ Code int
165
174
Mapping Mapping
166
175
}
167
176
177
+ // Mapping represents ES mapping.
168
178
type Mapping map [string ]struct {
169
179
Mappings map [string ]struct {
170
180
Properties map [string ]struct {
171
- Type string `json:"type"`
172
- Fields interface {} `json:"fields"`
181
+ Type string `json:"type"`
182
+ Fields interface {} `json:"fields"`
173
183
} `json:"properties"`
174
184
} `json:"mappings"`
175
185
}
176
186
187
+ // DoRequest sends a request with body to ES.
177
188
func (c * Client ) DoRequest (method string , url string , body * bytes.Buffer ) (* http.Response , error ) {
178
189
req , err := http .NewRequest (method , url , body )
179
190
req .Header .Add ("Content-Type" , "application/json" )
@@ -188,6 +199,7 @@ func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http
188
199
return resp , err
189
200
}
190
201
202
+ // Do sends the request with body to ES.
191
203
func (c * Client ) Do (method string , url string , body map [string ]interface {}) (* Response , error ) {
192
204
bodyData , err := json .Marshal (body )
193
205
if err != nil {
@@ -221,6 +233,7 @@ func (c *Client) Do(method string, url string, body map[string]interface{}) (*Re
221
233
return ret , errors .Trace (err )
222
234
}
223
235
236
+ // DoBulk sends the bulk request to the ES.
224
237
func (c * Client ) DoBulk (url string , items []* BulkRequest ) (* BulkResponse , error ) {
225
238
var buf bytes.Buffer
226
239
@@ -252,18 +265,19 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error)
252
265
return ret , errors .Trace (err )
253
266
}
254
267
268
+ // CreateMapping creates a ES mapping.
255
269
func (c * Client ) CreateMapping (index string , docType string , mapping map [string ]interface {}) error {
256
- reqUrl := fmt .Sprintf ("%s://%s/%s" , c .Protocol , c .Addr ,
270
+ reqURL := fmt .Sprintf ("%s://%s/%s" , c .Protocol , c .Addr ,
257
271
url .QueryEscape (index ))
258
272
259
- r , err := c .Do ("HEAD" , reqUrl , nil )
273
+ r , err := c .Do ("HEAD" , reqURL , nil )
260
274
if err != nil {
261
275
return errors .Trace (err )
262
276
}
263
277
264
278
// if index doesn't exist, will get 404 not found, create index first
265
279
if r .Code == http .StatusNotFound {
266
- _ , err = c .Do ("PUT" , reqUrl , nil )
280
+ _ , err = c .Do ("PUT" , reqURL , nil )
267
281
268
282
if err != nil {
269
283
return errors .Trace (err )
@@ -272,20 +286,21 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string]
272
286
return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
273
287
}
274
288
275
- reqUrl = fmt .Sprintf ("%s://%s/%s/%s/_mapping" , c .Protocol , c .Addr ,
289
+ reqURL = fmt .Sprintf ("%s://%s/%s/%s/_mapping" , c .Protocol , c .Addr ,
276
290
url .QueryEscape (index ),
277
291
url .QueryEscape (docType ))
278
292
279
- _ , err = c .Do ("POST" , reqUrl , mapping )
293
+ _ , err = c .Do ("POST" , reqURL , mapping )
280
294
return errors .Trace (err )
281
295
}
282
296
283
- func (c * Client ) GetMapping (index string , docType string ) (* MappingResponse , error ){
284
- reqUrl := fmt .Sprintf ("%s://%s/%s/%s/_mapping" , c .Protocol , c .Addr ,
297
+ // GetMapping gets the mapping.
298
+ func (c * Client ) GetMapping (index string , docType string ) (* MappingResponse , error ) {
299
+ reqURL := fmt .Sprintf ("%s://%s/%s/%s/_mapping" , c .Protocol , c .Addr ,
285
300
url .QueryEscape (index ),
286
301
url .QueryEscape (docType ))
287
302
buf := bytes .NewBuffer (nil )
288
- resp , err := c .DoRequest ("GET" , reqUrl , buf )
303
+ resp , err := c .DoRequest ("GET" , reqURL , buf )
289
304
290
305
if err != nil {
291
306
return nil , errors .Trace (err )
@@ -308,100 +323,107 @@ func (c *Client) GetMapping(index string, docType string) (*MappingResponse, err
308
323
return ret , errors .Trace (err )
309
324
}
310
325
326
+ // DeleteIndex deletes the index.
311
327
func (c * Client ) DeleteIndex (index string ) error {
312
- reqUrl := fmt .Sprintf ("%s://%s/%s" , c .Protocol , c .Addr ,
328
+ reqURL := fmt .Sprintf ("%s://%s/%s" , c .Protocol , c .Addr ,
313
329
url .QueryEscape (index ))
314
330
315
- r , err := c .Do ("DELETE" , reqUrl , nil )
331
+ r , err := c .Do ("DELETE" , reqURL , nil )
316
332
if err != nil {
317
333
return errors .Trace (err )
318
334
}
319
335
320
336
if r .Code == http .StatusOK || r .Code == http .StatusNotFound {
321
337
return nil
322
- } else {
323
- return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
324
338
}
339
+
340
+ return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
325
341
}
326
342
343
+ // Get gets the item by id.
327
344
func (c * Client ) Get (index string , docType string , id string ) (* Response , error ) {
328
- reqUrl := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
345
+ reqURL := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
329
346
url .QueryEscape (index ),
330
347
url .QueryEscape (docType ),
331
348
url .QueryEscape (id ))
332
349
333
- return c .Do ("GET" , reqUrl , nil )
350
+ return c .Do ("GET" , reqURL , nil )
334
351
}
335
352
336
- // Can use Update to create or update the data
353
+ // Update creates or updates the data
337
354
func (c * Client ) Update (index string , docType string , id string , data map [string ]interface {}) error {
338
- reqUrl := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
355
+ reqURL := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
339
356
url .QueryEscape (index ),
340
357
url .QueryEscape (docType ),
341
358
url .QueryEscape (id ))
342
359
343
- r , err := c .Do ("PUT" , reqUrl , data )
360
+ r , err := c .Do ("PUT" , reqURL , data )
344
361
if err != nil {
345
362
return errors .Trace (err )
346
363
}
347
364
348
365
if r .Code == http .StatusOK || r .Code == http .StatusCreated {
349
366
return nil
350
- } else {
351
- return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
352
367
}
368
+
369
+ return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
353
370
}
354
371
372
+ // Exists checks whether id exists or not.
355
373
func (c * Client ) Exists (index string , docType string , id string ) (bool , error ) {
356
- reqUrl := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
374
+ reqURL := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
357
375
url .QueryEscape (index ),
358
376
url .QueryEscape (docType ),
359
377
url .QueryEscape (id ))
360
378
361
- r , err := c .Do ("HEAD" , reqUrl , nil )
379
+ r , err := c .Do ("HEAD" , reqURL , nil )
362
380
if err != nil {
363
381
return false , err
364
382
}
365
383
366
384
return r .Code == http .StatusOK , nil
367
385
}
368
386
387
+ // Delete deletes the item by id.
369
388
func (c * Client ) Delete (index string , docType string , id string ) error {
370
- reqUrl := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
389
+ reqURL := fmt .Sprintf ("%s://%s/%s/%s/%s" , c .Protocol , c .Addr ,
371
390
url .QueryEscape (index ),
372
391
url .QueryEscape (docType ),
373
392
url .QueryEscape (id ))
374
393
375
- r , err := c .Do ("DELETE" , reqUrl , nil )
394
+ r , err := c .Do ("DELETE" , reqURL , nil )
376
395
if err != nil {
377
396
return errors .Trace (err )
378
397
}
379
398
380
399
if r .Code == http .StatusOK || r .Code == http .StatusNotFound {
381
400
return nil
382
- } else {
383
- return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
384
401
}
402
+
403
+ return errors .Errorf ("Error: %s, code: %d" , http .StatusText (r .Code ), r .Code )
385
404
}
386
405
406
+ // Bulk sends the bulk request.
387
407
// only support parent in 'Bulk' related apis
388
408
func (c * Client ) Bulk (items []* BulkRequest ) (* BulkResponse , error ) {
389
- reqUrl := fmt .Sprintf ("%s://%s/_bulk" , c .Protocol , c .Addr )
409
+ reqURL := fmt .Sprintf ("%s://%s/_bulk" , c .Protocol , c .Addr )
390
410
391
- return c .DoBulk (reqUrl , items )
411
+ return c .DoBulk (reqURL , items )
392
412
}
393
413
414
+ // IndexBulk sends the bulk request for index.
394
415
func (c * Client ) IndexBulk (index string , items []* BulkRequest ) (* BulkResponse , error ) {
395
- reqUrl := fmt .Sprintf ("%s://%s/%s/_bulk" , c .Protocol , c .Addr ,
416
+ reqURL := fmt .Sprintf ("%s://%s/%s/_bulk" , c .Protocol , c .Addr ,
396
417
url .QueryEscape (index ))
397
418
398
- return c .DoBulk (reqUrl , items )
419
+ return c .DoBulk (reqURL , items )
399
420
}
400
421
422
+ // IndexTypeBulk sends the bulk request for index and doc type.
401
423
func (c * Client ) IndexTypeBulk (index string , docType string , items []* BulkRequest ) (* BulkResponse , error ) {
402
- reqUrl := fmt .Sprintf ("%s://%s/%s/%s/_bulk" , c .Protocol , c .Addr ,
424
+ reqURL := fmt .Sprintf ("%s://%s/%s/%s/_bulk" , c .Protocol , c .Addr ,
403
425
url .QueryEscape (index ),
404
426
url .QueryEscape (docType ))
405
427
406
- return c .DoBulk (reqUrl , items )
428
+ return c .DoBulk (reqURL , items )
407
429
}
0 commit comments