Skip to content

Commit 483e55c

Browse files
authored
Merge pull request #17 from SumoLogic/hpal_lambda_cost_fixes
Fixes Lambda Extension cost overruns
2 parents 6963d4f + 4648aba commit 483e55c

File tree

3 files changed

+22
-22
lines changed

3 files changed

+22
-22
lines changed

lambda-extensions/config/config.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ type LambdaExtensionConfig struct {
2828
LogLevel logrus.Level
2929
MaxDataQueueLength int
3030
MaxConcurrentRequests int
31-
ProcessingSleepTime time.Duration
3231
MaxRetryAttempts int
3332
RetrySleepTime time.Duration
3433
ConnectionTimeoutValue time.Duration
@@ -37,7 +36,8 @@ type LambdaExtensionConfig struct {
3736
SourceCategoryOverride string
3837
}
3938

40-
var validLogTypes = []string{"platform", "function"}
39+
var defaultLogTypes = []string{"platform", "function"}
40+
var validLogTypes = []string{"platform", "function", "extension"}
4141

4242
// GetConfig to get config instance
4343
func GetConfig() (*LambdaExtensionConfig, error) {
@@ -52,7 +52,6 @@ func GetConfig() (*LambdaExtensionConfig, error) {
5252
LambdaRegion: os.Getenv("AWS_REGION"),
5353
SourceCategoryOverride: os.Getenv("SOURCE_CATEGORY_OVERRIDE"),
5454
MaxRetryAttempts: 5,
55-
RetrySleepTime: 300 * time.Millisecond,
5655
ConnectionTimeoutValue: 10000 * time.Millisecond,
5756
MaxDataPayloadSize: 1024 * 1024, // 1 MB
5857
}
@@ -68,12 +67,13 @@ func GetConfig() (*LambdaExtensionConfig, error) {
6867
}
6968
func (cfg *LambdaExtensionConfig) setDefaults() {
7069
numRetry := os.Getenv("SUMO_NUM_RETRIES")
71-
processingSleepTime := os.Getenv("SUMO_PROCESSING_SLEEP_TIME_MS")
70+
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
7271
logLevel := os.Getenv("SUMO_LOG_LEVEL")
7372
maxDataQueueLength := os.Getenv("SUMO_MAX_DATAQUEUE_LENGTH")
7473
maxConcurrentRequests := os.Getenv("SUMO_MAX_CONCURRENT_REQUESTS")
7574
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
7675
logTypes := os.Getenv("SUMO_LOG_TYPES")
76+
7777
if numRetry == "" {
7878
cfg.NumRetry = 3
7979
}
@@ -94,12 +94,12 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
9494
cfg.AWSLambdaRuntimeAPI = "127.0.0.1:9001"
9595
}
9696
if logTypes == "" {
97-
cfg.LogTypes = validLogTypes
97+
cfg.LogTypes = defaultLogTypes
9898
} else {
9999
cfg.LogTypes = strings.Split(logTypes, ",")
100100
}
101-
if processingSleepTime == "" {
102-
cfg.ProcessingSleepTime = 0 * time.Millisecond
101+
if retrySleepTime == "" {
102+
cfg.RetrySleepTime = 300 * time.Millisecond
103103
}
104104

105105
}
@@ -110,7 +110,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
110110
maxDataQueueLength := os.Getenv("SUMO_MAX_DATAQUEUE_LENGTH")
111111
maxConcurrentRequests := os.Getenv("SUMO_MAX_CONCURRENT_REQUESTS")
112112
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
113-
processingSleepTime := os.Getenv("SUMO_PROCESSING_SLEEP_TIME_MS")
113+
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
114114

115115
var allErrors []string
116116
var err error
@@ -152,12 +152,12 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
152152
}
153153
}
154154

155-
if processingSleepTime != "" {
156-
customProcessingSleepTime, err := strconv.ParseInt(processingSleepTime, 10, 32)
155+
if retrySleepTime != "" {
156+
customRetrySleepTime, err := strconv.ParseInt(retrySleepTime, 10, 32)
157157
if err != nil {
158-
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_PROCESSING_SLEEP_TIME_MS: %v", err))
158+
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_RETRY_SLEEP_TIME_MS: %v", err))
159159
} else {
160-
cfg.ProcessingSleepTime = time.Duration(customProcessingSleepTime) * time.Millisecond
160+
cfg.RetrySleepTime = time.Duration(customRetrySleepTime) * time.Millisecond
161161
}
162162
}
163163

lambda-extensions/sumoclient/sumoclient_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ func setupEnv() {
3131
os.Setenv("SUMO_MAX_DATAQUEUE_LENGTH", "10")
3232
os.Setenv("SUMO_MAX_CONCURRENT_REQUESTS", "3")
3333
os.Setenv("SUMO_LOG_LEVEL", "DEBUG")
34+
os.Setenv("SUMO_RETRY_SLEEP_TIME_MS", "50")
35+
os.Setenv("SUMO_LOG_TYPES", "function")
3436
}
3537

3638
func assertEqual(t *testing.T, a interface{}, b interface{}, message string) {

lambda-extensions/sumologic-extension.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,16 @@ func processEvents(ctx context.Context) {
104104
consumer.FlushDataQueue(ctx)
105105
return
106106
default:
107+
logger.Debugf("switching to other go routine")
108+
runtime.Gosched()
107109
logger.Infof("Calling DrainQueue from processEvents")
108-
for {
109-
runtime_done := consumer.DrainQueue(ctx)
110-
111-
if runtime_done == 1 {
112-
logger.Infof("Exiting DrainQueueLoop: Runtime is Done")
113-
break
114-
} else {
115-
logger.Debugf("switching to other go routine")
116-
runtime.Gosched()
117-
}
110+
// for {
111+
runtime_done := consumer.DrainQueue(ctx)
112+
113+
if runtime_done == 1 {
114+
logger.Infof("Exiting DrainQueueLoop: Runtime is Done")
118115
}
116+
// }
119117

120118
// This statement will freeze lambda
121119
nextResponse, err := nextEvent(ctx)

0 commit comments

Comments
 (0)