Skip to content

Commit 9bd3219

Browse files
committed
added TELEMETRY_TIMEOUT_MS, TELEMETRY_MAX_BYTES and TELEMETRY_MAX_ITEMS in environment config
1 parent f55c25e commit 9bd3219

File tree

4 files changed

+59
-8
lines changed

4 files changed

+59
-8
lines changed

lambda-extensions/config/config.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ type LambdaExtensionConfig struct {
3838
EnhanceJsonLogs bool
3939
EnableSpanDrops bool
4040
KmsCacheSeconds int64
41+
TelemetryTimeoutMs int
42+
TelemetryMaxBytes int64
43+
TelemetryMaxItems int
4144
}
4245

4346
var defaultLogTypes = []string{"platform", "function"}
@@ -82,6 +85,22 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
8285
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
8386
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")
8487
kmsCacheSeconds := os.Getenv("KMS_CACHE_SECONDS")
88+
telemetryTimeoutMs := os.Getenv("TELEMETRY_TIMEOUT_MS")
89+
telemetryMaxBytes := os.Getenv("TELEMETRY_MAX_BYTES")
90+
telemetryMaxItems := os.Getenv("TELEMETRY_MAX_ITEMS")
91+
92+
if telemetryTimeoutMs == "" {
93+
cfg.TelemetryTimeoutMs = 1000
94+
}
95+
96+
if telemetryMaxBytes == "" {
97+
cfg.TelemetryMaxBytes = 262144
98+
}
99+
100+
if telemetryMaxItems == "" {
101+
cfg.TelemetryMaxItems = 10000
102+
}
103+
85104

86105
if numRetry == "" {
87106
cfg.NumRetry = 3
@@ -141,6 +160,9 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
141160
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
142161
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")
143162
kmsCacheSeconds := os.Getenv("KMS_CACHE_SECONDS")
163+
telemetryTimeoutMs := os.Getenv("TELEMETRY_TIMEOUT_MS")
164+
telemetryMaxBytes := os.Getenv("TELEMETRY_MAX_BYTES")
165+
telemetryMaxItems := os.Getenv("TELEMETRY_MAX_ITEMS")
144166

145167
var allErrors []string
146168
var err error
@@ -239,6 +261,38 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
239261
}
240262
}
241263

264+
if telemetryTimeoutMs != "" {
265+
telemetryTimeoutMs, err := strconv.ParseInt(telemetryTimeoutMs, 10, 32)
266+
if err != nil {
267+
allErrors = append(allErrors, fmt.Sprintf("Unable to parse TELEMETRY_TIMEOUT_MS: %v", err))
268+
} else {
269+
cfg.TelemetryTimeoutMs = int(telemetryTimeoutMs)
270+
}
271+
cfg.TelemetryTimeoutMs = max(cfg.TelemetryTimeoutMs, 25)
272+
cfg.TelemetryTimeoutMs = min(cfg.TelemetryTimeoutMs, 30000)
273+
}
274+
275+
if telemetryMaxBytes != "" {
276+
cfg.TelemetryMaxBytes, err = strconv.ParseInt(telemetryMaxBytes, 10, 64)
277+
if err != nil {
278+
allErrors = append(allErrors, fmt.Sprintf("Unable to parse TELEMETRY_MAX_BYTES: %v", err))
279+
}
280+
cfg.TelemetryMaxBytes = max(cfg.TelemetryMaxBytes, 262144)
281+
cfg.TelemetryMaxBytes = min(cfg.TelemetryMaxBytes, 1048576)
282+
}
283+
284+
if telemetryMaxItems != "" {
285+
telemetryMaxItems, err := strconv.ParseInt(telemetryMaxItems, 10, 32)
286+
if err != nil {
287+
allErrors = append(allErrors, fmt.Sprintf("Unable to parse TELEMETRY_MAX_ITEMS: %v", err))
288+
} else {
289+
cfg.TelemetryMaxItems = int(telemetryMaxItems)
290+
}
291+
cfg.TelemetryMaxItems = max(cfg.TelemetryMaxItems, 1000)
292+
cfg.TelemetryMaxItems = min(cfg.TelemetryMaxItems, 10000)
293+
}
294+
295+
242296
// test valid log format type
243297
for _, logType := range cfg.LogTypes {
244298
if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) {

lambda-extensions/lambdaapi/telemetryapiclient.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,17 @@ const (
1111
// Base URL for telemetry api extension
1212
telemetryURL = "2022-07-01/telemetry"
1313
// Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.
14-
telemetry_timeoutMs = 1000
15-
telemetry_maxBytes = 1048576
16-
telemetry_maxItems = 10000
1714
telemetry_receiverPort = 4243
1815
)
1916

2017
// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs.
21-
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string) ([]byte, error) {
18+
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string, telemetryTimeoutMs int, telemetryMaxBytes int64, telemetryMaxItems int) ([]byte, error) {
2219
URL := client.baseURL + telemetryURL
2320

2421
reqBody, err := json.Marshal(map[string]interface{}{
2522
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
2623
"types": logEvents,
27-
"buffering": map[string]interface{}{"timeoutMs": telemetry_timeoutMs, "maxBytes": telemetry_maxBytes, "maxItems": telemetry_maxItems},
24+
"buffering": map[string]interface{}{"timeoutMs": telemetryTimeoutMs, "maxBytes": telemetryMaxBytes, "maxItems": telemetryMaxItems},
2825
"schemaVersion": "2022-07-01",
2926
})
3027
if err != nil {

lambda-extensions/lambdaapi/telemetryapiclient_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ func TestSubscribeToTelemetryAPI(t *testing.T) {
2626
client := NewClient(srv.URL[7:], extensionName)
2727

2828
// Without Context
29-
response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"})
29+
response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"}, 1000, 262144, 10000)
3030
commonAsserts(t, client, response, err)
3131

3232
// With Context
33-
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"})
33+
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"}, 1000, 262144, 10000)
3434
commonAsserts(t, client, response, err)
3535
}

lambda-extensions/sumologic-extension.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func runTimeAPIInit() (int64, error) {
6666

6767
// Subscribe to Telemetry API
6868
logger.Debug("Subscribing Extension to Telemetry API........")
69-
subscribeResponse, err := extensionClient.SubscribeToTelemetryAPI(nil, config.LogTypes)
69+
subscribeResponse, err := extensionClient.SubscribeToTelemetryAPI(nil, config.LogTypes, config.TelemetryTimeoutMs, config.TelemetryMaxBytes, config.TelemetryMaxItems)
7070
if err != nil {
7171
return 0, err
7272
}

0 commit comments

Comments
 (0)