Skip to content

Commit d4e8812

Browse files
mxlxmsiddontang
authored andcommitted
refactor status with prometheus (#340)
* refactor status with prometheus
1 parent 7b48b8c commit d4e8812

10 files changed

+77
-94
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
language: go
22

33
go:
4-
- "1.11"
4+
- "1.12"
55

66
services:
77
- elasticsearch
@@ -35,4 +35,4 @@ script:
3535
- go test --race ./...
3636

3737
env:
38-
- GO111MODULE=on
38+
- GO111MODULE=on

etc/river.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ data_dir = "./var"
2020

2121
# Inner Http status address
2222
stat_addr = "127.0.0.1:12800"
23+
stat_path = "/metrics"
2324

2425
# pseudo server id like a slave
2526
server_id = 1001

go.mod

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
module github.com/siddontang/go-mysql-elasticsearch
22

3+
go 1.12
4+
35
require (
46
github.com/BurntSushi/toml v0.3.1
57
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
8+
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
9+
github.com/prometheus/client_golang v0.9.3
610
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
7-
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
8-
github.com/siddontang/go-mysql v0.0.0-20190303113352-670f74e8daf5
11+
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed
12+
github.com/siddontang/go-mysql v0.0.0-20190524062908-de6c3a84bcbe
913
)

river/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Config struct {
2727
ESPassword string `toml:"es_pass"`
2828

2929
StatAddr string `toml:"stat_addr"`
30+
StatPath string `toml:"stat_path"`
3031

3132
ServerID uint32 `toml:"server_id"`
3233
Flavor string `toml:"flavor"`

river/metrics.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package river
2+
3+
import (
4+
"net/http"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/client_golang/prometheus/promauto"
9+
"github.com/prometheus/client_golang/prometheus/promhttp"
10+
)
11+
12+
var (
13+
esInsertNum = promauto.NewCounterVec(
14+
prometheus.CounterOpts{
15+
Name: "mysql2es_inserted_num",
16+
Help: "The number of docs inserted to elasticsearch",
17+
}, []string{"index"},
18+
)
19+
esUpdateNum = promauto.NewCounterVec(
20+
prometheus.CounterOpts{
21+
Name: "mysql2es_updated_num",
22+
Help: "The number of docs updated to elasticsearch",
23+
}, []string{"index"},
24+
)
25+
esDeleteNum = promauto.NewCounterVec(
26+
prometheus.CounterOpts{
27+
Name: "mysql2es_deleted_num",
28+
Help: "The number of docs deleted from elasticsearch",
29+
}, []string{"index"},
30+
)
31+
canalSyncState = promauto.NewGauge(
32+
prometheus.GaugeOpts{
33+
Name: "mysql2es_canal_state",
34+
Help: "The canal slave running state: 0=stopped, 1=ok",
35+
},
36+
)
37+
canalDelay = promauto.NewGauge(
38+
prometheus.GaugeOpts{
39+
Name: "mysql2es_canal_delay",
40+
Help: "The canal slave lag",
41+
},
42+
)
43+
)
44+
45+
func (r *River) collectMetrics() {
46+
for range time.Tick(10 * time.Second) {
47+
canalDelay.Set(float64(r.canal.GetDelay()))
48+
}
49+
}
50+
51+
func InitStatus(addr string, path string) {
52+
http.Handle(path, promhttp.Handler())
53+
http.ListenAndServe(addr, nil)
54+
}

river/river.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ type River struct {
3333

3434
es *elastic.Client
3535

36-
st *stat
37-
3836
master *masterInfo
3937

4038
syncCh chan interface{}
@@ -78,8 +76,7 @@ func NewRiver(c *Config) (*River, error) {
7876
cfg.HTTPS = r.c.ESHttps
7977
r.es = elastic.NewClient(cfg)
8078

81-
r.st = &stat{r: r}
82-
go r.st.Run(r.c.StatAddr)
79+
go InitStatus(r.c.StatAddr, r.c.StatPath)
8380

8481
return r, nil
8582
}
@@ -292,11 +289,13 @@ func ruleKey(schema string, table string) string {
292289
// Run syncs the data from MySQL and inserts to ES.
293290
func (r *River) Run() error {
294291
r.wg.Add(1)
292+
canalSyncState.Set(float64(1))
295293
go r.syncLoop()
296294

297295
pos := r.master.Position()
298296
if err := r.canal.RunFrom(pos); err != nil {
299297
log.Errorf("start canal err %v", err)
298+
canalSyncState.Set(0)
300299
return errors.Trace(err)
301300
}
302301

river/river_extra_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) {
4646
cfg.DumpExec = "mysqldump"
4747

4848
cfg.StatAddr = "127.0.0.1:12800"
49+
cfg.StatPath = "/metrics2"
50+
4951
cfg.BulkSize = 1
5052
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}
5153

river/river_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
8686
cfg.DumpExec = "mysqldump"
8787

8888
cfg.StatAddr = "127.0.0.1:12800"
89+
cfg.StatPath = "/metrics1"
90+
8991
cfg.BulkSize = 1
9092
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}
9193

river/status.go

Lines changed: 0 additions & 74 deletions
This file was deleted.

river/sync.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@ import (
1717
"github.com/siddontang/go-mysql/schema"
1818
)
1919

20-
const (
21-
syncInsertDoc = iota
22-
syncDeleteDoc
23-
syncUpdateDoc
24-
)
25-
2620
const (
2721
fieldTypeList = "list"
2822
// for the mysql int type to es date type
@@ -103,7 +97,7 @@ func (h *eventHandler) OnGTID(gtid mysql.GTIDSet) error {
10397
return nil
10498
}
10599

106-
func (h *eventHandler) OnPosSynced(pos mysql.Position, force bool) error {
100+
func (h *eventHandler) OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error {
107101
return nil
108102
}
109103

@@ -197,10 +191,10 @@ func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]
197191

198192
if action == canal.DeleteAction {
199193
req.Action = elastic.ActionDelete
200-
r.st.DeleteNum.Add(1)
194+
esDeleteNum.WithLabelValues(rule.Index).Inc()
201195
} else {
202196
r.makeInsertReqData(req, rule, values)
203-
r.st.InsertNum.Add(1)
197+
esInsertNum.WithLabelValues(rule.Index).Inc()
204198
}
205199

206200
reqs = append(reqs, req)
@@ -255,8 +249,8 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
255249
req = &elastic.BulkRequest{Index: rule.Index, Type: rule.Type, ID: afterID, Parent: afterParentID, Pipeline: rule.Pipeline}
256250
r.makeInsertReqData(req, rule, rows[i+1])
257251

258-
r.st.DeleteNum.Add(1)
259-
r.st.InsertNum.Add(1)
252+
esDeleteNum.WithLabelValues(rule.Index).Inc()
253+
esInsertNum.WithLabelValues(rule.Index).Inc()
260254
} else {
261255
if len(rule.Pipeline) > 0 {
262256
// Pipelines can only be specified on index action
@@ -267,7 +261,7 @@ func (r *River) makeUpdateRequest(rule *Rule, rows [][]interface{}) ([]*elastic.
267261
} else {
268262
r.makeUpdateReqData(req, rule, rows[i], rows[i+1])
269263
}
270-
r.st.UpdateNum.Add(1)
264+
esUpdateNum.WithLabelValues(rule.Index).Inc()
271265
}
272266

273267
reqs = append(reqs, req)

0 commit comments

Comments
 (0)