Skip to content

Commit a013e9f

Browse files
authored
Lambda-promtail: Add support for processing SQS messages, add promtailClient Type, add logger, upgrade dependencies and fix unexpected flushing behaviors (#8231)
1 parent c6542e6 commit a013e9f

File tree

14 files changed

+266
-72
lines changed

14 files changed

+266
-72
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
##### Enhancements
88

9+
* [8231](https://github.com/grafana/loki/pull/8231) **CCOLLOT**: Lambda-promtail: add support for AWS SQS message ingestion.
910
* [8532](https://github.com/grafana/loki/pull/8532) **justcompile**: Adds Storage Class option to S3 objects
1011
* [7951](https://github.com/grafana/loki/pull/7951) **MichelHollands**: Add a count template function to line_format and label_format.
1112
* [7380](https://github.com/grafana/loki/pull/7380) **liguozhong**: metrics query: range vector support streaming agg when no overlap.
@@ -33,6 +34,7 @@
3334

3435
##### Fixes
3536

37+
* [8231](https://github.com/grafana/loki/pull/8231) **CCOLLOT**: Lambda-promtail: fix flushing behavior of batches, leading to a significant increase in performance.
3638
* [7784](https://github.com/grafana/loki/pull/7784) **isodude**: Fix default values of connect addresses for compactor and querier workers to work with IPv6.
3739
* [7880](https://github.com/grafana/loki/pull/7880) **sandeepsukhani**: consider range and offset in queries while looking for schema config for query sharding.
3840
* [7937](https://github.com/grafana/loki/pull/7937) **ssncferreira**: Deprecate CLI flag `-ruler.wal-cleaer.period` and replace it with `-ruler.wal-cleaner.period`.

docs/sources/clients/lambda-promtail/_index.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.
109109

110110
Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
111111

112+
### Triggering Lambda-Promtail via SQS
113+
For AWS services supporting sending messages to SQS (for example, S3 with an S3 Notification to SQS), events can be processed through an [SQS queue using a lambda trigger](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) instead of directly configuring the source service to trigger lambda. Lambda-promtail will retrieve the nested events from the SQS messages' body and process them as if they came directly from the source service.
114+
115+
### On-Failure log recovery using SQS
116+
Triggering lambda-promtail through SQS allows handling on-failure recovery of the logs using a secondary SQS queue as a dead-letter-queue (DLQ). You can configure lambda so that unsuccessfully processed messages are sent to the DLQ. After fixing the issue, operators will be able to reprocess the messages by sending them back from the DLQ to the source queue using the [SQS DLQ redrive](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-dead-letter-queue-redrive.html) feature.
117+
112118
## Propagated Labels
113119

114120
Incoming logs can have seven special labels assigned to them which can be used in [relabeling]({{<relref "../promtail/configuration#relabel_configs">}}) or later stages in a Promtail [pipeline]({{<relref "../promtail/pipelines/">}}):

tools/lambda-promtail/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/aws/aws-sdk-go-v2 v1.16.0
88
github.com/aws/aws-sdk-go-v2/config v1.15.1
99
github.com/aws/aws-sdk-go-v2/service/s3 v1.22.0
10+
github.com/go-kit/log v0.2.1
1011
github.com/gogo/protobuf v1.3.2
1112
github.com/golang/snappy v0.0.4
1213
github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2
@@ -40,7 +41,6 @@ require (
4041
github.com/dustin/go-humanize v1.0.0 // indirect
4142
github.com/fatih/color v1.14.1 // indirect
4243
github.com/felixge/httpsnoop v1.0.3 // indirect
43-
github.com/go-kit/log v0.2.1 // indirect
4444
github.com/go-logfmt/logfmt v0.6.0 // indirect
4545
github.com/gogo/googleapis v1.4.0 // indirect
4646
github.com/gogo/status v1.1.1 // indirect

tools/lambda-promtail/go.sum

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1100,4 +1100,4 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
11001100
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
11011101
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
11021102
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
1103-
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
1103+
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=

tools/lambda-promtail/lambda-promtail/cw.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent) error {
1414
data, err := ev.AWSLogs.Parse()
1515
if err != nil {
16-
fmt.Println("error parsing log event: ", err)
1716
return err
1817
}
1918

@@ -42,18 +41,18 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
4241
return nil
4342
}
4443

45-
func processCWEvent(ctx context.Context, ev *events.CloudwatchLogsEvent) error {
46-
batch, err := newBatch(ctx)
44+
func processCWEvent(ctx context.Context, ev *events.CloudwatchLogsEvent, pClient Client) error {
45+
batch, err := newBatch(ctx, pClient)
4746
if err != nil {
4847
return err
4948
}
5049

5150
err = parseCWEvent(ctx, batch, ev)
5251
if err != nil {
53-
return err
52+
return fmt.Errorf("error parsing log event: %s", err)
5453
}
5554

56-
err = sendToPromtail(ctx, batch)
55+
err = pClient.sendToPromtail(ctx, batch)
5756
if err != nil {
5857
return err
5958
}

tools/lambda-promtail/lambda-promtail/kinesis.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@ func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent)
3333
return nil
3434
}
3535

36-
func processKinesisEvent(ctx context.Context, ev *events.KinesisEvent) error {
37-
batch, _ := newBatch(ctx)
36+
func processKinesisEvent(ctx context.Context, ev *events.KinesisEvent, pClient Client) error {
37+
batch, _ := newBatch(ctx, pClient)
3838

3939
err := parseKinesisEvent(ctx, batch, ev)
4040
if err != nil {
4141
return err
4242
}
4343

44-
err = sendToPromtail(ctx, batch)
44+
err = pClient.sendToPromtail(ctx, batch)
4545
if err != nil {
4646
return err
4747
}
4848
return nil
49-
}
49+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package main
2+
3+
import (
4+
"os"
5+
6+
"github.com/go-kit/log"
7+
"github.com/go-kit/log/level"
8+
)
9+
10+
// NewLogger builds the logger used by lambda-promtail.
//
// The logger writes logfmt-encoded lines to os.Stderr, is wrapped in a level
// filter derived from logLevel (level.ParseDefault is given level.DebugValue()
// as the fallback for a logLevel it cannot parse), and has a "caller" key bound
// to log.DefaultCaller. The result is returned as a pointer to the log.Logger
// interface value, so callers dereference it before logging.
func NewLogger(logLevel string) *log.Logger {
11+
logger := log.NewLogfmtLogger(os.Stderr)
12+
logger = level.NewFilter(logger, level.Allow(level.ParseDefault(logLevel, level.DebugValue())))
13+
logger = log.With(logger, "caller", log.DefaultCaller)
14+
return &logger
15+
}

tools/lambda-promtail/lambda-promtail/main.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"strconv"
1111
"strings"
1212

13+
"github.com/go-kit/log/level"
14+
"github.com/grafana/dskit/backoff"
1315
"github.com/prometheus/common/model"
1416

1517
"github.com/aws/aws-lambda-go/events"
@@ -23,7 +25,7 @@ const (
2325

2426
maxErrMsgLen = 1024
2527

26-
invalidExtraLabelsError = "Invalid value for environment variable EXTRA_LABELS. Expected a comma seperated list with an even number of entries. "
28+
invalidExtraLabelsError = "Invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
2729
)
2830

2931
var (
@@ -97,7 +99,6 @@ func setupArguments() {
9799
if strings.EqualFold(print, "false") {
98100
printLogLine = false
99101
}
100-
101102
s3Clients = make(map[string]*s3.Client)
102103
}
103104

@@ -133,10 +134,12 @@ func applyExtraLabels(labels model.LabelSet) model.LabelSet {
133134

134135
func checkEventType(ev map[string]interface{}) (interface{}, error) {
135136
var s3Event events.S3Event
137+
var s3TestEvent events.S3TestEvent
136138
var cwEvent events.CloudwatchLogsEvent
137139
var kinesisEvent events.KinesisEvent
140+
var sqsEvent events.SQSEvent
138141

139-
types := [...]interface{}{&s3Event, &cwEvent, &kinesisEvent}
142+
types := [...]interface{}{&s3Event, &s3TestEvent, &cwEvent, &kinesisEvent, &sqsEvent}
140143

141144
j, _ := json.Marshal(ev)
142145
reader := strings.NewReader(string(j))
@@ -157,21 +160,42 @@ func checkEventType(ev map[string]interface{}) (interface{}, error) {
157160
}
158161

159162
func handler(ctx context.Context, ev map[string]interface{}) error {
163+
lvl, ok := os.LookupEnv("LOG_LEVEL")
164+
if !ok {
165+
lvl = "info"
166+
}
167+
log := NewLogger(lvl)
168+
pClient := NewPromtailClient(&promtailClientConfig{
169+
backoff: &backoff.Config{
170+
MinBackoff: minBackoff,
171+
MaxBackoff: maxBackoff,
172+
MaxRetries: maxRetries,
173+
},
174+
http: &httpClientConfig{
175+
timeout: timeout,
176+
skipTlsVerify: skipTlsVerify,
177+
},
178+
}, log)
179+
160180
event, err := checkEventType(ev)
161181
if err != nil {
162-
fmt.Printf("invalid event: %s\n", ev)
182+
level.Error(*pClient.log).Log("err", fmt.Errorf("invalid event: %s\n", ev))
163183
return err
164184
}
165185

166186
switch evt := event.(type) {
167187
case *events.S3Event:
168-
return processS3Event(ctx, evt)
188+
return processS3Event(ctx, evt, pClient, pClient.log)
169189
case *events.CloudwatchLogsEvent:
170-
return processCWEvent(ctx, evt)
190+
return processCWEvent(ctx, evt, pClient)
171191
case *events.KinesisEvent:
172-
return processKinesisEvent(ctx, evt)
192+
return processKinesisEvent(ctx, evt, pClient)
193+
case *events.SQSEvent:
194+
return processSQSEvent(ctx, evt)
195+
// When setting up S3 Notification on a bucket, a test event is first sent, see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
196+
case *events.S3TestEvent:
197+
return nil
173198
}
174-
175199
return err
176200
}
177201

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package main
22

33
import (
4+
"testing"
5+
46
"github.com/prometheus/common/model"
57
"github.com/stretchr/testify/require"
6-
"testing"
78
)
89

910
func TestLambdaPromtail_ExtraLabelsValid(t *testing.T) {
10-
extraLabels, err := parseExtraLabels("A1,a,B2,b,C3,c,D4,d")
11+
extraLabels, err := parseExtraLabels("A1,a,B2,b,C3,c,D4,d", false)
1112
require.Nil(t, err)
1213
require.Len(t, extraLabels, 4)
1314
require.Equal(t, model.LabelValue("a"), extraLabels["__extra_A1"])
@@ -17,19 +18,19 @@ func TestLambdaPromtail_ExtraLabelsValid(t *testing.T) {
1718
}
1819

1920
func TestLambdaPromtail_ExtraLabelsMissingValue(t *testing.T) {
20-
extraLabels, err := parseExtraLabels("A,a,B,b,C,c,D")
21+
extraLabels, err := parseExtraLabels("A,a,B,b,C,c,D",false)
2122
require.Nil(t, extraLabels)
2223
require.Errorf(t, err, invalidExtraLabelsError)
2324
}
2425

2526
func TestLambdaPromtail_ExtraLabelsInvalidNames(t *testing.T) {
26-
extraLabels, err := parseExtraLabels("A!,%a,B?,$b,C-,c^")
27+
extraLabels, err := parseExtraLabels("A!,%a,B?,$b,C-,c^", false)
2728
require.Nil(t, extraLabels)
2829
require.Errorf(t, err, "invalid name \"__extra_A!\"")
2930
}
3031

3132
func TestLambdaPromtail_TestParseLabelsNoneProvided(t *testing.T) {
32-
extraLabels, err := parseExtraLabels("")
33+
extraLabels, err := parseExtraLabels("", false)
3334
require.Len(t, extraLabels, 0)
3435
require.Nil(t, err)
3536
}

tools/lambda-promtail/lambda-promtail/promtail.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7-
"crypto/tls"
87
"fmt"
98
"io"
109
"net/http"
1110
"sort"
1211
"strings"
1312
"time"
1413

14+
"github.com/go-kit/log/level"
1515
"github.com/gogo/protobuf/proto"
1616
"github.com/golang/snappy"
1717
"github.com/grafana/dskit/backoff"
@@ -38,6 +38,7 @@ type entry struct {
3838
type batch struct {
3939
streams map[string]*logproto.Stream
4040
size int
41+
client Client
4142
}
4243

4344
type batchIf interface {
@@ -47,9 +48,10 @@ type batchIf interface {
4748
flushBatch(ctx context.Context) error
4849
}
4950

50-
func newBatch(ctx context.Context, entries ...entry) (*batch, error) {
51+
func newBatch(ctx context.Context, pClient Client, entries ...entry) (*batch, error) {
5152
b := &batch{
5253
streams: map[string]*logproto.Stream{},
54+
client: pClient,
5355
}
5456

5557
for _, entry := range entries {
@@ -123,34 +125,37 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
123125
}
124126

125127
func (b *batch) flushBatch(ctx context.Context) error {
126-
err := sendToPromtail(ctx, b)
128+
err := b.client.sendToPromtail(ctx, b)
127129
if err != nil {
128130
return err
129131
}
130-
131-
b.streams = make(map[string]*logproto.Stream)
132+
b.resetBatch()
132133

133134
return nil
134135
}
135136

136-
func sendToPromtail(ctx context.Context, b *batch) error {
137+
// resetBatch empties the batch: it replaces the streams map with a fresh,
// empty one and sets the size counter back to zero.
func (b *batch) resetBatch() {
138+
b.streams = make(map[string]*logproto.Stream)
139+
b.size = 0
140+
}
141+
142+
func (c *promtailClient) sendToPromtail(ctx context.Context, b *batch) error {
137143
buf, _, err := b.encode()
138144
if err != nil {
139145
return err
140146
}
141147

142-
backoff := backoff.New(ctx, backoff.Config{minBackoff, maxBackoff, maxRetries})
148+
backoff := backoff.New(ctx, *c.config.backoff)
143149
var status int
144150
for {
145151
// send uses `timeout` internally, so `context.Background` is good enough.
146-
status, err = send(context.Background(), buf)
152+
status, err = c.send(context.Background(), buf)
147153

148154
// Only retry 429s, 500s and connection-level errors.
149155
if status > 0 && status != 429 && status/100 != 5 {
150156
break
151157
}
152-
153-
fmt.Printf("error sending batch, will retry, status: %d error: %s\n", status, err)
158+
level.Error(*c.log).Log("err", fmt.Errorf("error sending batch, will retry, status: %d error: %s\n", status, err))
154159
backoff.Wait()
155160

156161
// Make sure it sends at least once before checking for retry.
@@ -160,15 +165,15 @@ func sendToPromtail(ctx context.Context, b *batch) error {
160165
}
161166

162167
if err != nil {
163-
fmt.Printf("Failed to send logs! %s\n", err)
168+
level.Error(*c.log).Log("err", fmt.Errorf("Failed to send logs! %s\n", err))
164169
return err
165170
}
166171

167172
return nil
168173
}
169174

170-
func send(ctx context.Context, buf []byte) (int, error) {
171-
ctx, cancel := context.WithTimeout(ctx, timeout)
175+
func (c *promtailClient) send(ctx context.Context, buf []byte) (int, error) {
176+
ctx, cancel := context.WithTimeout(ctx, c.config.http.timeout)
172177
defer cancel()
173178

174179
req, err := http.NewRequest("POST", writeAddress.String(), bytes.NewReader(buf))
@@ -190,17 +195,11 @@ func send(ctx context.Context, buf []byte) (int, error) {
190195
req.Header.Set("Authorization", "Bearer "+bearerToken)
191196
}
192197

193-
promtailClient := &http.Client{}
194-
195-
if skipTlsVerify == true {
196-
promtailClient = &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
197-
}
198-
199-
resp, err := promtailClient.Do(req.WithContext(ctx))
198+
resp, err := c.http.Do(req.WithContext(ctx))
200199
if err != nil {
201200
return -1, err
202201
}
203-
202+
defer resp.Body.Close()
204203
if resp.StatusCode/100 != 2 {
205204
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
206205
line := ""

0 commit comments

Comments
 (0)