Skip to content

Commit 7d924b1

Browse files
authored
Merge pull request #19 from npande/npande_lambda_telemetry_api
Checkin for lambda telemetry api support for extension.
2 parents 5ee12ea + 9bb28bf commit 7d924b1

File tree

7 files changed

+134
-20
lines changed

7 files changed

+134
-20
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ Releasing new layer versions
8585
`sh zip.sh`
8686

8787

88-
- The new wheel package gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases).
88+
- The new extension binary and zip files gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases).
8989

9090
Run below commands to create and push tags
9191

lambda-extensions/config/config.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type LambdaExtensionConfig struct {
3535
LambdaRegion string
3636
SourceCategoryOverride string
3737
EnhanceJsonLogs bool
38+
EnableSpanDrops bool
3839
}
3940

4041
var defaultLogTypes = []string{"platform", "function"}
@@ -66,6 +67,7 @@ func GetConfig() (*LambdaExtensionConfig, error) {
6667
}
6768
return config, nil
6869
}
70+
6971
func (cfg *LambdaExtensionConfig) setDefaults() {
7072
numRetry := os.Getenv("SUMO_NUM_RETRIES")
7173
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
@@ -75,6 +77,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
7577
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
7678
logTypes := os.Getenv("SUMO_LOG_TYPES")
7779
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
80+
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")
7881

7982
if numRetry == "" {
8083
cfg.NumRetry = 3
@@ -88,7 +91,6 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
8891
if maxConcurrentRequests == "" {
8992
cfg.MaxConcurrentRequests = 3
9093
}
91-
9294
if enableFailover == "" {
9395
cfg.EnableFailover = false
9496
}
@@ -106,6 +108,10 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
106108
if enhanceJsonLogs == "" {
107109
cfg.EnhanceJsonLogs = true
108110
}
111+
if enableSpanDrops == "" {
112+
// by default, spans will not be dropped if user did not configure the env variable
113+
cfg.EnableSpanDrops = false
114+
}
109115
}
110116

111117
func (cfg *LambdaExtensionConfig) validateConfig() error {
@@ -116,6 +122,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
116122
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
117123
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
118124
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
125+
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")
119126

120127
var allErrors []string
121128
var err error
@@ -139,7 +146,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
139146
}
140147
}
141148

142-
if cfg.EnableFailover == true {
149+
if cfg.EnableFailover {
143150
if cfg.S3BucketName == "" {
144151
allErrors = append(allErrors, "SUMO_S3_BUCKET_NAME not set in environment variable")
145152
}
@@ -173,25 +180,24 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
173180
} else {
174181
cfg.MaxDataQueueLength = int(customMaxDataQueueLength)
175182
}
176-
177183
}
184+
178185
if maxConcurrentRequests != "" {
179186
customMaxConcurrentRequests, err := strconv.ParseInt(maxConcurrentRequests, 10, 32)
180187
if err != nil {
181188
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_MAX_CONCURRENT_REQUESTS: %v", err))
182189
} else {
183190
cfg.MaxConcurrentRequests = int(customMaxConcurrentRequests)
184191
}
185-
186192
}
193+
187194
if logLevel != "" {
188195
customloglevel, err := logrus.ParseLevel(logLevel)
189196
if err != nil {
190197
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_LOG_LEVEL: %v", err))
191198
} else {
192199
cfg.LogLevel = customloglevel
193200
}
194-
195201
}
196202

197203
if enhanceJsonLogs != "" {
@@ -201,6 +207,13 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
201207
}
202208
}
203209

210+
if enableSpanDrops != "" {
211+
cfg.EnableSpanDrops, err = strconv.ParseBool(enableSpanDrops)
212+
if err != nil {
213+
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_SPAN_DROP: %v", err))
214+
}
215+
}
216+
204217
// test valid log format type
205218
for _, logType := range cfg.LogTypes {
206219
if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) {

lambda-extensions/config/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
// ExtensionName same as binary name or file name where main exists
1010
var ExtensionName = filepath.Base(os.Args[0])
11-
var layerVersion = "4"
11+
var layerVersion = "7"
1212

1313
// SumoLogicExtensionLayerVersionSuffix denotes the layer version published in AWS
1414
var SumoLogicExtensionLayerVersionSuffix string = fmt.Sprintf("%s-prod:%s", ExtensionName, layerVersion)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package lambdaapi
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
)
9+
10+
const (
11+
// Base URL for telemetry api extension
12+
telemetryURL = "2022-07-01/telemetry"
13+
// Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.
14+
telemetry_timeoutMs = 1000
15+
telemetry_maxBytes = 1048576
16+
telemetry_maxItems = 10000
17+
telemetry_receiverPort = 4243
18+
)
19+
20+
// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs.
21+
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string) ([]byte, error) {
22+
URL := client.baseURL + telemetryURL
23+
24+
reqBody, err := json.Marshal(map[string]interface{}{
25+
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
26+
"types": logEvents,
27+
"buffering": map[string]interface{}{"timeoutMs": telemetry_timeoutMs, "maxBytes": telemetry_maxBytes, "maxItems": telemetry_maxItems},
28+
"schemaVersion": "2022-07-01",
29+
})
30+
if err != nil {
31+
return nil, err
32+
}
33+
headers := map[string]string{
34+
extensionIdentiferHeader: client.extensionID,
35+
}
36+
var response []byte
37+
if ctx != nil {
38+
response, err = client.MakeRequestWithContext(ctx, headers, bytes.NewBuffer(reqBody), "PUT", URL)
39+
} else {
40+
response, err = client.MakeRequest(headers, bytes.NewBuffer(reqBody), "PUT", URL)
41+
}
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
return response, nil
47+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package lambdaapi
2+
3+
import (
4+
"context"
5+
"io/ioutil"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
)
10+
11+
func TestSubscribeToTelemetryAPI(t *testing.T) {
12+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
13+
assertEqual(t, r.Method, http.MethodPut, "Method is not PUT")
14+
assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present")
15+
16+
reqBytes, err := ioutil.ReadAll(r.Body)
17+
assertNoError(t, err, "Received error")
18+
defer r.Body.Close()
19+
assertNotEmpty(t, reqBytes, "Received error in request")
20+
21+
w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
22+
w.WriteHeader(200)
23+
}))
24+
25+
defer srv.Close()
26+
client := NewClient(srv.URL[7:], extensionName)
27+
28+
// Without Context
29+
response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"})
30+
commonAsserts(t, client, response, err)
31+
32+
// With Context
33+
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"})
34+
commonAsserts(t, client, response, err)
35+
}

