Commit ecdfa47

Michael Lanthier authored
Adaptive Batching Example Extension (#45)
Co-authored-by: Michael Lanthier <[email protected]>
1 parent 0c7eb0b commit ecdfa47

15 files changed: +1385 -0 lines changed
README.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -27,6 +27,7 @@ In this repository you'll find a number of different sample projects and demos t
 * [Logs API extension in Python](python-example-logs-api-extension/)
 * [Logs API extension in Python for Elasticsearch](python-example-elasticsearch-extension/)
 * [Logs API extension in Node.js](nodejs-example-logs-api-extension/)
+* [Adaptive Batching extension in Go](go-example-adaptive-batching-extension/)
 * [Inter-process communication extension in Go](go-example-ipc-extension/)
 * [Crash uploader extension in Go](go-example-crash-uploader-extension/)
 * [Lambda layer extension using SAM](go-example-extension-sam-layer/)
```
Lines changed: 9 additions & 0 deletions

```make
build:
	GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-example-adaptive-batching main.go

build-GoExampleAdaptiveBatchingExtensionLayer:
	GOOS=linux GOARCH=amd64 go build -o $(ARTIFACTS_DIR)/extensions/go-example-adaptive-batching main.go
	chmod +x $(ARTIFACTS_DIR)/extensions/go-example-adaptive-batching

run-GoExampleAdaptiveBatchingExtensionLayer:
	go run go-example-adaptive-batching/main.go
```
Lines changed: 98 additions & 0 deletions

# Adaptive Batching Extension in Go

The provided code sample demonstrates how to use a Logs API extension written in Go to adaptively batch log data to a destination (S3).

> This is a simple example extension to help you start investigating the Lambda Runtime Logs API. This example code is not production ready. Use it at your own discretion after testing it thoroughly.

This sample extension:
* Subscribes to receive platform and function logs
* Runs with a main and a helper goroutine: the main goroutine registers with the Extensions API and processes its invoke and shutdown events (see the nextEvent call). The helper goroutine:
    - starts a local HTTP server at the provided port (default 1234) that receives requests from the Logs API
    - puts the logs in a synchronized queue (Producer) to be processed by the main goroutine (Consumer)
* The main goroutine tracks the number of invokes, the last time logs were shipped, and the size of the logs accumulated since the last ship. Once one of these fields exceeds a set value, the log shipping process begins and a new file with the logs is created in S3 (a sketch of this decision follows the list).

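In effect, the shipping decision is a threshold check over three counters. The following is a minimal sketch of that check; `shipMetrics` and `shouldShip` are illustrative names rather than the extension's actual identifiers (the extension keeps comparable state in `agent/metrics.go`, described under Performance below):

```go
package main

import (
	"fmt"
	"time"
)

// shipMetrics mirrors the kind of state the extension tracks between
// invocations. The names here are illustrative, not the extension's
// actual identifiers.
type shipMetrics struct {
	invokesSinceShip int       // invocations since logs were last shipped
	bytesQueued      int       // bytes of log data accumulated in the queue
	lastShip         time.Time // time of the last ship to S3
}

// shouldShip reports whether any one of the three thresholds is crossed.
// The extension evaluates its equivalent of this check on each invoke event.
func (m *shipMetrics) shouldShip(maxInvokes, maxBytes int, maxAge time.Duration) bool {
	return m.invokesSinceShip >= maxInvokes ||
		m.bytesQueued >= maxBytes ||
		time.Since(m.lastShip) >= maxAge
}

func main() {
	m := &shipMetrics{invokesSinceShip: 3, bytesQueued: 5000, lastShip: time.Now()}
	// With the README defaults (10 invokes, 4096 bytes, 10,000 ms), the byte
	// threshold is already exceeded, so this prints true.
	fmt.Println(m.shouldShip(10, 4096, 10*time.Second))
}
```
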
## Requirements

* [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) - **minimum version 0.48**.

## Installation instructions

1. [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in.

2. Clone the repository onto your local development machine:
```bash
git clone https://github.com/aws-samples/aws-lambda-extensions.git
```
3. Enter the directory:
```bash
cd go-example-adaptive-batching-extension/
```

4. Run the following command for AWS SAM to deploy the components as specified in the `template.yaml` file:
```bash
sam build
# If you don't have 'Python' or 'make' installed, you can build inside a
# container, which uses a python3.8 Docker container image:
# sam build --use-container
sam deploy --stack-name adaptive-batching-extension --guided
```

Following the prompts:

* Accept the default Stack Name `adaptive-batching-extension`.
* Enter your preferred Region.
* Accept the defaults for the remaining questions.

AWS SAM deploys the application stack, which includes the Lambda function and an IAM role. AWS SAM creates a layer for the runtime and a layer for the extension, and adds both to the function.

Note the S3 bucket name in the stack outputs. This is where the logs will be written.

## Invoking the Lambda function

You can now invoke the Lambda function. Amend the Region and use the following command:
```bash
aws lambda invoke \
  --function-name "adaptive-batching-extension-demo-function" \
  --payload '{"payload": "hello"}' /tmp/invoke-result \
  --cli-binary-format raw-in-base64-out \
  --log-type Tail \
  --region <use your Region>
```
The function should return `"StatusCode": 200`.

Browse to the [Amazon CloudWatch Console](https://console.aws.amazon.com/cloudwatch). Navigate to *Logs\Log Groups*. Select the log group **/aws/lambda/adaptive-batching-extension-demo-function**.

View the log stream to see the platform, function, and extension each logging while they process.

The logging extension also receives the log stream directly from Lambda and copies the logs to S3.

Browse to the [Amazon S3 Console](https://console.aws.amazon.com/S3). Navigate to the S3 bucket created as part of the SAM deployment.

Download the file object containing the copied log stream. The log contains the same platform and function logs, but not the extension logs, as specified during the subscription.

## Environment Variables

This section details the environment variables that can be used to modify the functionality of the extension. Some are required and are marked as such. A sketch showing how these variables might be read appears after the list.

* ADAPTIVE_BATCHING_EXTENSION_S3_BUCKET: (REQUIRED) The value of this variable is used to create a bucket, or to reuse an existing bucket if it was created previously. The logs received from the Logs API are written to files inside that bucket. For S3 bucket naming rules, see the [AWS docs](https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html). The SAM template sets this by default.
* ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_BYTES: Logs are shipped to S3 once the queued log size reaches the number of bytes defined here. For example, a value of 1024 would result in logs being shipped once the queued logs exceed 1 kilobyte. The default value is 4096 bytes (4 kilobytes).
* ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_INVOKES: Logs are shipped to S3 once the number of invokes reaches the number defined here. For example, a value of 10 would result in logs being shipped at least once every 10 invokes since the last time logs were shipped. The default value is 10 invokes.
* ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_MILLISECONDS: Logs are shipped to S3 once the time elapsed since the last ship exceeds this value. For example, a value of 60,000 would result in logs being shipped once every 60 seconds. The default value is 10,000 milliseconds.
* ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES: A JSON array of the log types to request from the Logs API. The supported log types are `["platform", "function", "extension"]`. If the variable is not set or a parsing error occurs, the log types default to `["platform", "function"]`.

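As a rough illustration of how these variables can be read, here is a minimal sketch using the defaults listed above; `envInt` is a hypothetical helper, not the extension's own parsing code:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envInt is a hypothetical helper: it reads an integer environment variable
// and falls back to a default when the variable is unset or unparsable.
func envInt(name string, fallback int) int {
	raw, ok := os.LookupEnv(name)
	if !ok {
		return fallback
	}
	v, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return v
}

func main() {
	// Defaults taken from the README list above; the extension's own
	// parsing may differ in detail.
	maxBytes := envInt("ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_BYTES", 4096)
	maxInvokes := envInt("ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_INVOKES", 10)
	maxMillis := envInt("ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_MILLISECONDS", 10000)
	fmt.Println(maxBytes, maxInvokes, maxMillis)
}
```
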
## Performance, maximums, and environment shutdown

Logs are shipped to S3 based on the rates defined in the environment variables, or on the default values provided. If any one of the conditions is met, the queued logs are shipped to S3. The rates are only checked when a Lambda invocation occurs. This means that for ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_MILLISECONDS, the elapsed time may be exceeded but the threshold is only evaluated on the next invocation. So for a rate of 100 ms, if there are 200 ms gaps between Lambda invocations, logs are shipped every 200 ms, as invocations occur.

Lambda extensions share Lambda function resources, such as CPU, memory, and storage, with your function code. The extension has a limit, defined as MAX_SHIP_RATE_BYTES in `agent/metrics.go`, to prevent users from consuming too much memory (a sketch of this clamp appears at the end of this section). It is currently set to 50 megabytes, meaning the maximum value that can be set for ADAPTIVE_BATCHING_EXTENSION_SHIP_RATE_BYTES is 50 megabytes. Any value set above this falls back to the 50-megabyte maximum. The maximum can be increased by modifying the constant in `agent/metrics.go`.

In the case of the Lambda environment shutting down, whether from an error or from inactivity, the extension flushes the log queue and uploads the final log file to S3. For information about the Lambda environment shutdown phase, see the [AWS docs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-lifecycle-shutdown).

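The cap can be pictured as a simple clamp. This sketch mirrors the behavior described above; the constant name matches the README, but the surrounding code is illustrative:

```go
package main

import "fmt"

// MAX_SHIP_RATE_BYTES mirrors the cap described above (50 MB); the clamp
// below illustrates the documented behavior, not the extension's exact code.
const MAX_SHIP_RATE_BYTES = 50 * 1024 * 1024

// clampShipRate caps a requested byte threshold at the maximum.
func clampShipRate(requested int) int {
	if requested > MAX_SHIP_RATE_BYTES {
		return MAX_SHIP_RATE_BYTES // values above the cap fall back to the maximum
	}
	return requested
}

func main() {
	fmt.Println(clampShipRate(100 * 1024 * 1024)) // prints 52428800
}
```
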
## Outputs

Log files shipped to S3 are placed under prefixes that identify where the execution occurred. For every Lambda execution environment that is started, the extension generates an environment UUID. A prefix for that execution environment is then created using the format `<year>-<month>-<day>-<environment-uuid>/`. Within a given prefix, files appear in the S3 bucket depending on how many logs are generated and on the thresholds defined. Files are named as follows: `<function-name>-<timestamp>-<file-uuid>.log`.

This results in the following full format for a log file placed in S3:

`<year>-<month>-<day>-<environment-uuid>/<function-name>-<timestamp>-<file-uuid>.log`

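For illustration, a key in this layout could be assembled as follows. This sketch assumes Unix seconds for `<timestamp>` and the `github.com/google/uuid` package; the extension's actual timestamp encoding and UUID library may differ:

```go
package main

import (
	"fmt"
	"time"

	"github.com/google/uuid"
)

func main() {
	// Illustrative reconstruction of the key layout described above.
	envUUID := uuid.New().String()  // generated once per execution environment
	fileUUID := uuid.New().String() // generated for each shipped log file

	now := time.Now().UTC()
	prefix := fmt.Sprintf("%d-%02d-%02d-%s/", now.Year(), now.Month(), now.Day(), envUUID)
	key := fmt.Sprintf("%s%s-%d-%s.log", prefix, "my-function", now.Unix(), fileUUID)
	fmt.Println(key)
	// e.g. 2021-03-05-<environment-uuid>/my-function-1614902400-<file-uuid>.log
}
```
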
Lines changed: 220 additions & 0 deletions

```go
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

package agent

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"time"

	"aws-lambda-extensions/go-example-adaptive-batching-extension/logsapi"
	"aws-lambda-extensions/go-example-adaptive-batching-extension/queuewrapper"

	log "github.com/sirupsen/logrus"
)

// httpLogger is used for this file's own messages; the package-level
// logger used below is defined in another file of this package.
var httpLogger = log.WithFields(log.Fields{"agent": "httpAgent"})

// DefaultHttpListenerPort is used to set the URL where the logs will be sent by the Logs API
const DefaultHttpListenerPort = "1234"

// LogsApiHttpListener is used to listen to the Logs API using HTTP
type LogsApiHttpListener struct {
	httpServer *http.Server
	// logQueue is a synchronous queue and is used to put the received logs to be consumed later (see main)
	logQueue *queuewrapper.QueueWrapper
}

// NewLogsApiHttpListener returns a LogsApiHttpListener with the given log queue
func NewLogsApiHttpListener(lq *queuewrapper.QueueWrapper) (*LogsApiHttpListener, error) {
	return &LogsApiHttpListener{
		httpServer: nil,
		logQueue:   lq,
	}, nil
}

// ListenOnAddress returns the address to bind to: all interfaces when running
// under SAM local (no sandbox hostname exists there), the sandbox hostname otherwise.
func ListenOnAddress() string {
	envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
	if ok && envAwsLocal == "true" {
		return ":" + DefaultHttpListenerPort
	}

	return "sandbox.localdomain:" + DefaultHttpListenerPort
}

// Start initiates the server in a goroutine where the logs will be sent
func (s *LogsApiHttpListener) Start() (bool, error) {
	address := ListenOnAddress()
	s.httpServer = &http.Server{Addr: address}
	http.HandleFunc("/", s.httpHandler)
	go func() {
		logger.Infof("Serving agent on %s", address)
		err := s.httpServer.ListenAndServe()
		if err != http.ErrServerClosed {
			logger.Errorf("Unexpected stop on Http Server: %v", err)
			s.Shutdown()
		} else {
			logger.Errorf("Http Server closed %v", err)
		}
	}()
	return true, nil
}

// httpHandler handles the requests coming from the Logs API.
// Every time the Logs API sends logs, this function reads them from the
// request body and puts them into a synchronous queue to be read by the main goroutine.
// Logging or printing besides the error cases below is not recommended if you have
// subscribed to receive extension logs: logging here would cause the Logs API to send
// new logs for the printed lines, creating an infinite loop.
func (h *LogsApiHttpListener) httpHandler(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		logger.Errorf("Error reading body: %+v", err)
		return
	}

	// Puts the log message into the queue
	err = h.logQueue.Put(string(body))
	if err != nil {
		logger.Errorf("Can't push logs to destination: %v", err)
	}
}

// Shutdown terminates the HTTP server listening for logs
func (s *LogsApiHttpListener) Shutdown() {
	if s.httpServer != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
		defer cancel()
		err := s.httpServer.Shutdown(ctx)
		if err != nil {
			logger.Errorf("Failed to shutdown http server gracefully %s", err)
		} else {
			s.httpServer = nil
		}
	}
}

// HttpAgent has the listener that receives the logs and the logger that handles the received logs
type HttpAgent struct {
	listener *LogsApiHttpListener
	logger   *S3Logger
}

// NewHttpAgent returns an agent to listen and handle logs coming from the Logs API over HTTP.
// Make sure the agent is initialized by calling Init(agentID) before subscribing to the Logs API.
func NewHttpAgent(s3Logger *S3Logger, jq *queuewrapper.QueueWrapper) (*HttpAgent, error) {
	logsApiListener, err := NewLogsApiHttpListener(jq)
	if err != nil {
		return nil, err
	}

	return &HttpAgent{
		logger:   s3Logger,
		listener: logsApiListener,
	}, nil
}

// Init initializes the configuration for the Logs API and subscribes to the Logs API for HTTP
func (a HttpAgent) Init(agentID string) error {
	extensionsAPIAddress, ok := os.LookupEnv("AWS_LAMBDA_RUNTIME_API")
	if !ok {
		return errors.New("AWS_LAMBDA_RUNTIME_API is not set")
	}

	logsApiBaseUrl := fmt.Sprintf("http://%s", extensionsAPIAddress)

	logsApiClient, err := logsapi.NewClient(logsApiBaseUrl)
	if err != nil {
		return err
	}

	_, err = a.listener.Start()
	if err != nil {
		return err
	}

	// Read environment variable ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES
	inputJson := os.Getenv("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES")
	inputJsonBytes := []byte(inputJson)

	var eventTypes []logsapi.EventType

	if inputJson == "" {
		// Variable not set: fall back to the default log types
		eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES not included, subscribing to default log types")
	} else if !json.Valid(inputJsonBytes) {
		// Invalid JSON provided: fall back to the default log types
		eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES includes invalid JSON, subscribing to default log types")
	} else {
		// Unmarshal the JSON array into a slice of log types
		var jsonArray []logsapi.EventType

		err = json.Unmarshal(inputJsonBytes, &jsonArray)
		if err != nil {
			// Error unmarshaling JSON: fall back to the default log types
			eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
			httpLogger.Info("Unable to unmarshal json from ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES, subscribing to default log types")
		} else if len(jsonArray) == 0 {
			// Empty array: fall back to the default log types
			eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
			httpLogger.Info("LogTypes in ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES does not include any elements, subscribing to default log types")
		} else {
			// Loop through the elements, keeping only supported log types
			for _, logType := range jsonArray {
				switch logType {
				case logsapi.Platform:
					eventTypes = append(eventTypes, logsapi.Platform)
				case logsapi.Function:
					eventTypes = append(eventTypes, logsapi.Function)
				case logsapi.Extension:
					eventTypes = append(eventTypes, logsapi.Extension)
				default:
					httpLogger.Info("Log type ", logType, " is not valid. Not including")
				}
			}
		}
	}

	bufferingCfg := logsapi.BufferingCfg{
		MaxItems:  1000,
		MaxBytes:  262144,
		TimeoutMS: 25,
	}
	destination := logsapi.Destination{
		Protocol:   logsapi.HttpProto,
		URI:        logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s", DefaultHttpListenerPort)),
		HttpMethod: logsapi.HttpPost,
		Encoding:   logsapi.JSON,
	}

	_, err = logsApiClient.Subscribe(eventTypes, bufferingCfg, destination, agentID)
	return err
}

// Shutdown finalizes the logging and terminates the listener
func (a *HttpAgent) Shutdown() {
	err := a.logger.Shutdown()
	if err != nil {
		logger.Errorf("Error when trying to shutdown logger: %v", err)
	}

	a.listener.Shutdown()
}
```
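
For context, here is a hypothetical sketch of how an extension's main goroutine might wire these pieces together. The repository's real `main.go` is not shown in this diff, and the zero-value `QueueWrapper` and `S3Logger` literals below stand in for the package's actual constructors:

```go
package agent

import "aws-lambda-extensions/go-example-adaptive-batching-extension/queuewrapper"

// StartAgentSketch is a hypothetical helper showing the call order implied
// by the comments above: construct the agent, then Init it with the agent ID
// returned when registering with the Extensions API.
func StartAgentSketch(agentID string) (*HttpAgent, error) {
	queue := &queuewrapper.QueueWrapper{} // real code uses the package's constructor
	s3Logger := &S3Logger{}               // real code configures the S3 client and bucket

	agent, err := NewHttpAgent(s3Logger, queue)
	if err != nil {
		return nil, err
	}
	// Init starts the local HTTP listener and subscribes to the Logs API.
	if err := agent.Init(agentID); err != nil {
		return nil, err
	}
	return agent, nil
}
```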
