Skip to content

Commit c47ed20

Browse files
Add Logs API examples and demo.
1 parent c152215 commit c47ed20

File tree

26 files changed

+1743
-0
lines changed

26 files changed

+1743
-0
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ In this repository you'll find a number of different sample projects and demos t
1515

1616
* [AWS AppConfig extension demo](awsappconfig-extension-demo/)
1717
* [Custom runtime extension demo](custom-runtime-extension-demo/)
18+
* [Logs to Amazon S3 extension demo](s3-logs-extension-demo/)
1819
* [Extension in Go](go-example-extension/)
1920
* [Extension in Python](python-example-extension/)
2021
* [Extension in Node.js](nodejs-example-extension/)
22+
* [Logs API Extension in Go](go-example-logs-api-extension/)
23+
* [Logs API Extension in Python](python-example-logs-api-extension/)
2124
* [Inter-process communication extension in Go](go-example-ipc-extension/)
2225
* [Crash uploader extension in Go](go-example-crash-uploader-extension/)
2326
* [ElasticSearch extension in Python](python-example-elasticsearch-extension/)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Example Logs API Extension in Go
2+
3+
The provided code sample demonstrates how to get a basic Logs API extension written in Go up and running.
4+
5+
> This is a simple example extension to help you start investigating the Lambda Runtime Logs API. This example code is not production ready. Use it with your own discretion after testing thoroughly.
6+
7+
This sample extension:
8+
* Subscribes to recieve platform and function logs
9+
* Runs with a main and a helper goroutine: The main goroutine registers to ExtensionAPI and process its invoke and shutdown events (see nextEvent call). The helper goroutine:
10+
- starts a local HTTP server at the provided port (default 1234) that receives requests from Logs API
11+
- puts the logs in a synchronized queue (Producer) to be processed by the main goroutine (Consumer)
12+
* Writes the received logs to an S3 Bucket
13+
14+
## Compile package and dependencies
15+
16+
To run this example, you will need to ensure that your build architecture matches that of the Lambda execution environment by compiling with `GOOS=linux` and `GOARCH=amd64` if you are not running in a Linux environment.
17+
18+
Building and saving package into a `bin/extensions` directory:
19+
```bash
20+
$ cd go-example-logs-api-extension
21+
$ GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-example-logs-api-extension main.go
22+
$ chmod +x bin/extensions/go-example-logs-api-extension
23+
```
24+
25+
## Layer Setup Process
26+
The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located. In this sample project we must include the `go-example-logs-api-extension` binary.
27+
28+
Creating zip package for the extension:
29+
```bash
30+
$ cd bin
31+
$ zip -r extension.zip extensions/
32+
```
33+
34+
Publish a new layer using the `extension.zip` and capture the produced layer arn in `layer_arn`. If you don't have jq command installed, you can run only the aws cli part and manually pass the layer arn to `aws lambda update-function-configuration`.
35+
```bash
36+
layer_arn=$(aws lambda publish-layer-version --layer-name "go-example-logs-api-extension" --region "<use your region>" --zip-file "fileb://extension.zip" | jq -r '.LayerVersionArn')
37+
```
38+
39+
Add the newly created layer version to a Lambda function.
40+
```bash
41+
aws lambda update-function-configuration --region <use your region> --function-name <your function name> --layers $layer_arn
42+
```
43+
44+
## Function Invocation and Extension Execution
45+
> Note: Your function role should have the AmazonS3FullAccess policy attached.
46+
47+
> Note: You need to add `LOGS_API_EXTENSION_S3_BUCKET` environment variable to your lambda function. The value of this variable will be used to create a bucket or use an existing bucket if it is created previously. The logs received from Logs API will be written in a file inside that bucket. For S3 bucket naming rules, see [AWS docs](https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html).
48+
49+
After invoking the function and receiving the shutdown event, you should now see log messages from the example extension written to an S3 bucket with the following name format:
50+
51+
`<function-name>-<timestamp>-<UUID>.log` in to the bucket set with the environment variable above.
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT-0
3+
4+
package agent
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"io/ioutil"
11+
"net/http"
12+
"os"
13+
"time"
14+
15+
"aws-lambda-extensions/go-example-logs-api-extension/logsapi"
16+
"github.com/golang-collections/go-datastructures/queue"
17+
)
18+
19+
// DefaulHttpListenerPort is used to set the URL where the logs will be sent by Logs API
20+
const DefaulHttpListenerPort = "1234"
21+
22+
// LogsApiHttpListener is used to listen to the Logs API using HTTP
23+
type LogsApiHttpListener struct {
24+
httpServer *http.Server
25+
// logQueue is a synchronous queue and is used to put the received logs to be consumed later (see main)
26+
logQueue *queue.Queue
27+
}
28+
29+
// NewLogsApiHttpListener returns a LogsApiHttpListener with the given log queue
30+
func NewLogsApiHttpListener(lq *queue.Queue) (*LogsApiHttpListener, error) {
31+
32+
return &LogsApiHttpListener{
33+
httpServer: nil,
34+
logQueue: lq,
35+
}, nil
36+
}
37+
38+
// Start initiates the server in a goroutine where the logs will be sent
39+
func (s *LogsApiHttpListener) Start() (bool, error) {
40+
s.httpServer = &http.Server{Addr: ":" + DefaulHttpListenerPort}
41+
http.HandleFunc("/", s.http_handler)
42+
go func() {
43+
logger.Infof("Serving agent on %s", ":"+DefaulHttpListenerPort)
44+
err := s.httpServer.ListenAndServe()
45+
if err != http.ErrServerClosed {
46+
logger.Errorf("Unexpected stop on Http Server: %v", err)
47+
s.Shutdown()
48+
} else {
49+
logger.Errorf("Http Server closed %v", err)
50+
}
51+
}()
52+
return true, nil
53+
}
54+
55+
// http_handler handles the requests coming from the Logs API.
56+
// Everytime Logs API sends logs, this function will read the logs from the response body
57+
// and put them into a synchronous queue to be read by the main goroutine.
58+
// Logging or printing besides the error cases below is not recommended if you have subscribed to receive extension logs.
59+
// Otherwise, logging here will cause Logs API to send new logs for the printed lines which will create an infinite loop.
60+
func (h *LogsApiHttpListener) http_handler(w http.ResponseWriter, r *http.Request) {
61+
body, err := ioutil.ReadAll(r.Body)
62+
if err != nil {
63+
logger.Errorf("Error reading body: %+v", err)
64+
return
65+
}
66+
67+
// Puts the log message into the queue
68+
err = h.logQueue.Put(string(body))
69+
if err != nil {
70+
logger.Errorf("Can't push logs to destination: %v", err)
71+
}
72+
}
73+
74+
// Shutdown terminates the HTTP server listening for logs
75+
func (s *LogsApiHttpListener) Shutdown() {
76+
if s.httpServer != nil {
77+
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
78+
err := s.httpServer.Shutdown(ctx)
79+
if err != nil {
80+
logger.Errorf("Failed to shutdown http server gracefully %s", err)
81+
} else {
82+
s.httpServer = nil
83+
}
84+
}
85+
}
86+
87+
// HttpAgent has the listener that receives the logs and the logger that handles the received logs
88+
type HttpAgent struct {
89+
listener *LogsApiHttpListener
90+
logger *S3Logger
91+
}
92+
93+
// NewHttpAgent returns an agent to listen and handle logs coming from Logs API for HTTP
94+
// Make sure the agent is initialized by calling Init(agentId) before subscription for the Logs API.
95+
func NewHttpAgent(s3Logger *S3Logger, jq *queue.Queue) (*HttpAgent, error) {
96+
97+
logsApiListener, err := NewLogsApiHttpListener(jq)
98+
if err != nil {
99+
return nil, err
100+
}
101+
102+
return &HttpAgent{
103+
logger: s3Logger,
104+
listener: logsApiListener,
105+
}, nil
106+
}
107+
108+
// Init initializes the configuration for the Logs API and subscribes to the Logs API for HTTP
109+
func (a HttpAgent) Init(agentID string) error {
110+
extensions_api_address, ok := os.LookupEnv("AWS_LAMBDA_RUNTIME_API")
111+
if !ok {
112+
return errors.New("AWS_LAMBDA_RUNTIME_API is not set")
113+
}
114+
115+
logsApiBaseUrl := fmt.Sprintf("http://%s", extensions_api_address)
116+
117+
logsApiClient, err := logsapi.NewClient(logsApiBaseUrl)
118+
if err != nil {
119+
return err
120+
}
121+
122+
_, err = a.listener.Start()
123+
if err != nil {
124+
return err
125+
}
126+
127+
eventTypes := []logsapi.EventType{logsapi.Platform, logsapi.Function}
128+
bufferingCfg := logsapi.BufferingCfg{
129+
MaxItems: 10000,
130+
MaxBytes: 262144,
131+
TimeoutMS: 1000,
132+
}
133+
if err != nil {
134+
return err
135+
}
136+
destination := logsapi.Destination{
137+
Protocol: logsapi.HttpProto,
138+
URI: logsapi.URI(fmt.Sprintf("http://sandbox:%s", DefaulHttpListenerPort)),
139+
HttpMethod: logsapi.HttpPost,
140+
Encoding: logsapi.JSON,
141+
}
142+
143+
_, err = logsApiClient.Subscribe(eventTypes, bufferingCfg, destination, agentID)
144+
return err
145+
}
146+
147+
// Shutdown finalizes the logging and terminates the listener
148+
func (a *HttpAgent) Shutdown() {
149+
err := a.logger.Shutdown()
150+
if err != nil {
151+
logger.Errorf("Error when trying to shutdown logger: %v", err)
152+
}
153+
154+
a.listener.Shutdown()
155+
}

0 commit comments

Comments
 (0)