lambda-extensions/sumoclient/sumoclient.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (s *sumoLogicClient) failoverHandler(buf *bytes.Buffer) error {
101101
}
102102
err = utils.UploadToS3(&s.config.S3BucketName, &keyName, buf)
103103
if err != nil {
104-
err = fmt.Errorf("Failed to Send to S3 Bucket %s Path %s: %w", s.config.S3BucketName, keyName, err)
104+
err = fmt.Errorf("failed to send to s3 bucket %s path %s: %w", s.config.S3BucketName, keyName, err)
105105
}
106106
return err
107107
}
@@ -159,13 +159,22 @@ func (s *sumoLogicClient) createCWLogLine(item map[string]interface{}) {
159159

160160
message, ok := item["record"].(map[string]interface{})
161161
if ok {
162-
delete(item, "record")
162+
s.logger.Debug("Not dropping record, if logType is platform.report.")
163+
// delete(item, "record")
163164
}
165+
164166
// Todo convert this to struct
167+
// Updated cwMessageLine to also cover new field initDurationMs as record.metrics do have it.
165168
metric := message["metrics"].(map[string]interface{})
166-
cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB",
167-
message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"])
168-
item["message"] = cwMessageLine
169+
if metric["initDurationMs"] == nil {
170+
cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB",
171+
message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"])
172+
item["message"] = cwMessageLine
173+
} else {
174+
cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB Init Duration: %v ms",
175+
message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"], metric["initDurationMs"])
176+
item["message"] = cwMessageLine
177+
}
169178
}
170179

171180
func (s *sumoLogicClient) getLogGroup() string {
@@ -210,17 +219,26 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) {
210219
}
211220
} else if ok && logType == "platform.report" {
212221
s.createCWLogLine(item)
222+
} else if ok && logType == "platform.runtimeDone" {
223+
message, ok := item["record"].(map[string]interface{})
224+
if ok {
225+
_, ok := message["spans"]
226+
if ok && s.config.EnableSpanDrops {
227+
// dropping spans if its present and configured to drop
228+
delete(message, "spans")
229+
}
230+
}
213231
}
214232
}
215233
}
216234

217235
func (s *sumoLogicClient) transformBytesToArrayOfMap(rawmsg []byte) (responseBody, error) {
218236
s.logger.Debugln("Transforming bytes to array of maps")
219237
var msg responseBody
220-
var err error
221-
err = json.Unmarshal(rawmsg, &msg)
238+
// var err error
239+
var err error = json.Unmarshal(rawmsg, &msg)
222240
if err != nil {
223-
return msg, fmt.Errorf("Error in parsing payload %s: %v", string(rawmsg), err)
241+
return msg, fmt.Errorf("error in parsing payload %s: %v", string(rawmsg), err)
224242
}
225243
return msg, err
226244
}
@@ -253,7 +271,7 @@ func (s *sumoLogicClient) createChunks(msgArr responseBody) ([]string, error) {
253271
}
254272
chunks = append(chunks, currentChunk.String())
255273
if errorCount > 0 {
256-
err = fmt.Errorf("Dropping %d messages due to json parsing error", errorCount)
274+
err = fmt.Errorf("dropping %d messages due to json parsing error", errorCount)
257275
}
258276
s.logger.Debugf("Chunks created: %d NumOfParsingError: %d", len(chunks), errorCount)
259277
return chunks, err

lambda-extensions/sumologic-extension.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ func runTimeAPIInit() (int64, error) {
6464
}
6565
logger.Debug("Succcessfully Registered with Run Time API Client: ", utils.PrettyPrint(registerResponse))
6666

67-
// Subscribe to Logs API
68-
logger.Debug("Subscribing Extension to Logs API........")
69-
subscribeResponse, err := extensionClient.SubscribeToLogsAPI(nil, config.LogTypes)
67+
// Subscribe to Telemetry API
68+
logger.Debug("Subscribing Extension to Telemetry API........")
69+
subscribeResponse, err := extensionClient.SubscribeToTelemetryAPI(nil, config.LogTypes)
7070
if err != nil {
7171
return 0, err
7272
}
73-
logger.Debug("Successfully subscribed to Logs API: ", utils.PrettyPrint(string(subscribeResponse)))
73+
74+
logger.Debug("Successfully subscribed to Telemetry API: ", utils.PrettyPrint(string(subscribeResponse)))
7475

7576
// Call next to say registration is successful and get the deadtimems
7677
nextResponse, err := nextEvent(nil)

0 commit comments

Comments
 (0)