Commit 0261720

adding logs api example for elasticsearch (#8)
* adding logs api example for elasticsearch
* updating existing elasticsearch extension with logs api framework
* adding copyright to elasticsearch_producer
* updated readme with updated log output
* removed dependency on the requests module
* removing outdate info from readme
* updating readme to include function deployment configuration requirements
1 parent 7d903cf commit 0261720

10 files changed: +317 -203 lines changed

Lines changed: 18 additions & 50 deletions
@@ -1,70 +1,38 @@
1-
# Example ElasticSearch Extension in Python
2-
The provided code sample demonstrates how to get a basic extension written in Python 3 up and running.
1+
# Example Logs API Extension in Python for Elasticsearch
2+
The provided code sample demonstrates how to get a basic Logs API extension for Elasticsearch written in Python 3 up and running.
33

4-
> Note: This extension requires the Python 3 runtime to be present in the Lambda execution environment of your function.
4+
> Note: This extension requires the Python 3 runtime to be present in the Lambda execution environment of your function. This example code is not production ready. Use it at your own discretion after testing it thoroughly.
55
6-
There are two components to this sample:
7-
* `extensions/`: This sub-directory should be extracted to /opt/extensions where the Lambda platform will scan for executables to launch extensions
8-
* `python-example-elasticsearch-extension/`: This sub-directory should be extracted to /opt/python-example-extension which is referenced by the `extensions/python-example-elasticsearch-extension` executable and includes a Python executable along with all of its necessary dependencies.
6+
In this example, we start by developing a simple extension and then add the ability to read logs from the Logs API. For more details on building an extension, please read the Extension API Developer Guide.
97

10-
## Prep Python Dependencies
11-
Install the extension dependencies locally, which will be mounted along with the extension code.
12-
13-
```bash
14-
$ cd python-example-elasticsearch-extension
15-
$ chmod +x extension.py
16-
$ pip3 install -r requirements.txt -t .
17-
$ cd ..
18-
```
8+
When the Lambda service sets up the execution environment, it runs the extension (logs_api_elasticsearch_extension.py). The extension first registers with the Extensions API, then starts an HTTP listener, and finally subscribes to the Logs API so that log batches are delivered to the listener over HTTP and processed.
199

2010
## Layer Setup Process
21-
The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located and another root directory called `python-example-elasticsearch-extension/`, where the core logic of the extension and its dependencies are located.
11+
The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located. The dependencies for the extension (logs_api_elasticsearch_extension.py) are found in the logs_api_elasticsearch_extension directory.
2212

2313
Creating zip package for the extension:
2414
```bash
25-
$ chmod +x extensions/python-example-elasticsearch-extension
15+
$ cd python-example-elasticsearch-extension
16+
$ chmod +x extensions/logs_api_elasticsearch_extension.py
2617
$ zip -r extension.zip .
2718
```
2819

29-
Ensure that you have aws-cli v2 for the commands below.
30-
Publish a new layer using the `extension.zip`. The output of the following command should provide you a layer arn.
20+
Publish a new layer using the `extension.zip`. The output of the following command should provide you with a layer ARN.
3121
```bash
3222
aws lambda publish-layer-version \
33-
--layer-name "python-example-extension" \
23+
--layer-name "python-example-elasticsearch-extension" \
3424
--region <use your region> \
3525
--zip-file "fileb://extension.zip"
3626
```
3727
Note the LayerVersionArn that is produced in the output.
38-
eg. `"LayerVersionArn": "arn:aws:lambda:<region>:123456789012:layer:<layerName>:1"`
39-
40-
Add the newly created layer version to a Python 3.8 runtime Lambda function. Ensure to include environment variables like `ES_ENDPOINT`="ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com" and `ES_INDEX`="extensions"
41-
28+
e.g. `"LayerVersionArn": "arn:aws:lambda:<region>:123456789012:layer:python-example-elasticsearch-extension:1"`
4229

43-
## Function Invocation and Extension Execution
44-
45-
When invoking the function, you should now see log messages from the example extension similar to the following:
46-
```
47-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX EXTENSION Name: python-example-elasticsearch-extension State: Ready Events: [INVOKE,SHUTDOWN]
48-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX START RequestId: 9ca08945-de9b-46ec-adc6-3fe9ef0d2e8d Version: $LATEST
49-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX python-example-elasticsearch-extension launching extension
50-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Registering...
51-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Registered with ID: 6ec8756c-4830-458b-9dda-156e5dda1cc1
52-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Waiting for event...
53-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Received event: {"eventType": "INVOKE", "deadlineMs": 1596756305517, "requestId": "4b61d2be-3ba0-4e99-9121-30268b462c77", "invokedFunctionArn": "", "tracing": {"type": "X-Amzn-Trace-Id", "value": ""}}
54-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Sending payload: {"functionName": "defaultFunctionName", "functionVersion": "$LATEST", "requestId": "4b61d2be-3ba0-4e99-9121-30268b462c77", "waitDuration": 3787.946044921875}
55-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Attempting POST to: https://ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com/extensions/_doc
56-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Response: {"_index":"extensions","_type":"_doc","_id":"eW8WxnMB4bYnBqSFvK5w","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}
57-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX [python-example-elasticsearch-extension] Waiting for event...
58-
...
59-
...
60-
Function logs...
61-
...
62-
...
63-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX END RequestId: 9ca08945-de9b-46ec-adc6-3fe9ef0d2e8d
64-
XXXX-XX-XXTXX:XX:XX.XXX-XX:XX REPORT RequestId: 9ca08945-de9b-46ec-adc6-3fe9ef0d2e8d Duration: 80.36 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 67 MB Init Duration: 297.83 ms
30+
Add the newly created layer version to a Python 3.8 runtime Lambda function.
31+
```bash
32+
aws lambda update-function-configuration --region <use your region> --function-name <your function name> --layers <LayerVersionArn from previous step>
6533
```
6634

67-
68-
69-
70-
35+
## Lambda Function Deployment
36+
When deploying the Lambda function, be sure to configure two environment variables that the extension uses to communicate with the Elasticsearch cluster.
37+
ES_ENDPOINT: The endpoint for the Elasticsearch cluster, e.g. elasticsearch.example.com (do not include https:// or a trailing /). This endpoint must be reachable from the Lambda function.
38+
ES_INDEX: The index to which the extension should write log outputs.
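The two variables combine into the URL that the extension posts documents to. The following is a minimal sketch of that step, assuming a hypothetical helper named `build_index_url` that is not part of this commit. Unlike the committed code, it tolerates a scheme or trailing slash in `ES_ENDPOINT` and fails early when a value is missing.

```python
def build_index_url(endpoint: str, index: str) -> str:
    """Return the document URL built from ES_ENDPOINT and ES_INDEX values."""
    host = endpoint.strip()
    # ES_ENDPOINT is documented without a scheme, but tolerate one if present.
    for scheme in ("https://", "http://"):
        if host.lower().startswith(scheme):
            host = host[len(scheme):]
            break
    host = host.rstrip("/")
    index = index.strip().strip("/")
    if not host or not index:
        raise ValueError("ES_ENDPOINT and ES_INDEX must both be non-empty")
    # The extension always talks to the cluster over HTTPS.
    return f"https://{host}/{index}/_doc"
```

Inside the extension this could be called as `build_index_url(os.environ["ES_ENDPOINT"], os.environ["ES_INDEX"])`.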
Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
#!/bin/sh
''''exec python -u -- "$0" ${1+"$@"} # '''
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import json
import os

from logs_api_elasticsearch_extension.http_listener import http_server_init, RECEIVER_PORT
from logs_api_elasticsearch_extension.logs_api_client import LogsAPIClient
from logs_api_elasticsearch_extension.extensions_api_client import ExtensionsAPIClient
from logs_api_elasticsearch_extension.elasticsearch_producer import ElasticsearchProducer
from queue import Queue

"""Here is the sample extension code that stitches all of this together.
    - The extension will run two threads. The "main" thread will register with the Extensions API and process its
    invoke and shutdown events (see next call). The second "listener" thread will listen for HTTP POST requests that
    deliver log batches.
    - The "listener" thread will place every log batch it receives in a synchronized queue; during each execution slice,
    the "main" thread will make sure to process any event in the queue before returning control by invoking next again.
    - Note that because of the asynchronous nature of the system, it is possible that logs for one invoke are
    processed during the next invoke slice. Likewise, it is possible that logs for the last invoke are processed during
    the SHUTDOWN event.

Note:

1.  This is a simple example extension to help you start investigating the Lambda Runtime Logs API.
    This code is not production ready, and it was never intended to be. Use it at your own discretion after you
    have tested it thoroughly.

2.  The extension code starts with a shebang. This brings the Python runtime into the execution environment.
    It works if the Lambda function is a python3.x function, because the function then brings the python3.x runtime
    with it. It may not work for python 2.7 or other runtimes.
    The recommended best practice is to compile your extension into an executable binary and not rely on the runtime.

3.  This file needs to be executable, so make sure you add execute permission to the file:
    `chmod +x logs_api_elasticsearch_extension.py`

"""

class LogsAPIHTTPExtension():
    def __init__(self, agent_name, registration_body, subscription_body):
        print(f"Initializing LogsAPIExternalExtension {agent_name}")
        self.agent_name = agent_name
        self.queue = Queue()
        self.logs_api_client = LogsAPIClient()
        self.extensions_api_client = ExtensionsAPIClient()
        self.es_endpoint = os.environ['ES_ENDPOINT']
        self.es_index = os.environ['ES_INDEX']
        self.es_producer = ElasticsearchProducer(self.agent_name, self.es_endpoint, self.es_index)

        # Register early so Runtime could start in parallel
        self.agent_id = self.extensions_api_client.register(self.agent_name, registration_body)

        # Start listening before Logs API registration
        http_server_init(self.queue)
        self.logs_api_client.subscribe(self.agent_id, subscription_body)

    def run_forever(self):
        print(f"Serving LogsAPIHTTPExternalExtension {self.agent_name}")
        while True:
            resp = self.extensions_api_client.next(self.agent_id)
            # Process the received batches if any.
            while not self.queue.empty():
                batch = self.queue.get_nowait()
                # The loop below sends every received event to Elasticsearch.
                # If you've subscribed to extension logs, e.g. "types": ["platform", "function", "extension"],
                # you'll receive the logs of this extension back from the Logs API.
                # If the extension logs those events again (for example with print), it creates a cycle,
                # because the new log lines are delivered back to it as well.
                # Subscribe to the `extension` log type only if you egress the logs to another endpoint,
                # or make sure you've implemented a protocol to handle this case.
                for item in batch:
                    self.es_producer.send(item)

# Register for the INVOKE and SHUTDOWN events from the Extensions API
_REGISTRATION_BODY = {
    "events": ["INVOKE", "SHUTDOWN"],
}

# Subscribe to platform and function logs and receive them on ${local_ip}:4243 via HTTP.
TIMEOUT_MS = 1000 # Maximum time (in milliseconds) that a batch would be buffered.
MAX_BYTES = 262144 # Maximum size in bytes that the logs would be buffered in memory.
MAX_ITEMS = 10000 # Maximum number of events that would be buffered in memory.

_SUBSCRIPTION_BODY = {
    "destination":{
        "protocol": "HTTP",
        "URI": f"http://sandbox:{RECEIVER_PORT}",
    },
    "types": ["platform", "function"],
    "buffering": {
        "timeoutMs": TIMEOUT_MS,
        "maxBytes": MAX_BYTES,
        "maxItems": MAX_ITEMS
    }
}

def main():
    print(f"Starting Extensions {_REGISTRATION_BODY} {_SUBSCRIPTION_BODY}")
    # Note: Agent name has to be file name to register as an external extension
    ext = LogsAPIHTTPExtension(os.path.basename(__file__), _REGISTRATION_BODY, _SUBSCRIPTION_BODY)
    ext.run_forever()

if __name__ == "__main__":
    main()
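The queue-draining step in `run_forever` can also be written without the separate `empty()` check. The sketch below is an illustration only: `drain_queue` and its `send` callback are hypothetical names, not part of this commit. It relies on `queue.Empty` to detect the end of the backlog and reports how many items were forwarded.

```python
from queue import Empty, Queue


def drain_queue(queue: Queue, send) -> int:
    """Forward every item of every queued batch to send(); return the item count."""
    sent = 0
    while True:
        try:
            # get_nowait raises Empty instead of blocking when nothing is queued.
            batch = queue.get_nowait()
        except Empty:
            return sent
        for item in batch:
            send(item)
            sent += 1
```

In the extension above, the call would look like `drain_queue(self.queue, self.es_producer.send)` after each `next` call.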

python-example-elasticsearch-extension/extensions/logs_api_elasticsearch_extension/__init__.py

Whitespace-only changes.
@@ -0,0 +1,29 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import sys
import urllib.request

class ElasticsearchProducer():
    def __init__(self, agent_name, endpoint, index):
        self.agent_name = agent_name
        self.endpoint = endpoint
        self.index = index

    def send(self, payload):
        url = f"https://{self.endpoint}/{self.index}/_doc"
        try:
            if isinstance(payload["record"], str):
                converted = payload
                converted["record"] = json.loads(payload["record"].replace("'",'"').rstrip())
            else:
                converted = payload
            req = urllib.request.Request(url)
            req.method = "POST"
            req.add_header("Content-Type", "application/json")
            req.data = json.dumps(converted).encode("utf-8")
            resp = urllib.request.urlopen(req)
        except urllib.request.HTTPError as e:
            print(f"[{self.agent_name}] HTTPError: {e}", flush=True)
            sys.exit(1)
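The `send` method assumes that every string record is JSON once single quotes are replaced, so a plain-text function log line would raise `json.JSONDecodeError`, which the `except` clause does not catch. A more tolerant conversion might look like the sketch below. `normalize_record` is a hypothetical helper, not part of this commit, and it deliberately drops the quote replacement and keeps unparseable records as strings.

```python
import json


def normalize_record(payload: dict) -> dict:
    """Return a copy of payload whose "record" is parsed JSON when possible."""
    converted = dict(payload)  # shallow copy so the caller's dict is not mutated
    record = converted.get("record")
    if isinstance(record, str):
        try:
            converted["record"] = json.loads(record.strip())
        except json.JSONDecodeError:
            # Plain-text log line: keep it as a string, minus trailing whitespace.
            converted["record"] = record.rstrip()
    return converted
```

Platform events, whose `record` is already a dictionary, pass through unchanged.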
@@ -0,0 +1,62 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import os
import sys
import urllib.request

""" Demonstrates code to register an extension with the Extensions API and to fetch its next event.
"""

LAMBDA_AGENT_NAME_HEADER_KEY = "Lambda-Extension-Name"
LAMBDA_AGENT_IDENTIFIER_HEADER_KEY = "Lambda-Extension-Identifier"

class ExtensionsAPIClient():
    def __init__(self):
        try:
            runtime_api_address = os.environ['AWS_LAMBDA_RUNTIME_API']
            self.runtime_api_base_url = f"http://{runtime_api_address}/2020-01-01/extension"
        except Exception as e:
            raise Exception(f"AWS_LAMBDA_RUNTIME_API is not set {e}") from e

    # Register as early as possible - the runtime initialization starts after all extensions have registered.
    def register(self, agent_unique_name, registration_body):
        try:
            print(f"Registering to ExtensionsAPIClient on {self.runtime_api_base_url}")
            req = urllib.request.Request(f"{self.runtime_api_base_url}/register")
            req.method = 'POST'
            req.add_header(LAMBDA_AGENT_NAME_HEADER_KEY, agent_unique_name)
            req.add_header("Content-Type", "application/json")
            data = json.dumps(registration_body).encode("utf-8")
            req.data = data
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"/register request to ExtensionsAPIClient failed. Status: {resp.status}, Response: {resp.read()}")
                # Fail the extension
                sys.exit(1)
            agent_identifier = resp.headers.get(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY)
            return agent_identifier
        except Exception as e:
            raise Exception(f"Failed to register to ExtensionsAPIClient: on {self.runtime_api_base_url}/register \
                with agent_unique_name:{agent_unique_name} \
                and registration_body:{registration_body}\nError: {e}") from e

    # Call the following method when the extension is ready to receive the next invocation
    # and there is no job it needs to execute beforehand.
    def next(self, agent_id):
        try:
            req = urllib.request.Request(f"{self.runtime_api_base_url}/event/next")
            req.method = 'GET'
            req.add_header(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY, agent_id)
            req.add_header("Content-Type", "application/json")
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"/event/next request to ExtensionsAPIClient failed. Status: {resp.status}, Response: {resp.read()} ")
                # Fail the extension
                sys.exit(1)
            data = resp.read()
            print(f"Received response from ExtensionsAPIClient: {data}")
            return data
        except Exception as e:
            raise Exception(f"Failed to get /event/next from ExtensionsAPIClient: {e}") from e
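`next` returns the raw response body, and the main loop in this commit does not inspect it. Since the extension registers for both `INVOKE` and `SHUTDOWN`, a caller could decode the body and stop once a shutdown arrives. The sketch below assumes hypothetical helper names (`parse_event`, `is_shutdown`) and relies only on the `eventType` field that appears in the event payloads.

```python
import json


def parse_event(data):
    """Decode the body returned by next() into (event_type, event_dict)."""
    text = data.decode("utf-8") if isinstance(data, (bytes, bytearray)) else data
    event = json.loads(text)
    return event.get("eventType"), event


def is_shutdown(data) -> bool:
    """True when the event tells the extension to stop."""
    event_type, _ = parse_event(data)
    return event_type == "SHUTDOWN"
```

A loop built on this would drain the queue one last time when `is_shutdown(resp)` is true and then exit, rather than calling `next` again.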
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Event, Thread

""" Demonstrates code to set up an HTTP listener and receive log events
"""

RECEIVER_IP = "0.0.0.0"
RECEIVER_PORT = 4243

def http_server_init(queue):
    def handler(*args):
        LogsHandler(queue, *args)
    print(f"Initializing HTTP Server on {RECEIVER_IP}:{RECEIVER_PORT}")
    server = HTTPServer((RECEIVER_IP, RECEIVER_PORT), handler)

    # Ensure that the server thread is scheduled so that the server binds to the port
    # and starts listening before we subscribe to the logs and ask for the next event.
    started_event = Event()
    server_thread = Thread(target=serve, daemon=True, args=(started_event, server,))
    server_thread.start()
    rc = started_event.wait(timeout = 9)
    if rc is not True:
        raise Exception("server_thread has timed out before starting")


# Server implementation
class LogsHandler(BaseHTTPRequestHandler):
    def __init__(self, queue, *args):
        self.queue = queue
        BaseHTTPRequestHandler.__init__(self, *args)

    def do_POST(self):
        try:
            cl = self.headers.get("Content-Length")
            if cl:
                data_len = int(cl)
            else:
                data_len = 0
            content = self.rfile.read(data_len)
            self.send_response(200)
            self.end_headers()
            batch = json.loads(content.decode("utf-8"))
            self.queue.put(batch)

        except Exception as e:
            print(f"Error processing message: {e}")

# Server thread
def serve(started_event, server):
    # Notify that this thread is up and running
    started_event.set()
    try:
        print(f"Serving HTTP Server on {RECEIVER_IP}:{RECEIVER_PORT}")
        server.serve_forever()
    except:
        print(f"Error in HTTP server {sys.exc_info()[0]}")
    finally:
        if server:
            server.shutdown()
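The listener pattern above can be tried outside Lambda by posting a batch to a local server and reading it back from the queue. The following is a self-contained sketch under stated assumptions: `BatchHandler` and `post_batch_locally` are hypothetical names, the server binds to an ephemeral loopback port rather than `RECEIVER_PORT`, and `functools.partial` replaces the closure used in `http_server_init`.

```python
import json
import urllib.request
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer
from queue import Queue
from threading import Thread


class BatchHandler(BaseHTTPRequestHandler):
    def __init__(self, queue, *args, **kwargs):
        # The queue must be set before the base __init__, which handles the request.
        self.queue = queue
        super().__init__(*args, **kwargs)

    def do_POST(self):
        length = int(self.headers.get("Content-Length") or 0)
        body = self.rfile.read(length)
        self.send_response(200)
        self.end_headers()
        self.queue.put(json.loads(body.decode("utf-8")))

    def log_message(self, format, *args):
        pass  # keep the example quiet


def post_batch_locally(batch):
    """Start a listener, POST one batch to it, return (status, received batch)."""
    queue = Queue()
    server = HTTPServer(("127.0.0.1", 0), partial(BatchHandler, queue))
    Thread(target=server.serve_forever, daemon=True).start()
    try:
        port = server.server_address[1]
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}",
            data=json.dumps(batch).encode("utf-8"),
            method="POST",
        )
        req.add_header("Content-Type", "application/json")
        # An empty ProxyHandler bypasses any proxy configured in the environment.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        with opener.open(req, timeout=5) as resp:
            status = resp.status
        received = queue.get(timeout=5)
    finally:
        server.shutdown()
        server.server_close()
    return status, received
```

Calling `post_batch_locally([{"type": "function", "record": "hello"}])` returns the HTTP status together with the batch exactly as the handler queued it.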
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import os
import sys
import urllib.request

""" Demonstrates code to call the Logs API to subscribe to log events
"""

LAMBDA_AGENT_IDENTIFIER_HEADER_KEY = "Lambda-Extension-Identifier"

class LogsAPIClient:
    def __init__(self):
        try:
            runtime_api_address = os.environ['AWS_LAMBDA_RUNTIME_API']
            self.logs_api_base_url = f"http://{runtime_api_address}/2020-08-15"
        except Exception as e:
            raise Exception(f"AWS_LAMBDA_RUNTIME_API is not set {e}") from e

    # Method to call the Logs API to subscribe to log events.
    def subscribe(self, agent_id, subscription_body):
        try:
            print(f"Subscribing to Logs API on {self.logs_api_base_url}")
            req = urllib.request.Request(f"{self.logs_api_base_url}/logs")
            req.method = 'PUT'
            req.add_header(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY, agent_id)
            req.add_header("Content-Type", "application/json")
            data = json.dumps(subscription_body).encode("utf-8")
            req.data = data
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"Could not subscribe to Logs API: {resp.status} {resp.read()}")
                # Fail the extension
                sys.exit(1)
            print(f"Successfully subscribed to Logs API: {resp.read()}")
        except Exception as e:
            raise Exception(f"Failed to subscribe to Logs API on {self.logs_api_base_url} with id: {agent_id} \
                and subscription_body: {json.dumps(subscription_body).encode('utf-8')} \nError:{e}") from e
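The subscription body passed to `subscribe` is a plain dictionary, so a mistyped buffering value is only rejected once the Logs API responds. The sketch below builds the same body as `_SUBSCRIPTION_BODY` and checks the buffering values first. `build_subscription_body` is a hypothetical helper, and the ranges in `BUFFERING_LIMITS` are assumptions based on my reading of the Logs API documentation that should be verified before relying on them.

```python
# Assumed (min, max) ranges for the Logs API buffering settings; verify against the docs.
BUFFERING_LIMITS = {
    "timeoutMs": (25, 30000),
    "maxBytes": (262144, 1048576),
    "maxItems": (1000, 10000),
}


def build_subscription_body(port, types=("platform", "function"),
                            timeout_ms=1000, max_bytes=262144, max_items=10000):
    """Return a Logs API subscription body, validating the buffering values."""
    buffering = {"timeoutMs": timeout_ms, "maxBytes": max_bytes, "maxItems": max_items}
    for key, value in buffering.items():
        low, high = BUFFERING_LIMITS[key]
        if not low <= value <= high:
            raise ValueError(f"{key}={value} is outside the assumed range {low}..{high}")
    return {
        "destination": {"protocol": "HTTP", "URI": f"http://sandbox:{port}"},
        "types": list(types),
        "buffering": buffering,
    }
```

With the defaults, `build_subscription_body(4243)` produces the same structure as the `_SUBSCRIPTION_BODY` constant in the extension.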
