Skip to content

Commit 6dc03d8

Browse files
authored
Merge pull request #2 from SumoLogic/develop
moving drainqueue to go routine
2 parents 0d758c3 + 8af93f6 commit 6dc03d8

File tree

8 files changed

+56
-102
lines changed

8 files changed

+56
-102
lines changed

CHANGELOG.md

Lines changed: 0 additions & 16 deletions
This file was deleted.

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ This repository contains SumoLogic AWS Lambda extension.
2323
* To know more about how to use this extension follow docs [here](https://help.sumologic.com/03Send-Data/Collect-from-Other-Data-Sources/Collect_Logs_from_AWS_Lambda_using_Lambda_Extension).
2424
* [AWS Lambda Extensions API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html)
2525

26+
## Change Log
27+
28+
For Full Change Log, please visit [Releases](https://github.com/SumoLogic/sumologic-lambda-extensions/releases) page.
29+
2630
[github-build-badge]: https://github.com/SumoLogic/sumologic-lambda-extensions/workflows/build-and-test/badge.svg?branch=main
2731

2832
[github-build]: https://github.com/SumoLogic/sumologic-lambda-extensions/actions?query=workflow%3Abuild-and-test

lambda-extensions/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
7575
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
7676
logTypes := os.Getenv("SUMO_LOG_TYPES")
7777
if numRetry == "" {
78-
cfg.NumRetry = 0
78+
cfg.NumRetry = 3
7979
}
8080
if logLevel == "" {
8181
cfg.LogLevel = logrus.InfoLevel

lambda-extensions/sumoclient/sumoclient.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (s *sumoLogicClient) FlushAll(msgQueue [][]byte) error {
154154
return err
155155
}
156156

157-
func (s *sumoLogicClient) createCWLogLinee(item map[string]interface{}) {
157+
func (s *sumoLogicClient) createCWLogLine(item map[string]interface{}) {
158158

159159
message, ok := item["record"].(map[string]interface{})
160160
if ok {
@@ -198,7 +198,7 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) {
198198
}
199199
item["message"] = strings.TrimSpace(message)
200200
} else if ok && logType == "platform.report" {
201-
s.createCWLogLinee(item)
201+
s.createCWLogLine(item)
202202
}
203203
}
204204
}

lambda-extensions/sumologic-extension.go

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"os/signal"
77
"path/filepath"
88
"syscall"
9-
"time"
109

1110
"github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/utils"
1211

@@ -83,54 +82,32 @@ func nextEvent(ctx context.Context) (*lambdaapi.NextEventResponse, error) {
8382
return nextResponse, nil
8483
}
8584

86-
func flushData(ctx context.Context, DeadlineMs int64) {
87-
if config.EnableFailover {
88-
consumer.FlushDataQueue()
89-
} else {
90-
consumer.DrainQueue(ctx, DeadlineMs)
91-
}
92-
}
93-
9485
// processEvents is - Will block until shutdown event is received or cancelled via the context..
9586
func processEvents(ctx context.Context) {
96-
DeadlineMs, err := runTimeAPIInit()
87+
_, err := runTimeAPIInit()
9788
if err != nil {
9889
logger.Error("Error during Registration: ", err.Error())
9990
return
10091
}
101-
var totalMessagedProcessed int
102-
startTime := time.Now()
10392
// The For loop will continue till we recieve a shutdown event.
10493
for {
10594
select {
10695
case <-ctx.Done():
107-
flushData(ctx, 0)
96+
consumer.FlushDataQueue(ctx)
10897
return
10998
default:
110-
currentMessagedProcessed := consumer.DrainQueue(ctx, DeadlineMs)
111-
messagesChanged, durationComplete := utils.TotalMessagesCountChanged(totalMessagedProcessed, totalMessagedProcessed+currentMessagedProcessed, config.ProcessingSleepTime, startTime)
112-
totalMessagedProcessed = totalMessagedProcessed + currentMessagedProcessed
113-
// Call the next event is we reach timeout or no new message are received based on sleep time.
114-
if !utils.IsTimeRemaining(DeadlineMs) || durationComplete {
115-
logger.Debugf("Total Messages: %v, Current Messages: %v, messages changes: %v, duration Complete: %v, start Time: %s, Sleep Time: %s", totalMessagedProcessed, currentMessagedProcessed, messagesChanged, durationComplete, startTime, config.ProcessingSleepTime)
116-
logger.Info("Waiting for Run Time API event...")
117-
// This statement will freeze lambda
118-
nextResponse, err := nextEvent(ctx)
119-
if err != nil {
120-
logger.Error("Error during Next Event call: ", err.Error())
121-
return
122-
}
123-
// Next invoke will start from here
124-
logger.Infof("Received Next Event as %s", nextResponse.EventType)
125-
if nextResponse.EventType == lambdaapi.Shutdown {
126-
flushData(ctx, 0)
127-
return
128-
}
129-
DeadlineMs = nextResponse.DeadlineMs
130-
totalMessagedProcessed = 0
99+
go consumer.DrainQueue(ctx)
100+
// This statement will freeze lambda
101+
nextResponse, err := nextEvent(ctx)
102+
if err != nil {
103+
logger.Error("Error during Next Event call: ", err.Error())
104+
return
131105
}
132-
if messagesChanged {
133-
startTime = time.Now()
106+
// Next invoke will start from here
107+
logger.Infof("Received Next Event as %s", nextResponse.EventType)
108+
if nextResponse.EventType == lambdaapi.Shutdown {
109+
consumer.FlushDataQueue(ctx)
110+
return
134111
}
135112
}
136113
}

lambda-extensions/utils/utils.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"compress/gzip"
66
"encoding/json"
77
"errors"
8-
"time"
98
)
109

1110
//------------------Retry Logic Code-------------------------------
@@ -72,25 +71,3 @@ func PrettyPrint(v interface{}) string {
7271
}
7372
return string(data)
7473
}
75-
76-
// IsTimeRemaining is to check is the lambda is nearing its timeout.
77-
func IsTimeRemaining(deadtime int64) bool {
78-
t := time.Unix(deadtime, 0)
79-
dif := time.Now().Sub(t)
80-
if dif.Seconds() <= 10 {
81-
return false
82-
}
83-
return true
84-
}
85-
86-
// TotalMessagesCountChanged is to check is the lambda function is creating any new logs or not.
87-
func TotalMessagesCountChanged(totalMessages, currentMessages int, duration time.Duration, startTime time.Time) (bool, bool) {
88-
if totalMessages != 0 && totalMessages == currentMessages {
89-
t := time.Now().Sub(startTime)
90-
if t.Milliseconds() >= duration.Milliseconds() {
91-
return false, true
92-
}
93-
return false, false
94-
}
95-
return true, false
96-
}

lambda-extensions/workers/consumer.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"sync"
66

7-
"github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/utils"
8-
97
cfg "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/config"
108
sumocli "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/sumoclient"
119

@@ -14,8 +12,8 @@ import (
1412

1513
// TaskConsumer exposing methods every consmumer should implement
1614
type TaskConsumer interface {
17-
FlushDataQueue()
18-
DrainQueue(context.Context, int64) int
15+
FlushDataQueue(context.Context)
16+
DrainQueue(context.Context) int
1917
}
2018

2119
// sumoConsumer to drain log from dataQueue
@@ -37,23 +35,35 @@ func NewTaskConsumer(consumerQueue chan []byte, config *cfg.LambdaExtensionConfi
3735
}
3836

3937
// FlushDataQueue drains the dataqueue commpletely
40-
func (sc *sumoConsumer) FlushDataQueue() {
41-
var rawMsgArr [][]byte
42-
Loop:
43-
for {
44-
//Receives block when the buffer is empty.
45-
select {
46-
case rawmsg := <-sc.dataQueue:
47-
rawMsgArr = append(rawMsgArr, rawmsg)
48-
default:
49-
err := sc.sumoclient.FlushAll(rawMsgArr)
50-
if err != nil {
51-
sc.logger.Debugln("Unable to flush DataQueue", err.Error())
52-
// TODO: raise alert if flush fails
38+
func (sc *sumoConsumer) FlushDataQueue(ctx context.Context) {
39+
if sc.config.EnableFailover {
40+
var rawMsgArr [][]byte
41+
Loop:
42+
for {
43+
//Receives block when the buffer is empty.
44+
select {
45+
case rawmsg := <-sc.dataQueue:
46+
rawMsgArr = append(rawMsgArr, rawmsg)
47+
default:
48+
err := sc.sumoclient.FlushAll(rawMsgArr)
49+
if err != nil {
50+
sc.logger.Errorln("Unable to flush DataQueue", err.Error())
51+
// putting back all the msg to the queue in case of failure
52+
for _, msg := range rawMsgArr {
53+
sc.dataQueue <- msg
54+
}
55+
// TODO: raise alert if flush fails
56+
}
57+
close(sc.dataQueue)
58+
sc.logger.Debugf("DataQueue completely drained")
59+
break Loop
5360
}
54-
close(sc.dataQueue)
55-
sc.logger.Debugf("DataQueue completely drained")
56-
break Loop
61+
}
62+
} else {
63+
// calling drainqueue (during shutdown) if failover is not enabled
64+
maxCallsNeededForCompleteDraining := (len(sc.dataQueue) / sc.config.MaxConcurrentRequests) + 1
65+
for i := 0; i < maxCallsNeededForCompleteDraining; i++ {
66+
sc.DrainQueue(ctx)
5767
}
5868
}
5969

@@ -64,17 +74,19 @@ func (sc *sumoConsumer) consumeTask(ctx context.Context, wg *sync.WaitGroup, raw
6474
err := sc.sumoclient.SendLogs(ctx, rawmsg)
6575
if err != nil {
6676
sc.logger.Error("Error during Send Logs to Sumo Logic.", err.Error())
77+
// putting back the msg to the queue in case of failure
78+
sc.dataQueue <- rawmsg
6779
// TODO: raise alert if send logs fails
6880
}
6981
return
7082
}
7183

72-
func (sc *sumoConsumer) DrainQueue(ctx context.Context, deadtimems int64) int {
84+
func (sc *sumoConsumer) DrainQueue(ctx context.Context) int {
7385
wg := new(sync.WaitGroup)
7486
//sc.logger.Debug("Consuming data from dataQueue")
7587
counter := 0
7688
Loop:
77-
for i := 0; i < sc.config.MaxConcurrentRequests && len(sc.dataQueue) != 0 && utils.IsTimeRemaining(deadtimems); i++ {
89+
for i := 0; i < sc.config.MaxConcurrentRequests && len(sc.dataQueue) != 0; i++ {
7890
//Receives block when the buffer is empty.
7991
select {
8092
case rawmsg := <-sc.dataQueue:

scripts/zip.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,6 @@ for region in "${AWS_REGIONS[@]}"; do
7979
echo "Layer Arn: arn:aws:lambda:${region}:<accountId>:layer:${layer_name}:${layer_version} deployed to Region ${region}"
8080

8181
echo "Setting public permissions for layer version: ${layer_version}"
82-
# aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-prod --version-number $layer_version --principal '*' --action lambda:GetLayerVersion --region ${region}
83-
aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-dev --version-number ${layer_version} --principal '956882708938' --action lambda:GetLayerVersion --region ${region}
82+
aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-prod --version-number $layer_version --principal '*' --action lambda:GetLayerVersion --region ${region}
83+
# aws lambda add-layer-version-permission --layer-name ${layer_name} --statement-id ${layer_name}-dev --version-number ${layer_version} --principal '956882708938' --action lambda:GetLayerVersion --region ${region}
8484
done

0 commit comments

Comments
 (0)