Skip to content

Commit 43a21ea

Browse files
authored
fix(promtail): Make cri tags streams aware. (#8497)
1 parent 880de93 commit 43a21ea

File tree

3 files changed

+117
-40
lines changed

3 files changed

+117
-40
lines changed

clients/pkg/logentry/stages/extensions.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/go-kit/log"
77
"github.com/go-kit/log/level"
88
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/common/model"
910
)
1011

1112
const (
@@ -43,7 +44,7 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
4344

4445
type cri struct {
4546
// bounded buffer for CRI-O partial log lines (identified by tag `P` until we reach the first `F`)
46-
partialLines []string
47+
partialLines map[model.Fingerprint]Entry
4748
maxPartialLines int
4849
base *Pipeline
4950
}
@@ -57,26 +58,44 @@ func (c *cri) Name() string {
5758
func (c *cri) Run(entry chan Entry) chan Entry {
5859
entry = c.base.Run(entry)
5960

60-
in := RunWithSkip(entry, func(e Entry) (Entry, bool) {
61+
in := RunWithSkipOrSendMany(entry, func(e Entry) ([]Entry, bool) {
62+
fingerprint := e.Labels.Fingerprint()
63+
64+
// We received partial-line (tag: "P")
6165
if e.Extracted["flags"] == "P" {
62-
if len(c.partialLines) >= c.maxPartialLines {
66+
if len(c.partialLines) > c.maxPartialLines {
6367
// Merge existing partialLines
64-
newPartialLine := e.Line
65-
e.Line = strings.Join(c.partialLines, "")
68+
entries := make([]Entry, 0, len(c.partialLines))
69+
for _, v := range c.partialLines {
70+
entries = append(entries, v)
71+
}
72+
6673
level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize)
67-
c.partialLines = c.partialLines[:0]
68-
c.partialLines = append(c.partialLines, newPartialLine)
69-
return e, false
74+
75+
c.partialLines = make(map[model.Fingerprint]Entry)
76+
c.partialLines[fingerprint] = e
77+
78+
return entries, false
7079
}
71-
c.partialLines = append(c.partialLines, e.Line)
72-
return e, true
80+
81+
prev, ok := c.partialLines[fingerprint]
82+
if ok {
83+
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
84+
}
85+
c.partialLines[fingerprint] = e
86+
87+
return []Entry{e}, true // it's a partial-line so skip it.
7388
}
74-
if len(c.partialLines) > 0 {
75-
c.partialLines = append(c.partialLines, e.Line)
76-
e.Line = strings.Join(c.partialLines, "")
77-
c.partialLines = c.partialLines[:0]
89+
90+
// Now we got full-line (tag: "F").
91+
// 1. If a buffered partial line exists for this full line's stream, merge the two.
92+
// 2. Else just return the full line.
93+
prev, ok := c.partialLines[fingerprint]
94+
if ok {
95+
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
96+
delete(c.partialLines, fingerprint)
7897
}
79-
return e, false
98+
return []Entry{e}, false
8099
})
81100

82101
return in
@@ -122,6 +141,6 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
122141
maxPartialLines: MaxPartialLinesSize,
123142
base: p,
124143
}
125-
c.partialLines = make([]string, 0, c.maxPartialLines)
144+
c.partialLines = make(map[model.Fingerprint]Entry)
126145
return &c, nil
127146
}

clients/pkg/logentry/stages/extensions_test.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/common/model"
89
"github.com/stretchr/testify/assert"
910
"github.com/stretchr/testify/require"
1011

@@ -88,49 +89,77 @@ var (
8889
criTestTime2 = time.Now()
8990
)
9091

92+
type testEntry struct {
93+
labels model.LabelSet
94+
line string
95+
}
96+
9197
func TestCRI_tags(t *testing.T) {
9298
cases := []struct {
9399
name string
94100
lines []string
95101
expected []string
96102
maxPartialLines int
103+
entries []testEntry
97104
err error
98105
}{
99106
{
100107
name: "tag F",
101-
lines: []string{
102-
"2019-05-07T18:57:50.904275087+00:00 stdout F some full line",
103-
"2019-05-07T18:57:55.904275087+00:00 stdout F log",
108+
entries: []testEntry{
109+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout F some full line", labels: model.LabelSet{"foo": "bar"}},
110+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log", labels: model.LabelSet{"foo": "bar"}},
104111
},
105112
expected: []string{"some full line", "log"},
106113
},
107114
{
108-
name: "tag P",
109-
lines: []string{
110-
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ",
111-
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ",
112-
"2019-05-07T18:57:55.904275087+00:00 stdout F log finished",
113-
"2019-05-07T18:57:55.904275087+00:00 stdout F another full log",
115+
name: "tag P multi-stream",
116+
entries: []testEntry{
117+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
118+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar2"}},
119+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}},
120+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"foo": "bar2"}},
121+
},
122+
expected: []string{
123+
"partial line 1 log finished", // belongs to stream `{foo="bar"}`
124+
"partial line 2 another full log", // belongs to stream `{foo="bar2"}
125+
},
126+
},
127+
{
128+
name: "tag P multi-stream with maxPartialLines exceeded",
129+
entries: []testEntry{
130+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"label1": "val1", "label2": "val2"}},
131+
132+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"label1": "val1"}},
133+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3 ", labels: model.LabelSet{"label1": "val1", "label2": "val2"}},
134+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", labels: model.LabelSet{"label1": "val3"}},
135+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 5 ", labels: model.LabelSet{"label1": "val4"}}, // exceeded maxPartialLines as already 3 streams in flight.
136+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"label1": "val1", "label2": "val2"}},
137+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"label1": "val3"}},
138+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F yet an another full log", labels: model.LabelSet{"label1": "val4"}},
114139
},
140+
maxPartialLines: 2,
115141
expected: []string{
116-
"partial line 1 partial line 2 log finished",
142+
"partial line 1 partial line 3 ",
143+
"partial line 2 ",
144+
"partial line 4 ",
145+
"log finished",
117146
"another full log",
147+
"partial line 5 yet an another full log",
118148
},
119149
},
120150
{
121-
name: "tag P exceeding MaxPartialLinesSize lines",
122-
lines: []string{
123-
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ",
124-
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ",
125-
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3",
126-
"2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", // this exceeds the `MaxPartialLinesSize` of 3
127-
"2019-05-07T18:57:55.904275087+00:00 stdout F log finished",
128-
"2019-05-07T18:57:55.904275087+00:00 stdout F another full log",
151+
name: "tag P single stream",
152+
entries: []testEntry{
153+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
154+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 2 ", labels: model.LabelSet{"foo": "bar"}},
155+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 3 ", labels: model.LabelSet{"foo": "bar"}},
156+
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 4 ", labels: model.LabelSet{"foo": "bar"}}, // this exceeds the `MaxPartialLinesSize` of 3
157+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}},
158+
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"foo": "bar"}},
129159
},
130160
maxPartialLines: 3,
131161
expected: []string{
132-
"partial line 1 partial line 2 partial line 3",
133-
"partial line 4 log finished",
162+
"partial line 1 partial line 2 partial line 3 partial line 4 log finished",
134163
"another full log",
135164
},
136165
},
@@ -148,16 +177,26 @@ func TestCRI_tags(t *testing.T) {
148177
p.(*cri).maxPartialLines = tt.maxPartialLines
149178
}
150179

