Skip to content

Commit b94c8aa

Browse files
adityacsslim-bean
andauthored
feature: geoip stage in promtail (#3493)
Co-authored-by: Ed Welch <[email protected]>
1 parent 272ebde commit b94c8aa

32 files changed

+3920
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
* [8233](https://github.com/grafana/loki/pull/8233) **nicoche**: promtail: Add `max-line-size-truncate` limit to truncate too long lines on client side
6363
* [7462](https://github.com/grafana/loki/pull/7462) **MarNicGit**: Allow excluding event message from Windows Event Log entries.
6464
* [7597](https://github.com/grafana/loki/pull/7597) **redbaron**: allow ratelimiting by label
65+
* [3493](https://github.com/grafana/loki/pull/3493) **adityacs** Support geoip stage.
6566
* [8382](https://github.com/grafana/loki/pull/8382) **kelnage**: Promtail: Add event log message stage
6667

6768
##### Fixes

clients/pkg/logentry/stages/geoip.go

+241
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package stages
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"reflect"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/mitchellh/mapstructure"
12+
"github.com/oschwald/geoip2-golang"
13+
"github.com/pkg/errors"
14+
"github.com/prometheus/common/model"
15+
)
16+
17+
const (
18+
ErrEmptyGeoIPStageConfig = "geoip stage config cannot be empty"
19+
ErrEmptyDBPathGeoIPStageConfig = "db path cannot be empty"
20+
ErrEmptySourceGeoIPStageConfig = "source cannot be empty"
21+
ErrEmptyDBTypeGeoIPStageConfig = "db type should be either city or asn"
22+
)
23+
24+
type GeoIPFields int
25+
26+
const (
27+
CITYNAME GeoIPFields = iota
28+
COUNTRYNAME
29+
CONTINENTNAME
30+
CONTINENTCODE
31+
LOCATION
32+
POSTALCODE
33+
TIMEZONE
34+
SUBDIVISIONNAME
35+
SUBDIVISIONCODE
36+
)
37+
38+
var fields = map[GeoIPFields]string{
39+
CITYNAME: "geoip_city_name",
40+
COUNTRYNAME: "geoip_country_name",
41+
CONTINENTNAME: "geoip_continet_name",
42+
CONTINENTCODE: "geoip_continent_code",
43+
LOCATION: "geoip_location",
44+
POSTALCODE: "geoip_postal_code",
45+
TIMEZONE: "geoip_timezone",
46+
SUBDIVISIONNAME: "geoip_subdivision_name",
47+
SUBDIVISIONCODE: "geoip_subdivision_code",
48+
}
49+
50+
// GeoIPConfig represents GeoIP stage config
51+
type GeoIPConfig struct {
52+
DB string `mapstructure:"db"`
53+
Source *string `mapstructure:"source"`
54+
DBType string `mapstructure:"db_type"`
55+
}
56+
57+
func validateGeoIPConfig(c *GeoIPConfig) error {
58+
if c == nil {
59+
return errors.New(ErrEmptyGeoIPStageConfig)
60+
}
61+
62+
if c.DB == "" {
63+
return errors.New(ErrEmptyDBPathGeoIPStageConfig)
64+
}
65+
66+
if c.Source != nil && *c.Source == "" {
67+
return errors.New(ErrEmptySourceGeoIPStageConfig)
68+
}
69+
70+
if c.DBType == "" {
71+
return errors.New(ErrEmptyDBTypeGeoIPStageConfig)
72+
}
73+
74+
return nil
75+
}
76+
77+
func newGeoIPStage(logger log.Logger, configs interface{}) (Stage, error) {
78+
cfgs := &GeoIPConfig{}
79+
err := mapstructure.Decode(configs, cfgs)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
err = validateGeoIPConfig(cfgs)
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
db, err := geoip2.Open(cfgs.DB)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
return &geoIPStage{
95+
db: db,
96+
logger: logger,
97+
cfgs: cfgs,
98+
}, nil
99+
}
100+
101+
type geoIPStage struct {
102+
logger log.Logger
103+
db *geoip2.Reader
104+
cfgs *GeoIPConfig
105+
}
106+
107+
// Run implements Stage
108+
func (g *geoIPStage) Run(in chan Entry) chan Entry {
109+
out := make(chan Entry)
110+
go func() {
111+
defer close(out)
112+
defer g.close()
113+
for e := range in {
114+
g.process(e.Labels, e.Extracted, &e.Timestamp, &e.Entry.Line)
115+
out <- e
116+
}
117+
}()
118+
return out
119+
}
120+
121+
// Name implements Stage
122+
func (g *geoIPStage) Name() string {
123+
return StageTypeGeoIP
124+
}
125+
126+
func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
127+
var ip net.IP
128+
if g.cfgs.Source != nil {
129+
if _, ok := extracted[*g.cfgs.Source]; !ok {
130+
if Debug {
131+
level.Debug(g.logger).Log("msg", "source does not exist in the set of extracted values", "source", *g.cfgs.Source)
132+
}
133+
return
134+
}
135+
136+
value, err := getString(extracted[*g.cfgs.Source])
137+
if err != nil {
138+
if Debug {
139+
level.Debug(g.logger).Log("msg", "failed to convert source value to string", "source", *g.cfgs.Source, "err", err, "type", reflect.TypeOf(extracted[*g.cfgs.Source]))
140+
}
141+
return
142+
}
143+
ip = net.ParseIP(value)
144+
}
145+
switch g.cfgs.DBType {
146+
case "city":
147+
record, err := g.db.City(ip)
148+
if err != nil {
149+
level.Error(g.logger).Log("msg", "unable to get City record for the ip", "err", err, "ip", ip)
150+
return
151+
}
152+
g.populateLabelsWithCityData(labels, record)
153+
case "asn":
154+
record, err := g.db.ASN(ip)
155+
if err != nil {
156+
level.Error(g.logger).Log("msg", "unable to get ASN record for the ip", "err", err, "ip", ip)
157+
return
158+
}
159+
g.populateLabelsWithASNData(labels, record)
160+
default:
161+
level.Error(g.logger).Log("msg", "unknown database type")
162+
}
163+
}
164+
165+
func (g *geoIPStage) close() {
166+
if err := g.db.Close(); err != nil {
167+
level.Error(g.logger).Log("msg", "error while closing geoip db", "err", err)
168+
}
169+
}
170+
171+
func (g *geoIPStage) populateLabelsWithCityData(labels model.LabelSet, record *geoip2.City) {
172+
for field, label := range fields {
173+
switch field {
174+
case CITYNAME:
175+
cityName := record.City.Names["en"]
176+
if cityName != "" {
177+
labels[model.LabelName(label)] = model.LabelValue(cityName)
178+
}
179+
case COUNTRYNAME:
180+
contryName := record.Country.Names["en"]
181+
if contryName != "" {
182+
labels[model.LabelName(label)] = model.LabelValue(contryName)
183+
}
184+
case CONTINENTNAME:
185+
continentName := record.Continent.Names["en"]
186+
if continentName != "" {
187+
labels[model.LabelName(label)] = model.LabelValue(continentName)
188+
}
189+
case CONTINENTCODE:
190+
continentCode := record.Continent.Code
191+
if continentCode != "" {
192+
labels[model.LabelName(label)] = model.LabelValue(continentCode)
193+
}
194+
case POSTALCODE:
195+
postalCode := record.Postal.Code
196+
if postalCode != "" {
197+
labels[model.LabelName(label)] = model.LabelValue(postalCode)
198+
}
199+
case TIMEZONE:
200+
timezone := record.Location.TimeZone
201+
if timezone != "" {
202+
labels[model.LabelName(label)] = model.LabelValue(timezone)
203+
}
204+
case LOCATION:
205+
latitude := record.Location.Latitude
206+
longitude := record.Location.Longitude
207+
if latitude != 0 || longitude != 0 {
208+
labels[model.LabelName(fmt.Sprintf("%s_latitude", label))] = model.LabelValue(fmt.Sprint(latitude))
209+
labels[model.LabelName(fmt.Sprintf("%s_longitude", label))] = model.LabelValue(fmt.Sprint(longitude))
210+
}
211+
case SUBDIVISIONNAME:
212+
if len(record.Subdivisions) > 0 {
213+
// we get most specific subdivision https://dev.maxmind.com/release-note/most-specific-subdivision-attribute-added/
214+
subdivisionName := record.Subdivisions[len(record.Subdivisions)-1].Names["en"]
215+
if subdivisionName != "" {
216+
labels[model.LabelName(label)] = model.LabelValue(subdivisionName)
217+
}
218+
}
219+
case SUBDIVISIONCODE:
220+
if len(record.Subdivisions) > 0 {
221+
subdivisionCode := record.Subdivisions[len(record.Subdivisions)-1].IsoCode
222+
if subdivisionCode != "" {
223+
labels[model.LabelName(label)] = model.LabelValue(subdivisionCode)
224+
}
225+
}
226+
default:
227+
level.Error(g.logger).Log("msg", "unknown geoip field")
228+
}
229+
}
230+
}
231+
232+
func (g *geoIPStage) populateLabelsWithASNData(labels model.LabelSet, record *geoip2.ASN) {
233+
autonomousSystemNumber := record.AutonomousSystemNumber
234+
autonomousSystemOrganization := record.AutonomousSystemOrganization
235+
if autonomousSystemNumber != 0 {
236+
labels[model.LabelName("geoip_autonomous_system_number")] = model.LabelValue(fmt.Sprint(autonomousSystemNumber))
237+
}
238+
if autonomousSystemOrganization != "" {
239+
labels[model.LabelName("geoip_autonomous_system_organization")] = model.LabelValue(autonomousSystemOrganization)
240+
}
241+
}
+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package stages
2+
3+
import (
4+
"testing"
5+
6+
"github.com/pkg/errors"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func Test_ValidateConfigs(t *testing.T) {
11+
source := "ip"
12+
tests := []struct {
13+
config GeoIPConfig
14+
wantError error
15+
}{
16+
{
17+
GeoIPConfig{
18+
DB: "test",
19+
Source: &source,
20+
DBType: "city",
21+
},
22+
nil,
23+
},
24+
{
25+
GeoIPConfig{
26+
Source: &source,
27+
DBType: "city",
28+
},
29+
errors.New(ErrEmptyDBPathGeoIPStageConfig),
30+
},
31+
{
32+
GeoIPConfig{
33+
DB: "test",
34+
DBType: "city",
35+
},
36+
errors.New(ErrEmptySourceGeoIPStageConfig),
37+
},
38+
{
39+
GeoIPConfig{
40+
DB: "test",
41+
Source: &source,
42+
},
43+
errors.New(ErrEmptyDBTypeGeoIPStageConfig),
44+
},
45+
}
46+
for _, tt := range tests {
47+
err := validateGeoIPConfig(&tt.config)
48+
if err != nil {
49+
require.Equal(t, tt.wantError.Error(), err.Error())
50+
}
51+
if tt.wantError == nil {
52+
require.Nil(t, err)
53+
}
54+
}
55+
}

clients/pkg/logentry/stages/match.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package stages
22

33
import (
4-
"github.com/prometheus/prometheus/model/labels"
5-
64
"github.com/go-kit/log"
75
"github.com/mitchellh/mapstructure"
86
"github.com/pkg/errors"
97
"github.com/prometheus/client_golang/prometheus"
108
"github.com/prometheus/common/model"
9+
"github.com/prometheus/prometheus/model/labels"
1110

1211
"github.com/grafana/loki/clients/pkg/logentry/logql"
1312
)

clients/pkg/logentry/stages/stage.go

+6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const (
3838
StageTypeStaticLabels = "static_labels"
3939
StageTypeDecolorize = "decolorize"
4040
StageTypeEventLogMessage = "eventlogmessage"
41+
StageTypeGeoIP = "geoip"
4142
)
4243

4344
// Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
@@ -218,6 +219,11 @@ func New(logger log.Logger, jobName *string, stageType string,
218219
}
219220
case StageTypeEventLogMessage:
220221
s, err = newEventLogMessageStage(logger, cfg)
222+
if err != nil {
223+
return nil, err
224+
}
225+
case StageTypeGeoIP:
226+
s, err = newGeoIPStage(logger, cfg)
221227
if err != nil {
222228
return nil, err
223229
}

docs/sources/clients/promtail/stages/_index.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Parsing stages:
1616
- [logfmt]({{<relref "logfmt">}}): Extract data by parsing the log line as logfmt.
1717
- [replace]({{<relref "replace">}}): Replace data using a regular expression.
1818
- [multiline]({{<relref "multiline">}}): Merge multiple lines into a multiline block.
19+
- [geoip]({{<relref "geoip">}}): Extract geoip data from extracted labels.
1920

2021
Transform stages:
2122

0 commit comments

Comments
 (0)