151-
for _, line := range tt.lines {
152-
out := processEntries(p, newEntry(nil, nil, line, time.Now()))
180+
for _, entry := range tt.entries {
181+
out := processEntries(p, newEntry(nil, entry.labels, entry.line, time.Now()))
153182
if len(out) > 0 {
154183
for _, en := range out {
155184
got = append(got, en.Line)
156-
157185
}
158186
}
159187
}
160-
assert.Equal(t, tt.expected, got)
188+
189+
expectedMap := make(map[string]bool)
190+
for _, v := range tt.expected {
191+
expectedMap[v] = true
192+
}
193+
194+
gotMap := make(map[string]bool)
195+
for _, v := range got {
196+
gotMap[v] = true
197+
}
198+
199+
assert.Equal(t, expectedMap, gotMap)
161200
})
162201
}
163202
}

clients/pkg/logentry/stages/pipeline.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Ent
9191
return out
9292
}
9393

94+
// RunWithSkipOrSendMany is the same as RunWithSkip, except that `process` may return many entries: if it returns `skip` true, nothing is sent to the output channel; otherwise every returned entry is sent.
95+
func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry {
96+
out := make(chan Entry)
97+
go func() {
98+
defer close(out)
99+
for e := range input {
100+
results, skip := process(e)
101+
if skip {
102+
continue
103+
}
104+
for _, result := range results {
105+
out <- result
106+
}
107+
}
108+
}()
109+
110+
return out
111+
}
112+
94113
// Run implements Stage
95114
func (p *Pipeline) Run(in chan Entry) chan Entry {
96115
in = RunWith(in, func(e Entry) Entry {

0 commit comments

Comments
 (0)