|
| 1 | +#!/bin/sh |
| 2 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 3 | +# SPDX-License-Identifier: MIT-0 |
| 4 | + |
| 5 | +''''exec python -u -- "$0" ${1+"$@"} # ''' |
| 6 | +import os |
| 7 | +import sys |
| 8 | +from pathlib import Path |
| 9 | +from datetime import datetime |
| 10 | +# Add lib folder to path to import boto3 library. |
| 11 | +# Normally with Lambda Layers, python libraries are put into the /python folder which is in the path. |
| 12 | +# As this extension is bringing its own Python runtime, and running a separate process, the path is not available. |
| 13 | +# Hence, having the files in a different folder and adding it to the path, makes it available. |
| 14 | +lib_folder = Path(__file__).parent / "lib" |
| 15 | +sys.path.insert(0,str(lib_folder)) |
| 16 | +import boto3 |
| 17 | + |
| 18 | +from logs_api_http_extension.http_listener import http_server_init, RECEIVER_PORT |
| 19 | +from logs_api_http_extension.logs_api_client import LogsAPIClient |
| 20 | +from logs_api_http_extension.extensions_api_client import ExtensionsAPIClient |
| 21 | + |
| 22 | +from queue import Queue |
| 23 | + |
| 24 | +"""Here is the sample extension code. |
| 25 | + - The extension runs two threads. The "main" thread, will register with the Extension API and process its invoke and |
| 26 | + shutdown events (see next call). The second "listener" thread listens for HTTP POST events that deliver log batches. |
| 27 | + - The "listener" thread places every log batch it receives in a synchronized queue; during each execution slice, |
| 28 | + the "main" thread will make sure to process any event in the queue before returning control by invoking next again. |
| 29 | + - Note that because of the asynchronous nature of the system, it is possible that logs for one invoke are |
| 30 | + processed during the next invoke slice. Likewise, it is possible that logs for the last invoke are processed during |
| 31 | + the SHUTDOWN event. |
| 32 | +
|
| 33 | +Note: |
| 34 | +
|
| 35 | +1. This is a simple example extension to help you understand the Lambda Logs API. |
| 36 | + This code is not production ready. Use it with your own discretion after testing it thoroughly. |
| 37 | +
|
| 38 | +2. The extension code starts with a shebang. This is to bring Python runtime to the execution environment. |
| 39 | + This works if the lambda function is a python3.x function, therefore it brings the python3.x runtime with itself. |
| 40 | + It may not work for python 2.7 or other runtimes. |
| 41 | + The recommended best practice is to compile your extension into an executable binary and not rely on the runtime. |
| 42 | + |
| 43 | +3. This file needs to be executable, so make sure you add execute permission to the file |
| 44 | + `chmod +x logs_api_http_extension.py` |
| 45 | +
|
| 46 | +""" |
| 47 | + |
| 48 | +class LogsAPIHTTPExtension(): |
| 49 | + def __init__(self, agent_name, registration_body, subscription_body): |
| 50 | + # print(f"extension.logs_api_http_extension: Initializing LogsAPIExternalExtension {agent_name}") |
| 51 | + self.agent_name = agent_name |
| 52 | + self.queue = Queue() |
| 53 | + self.logs_api_client = LogsAPIClient() |
| 54 | + self.extensions_api_client = ExtensionsAPIClient() |
| 55 | + |
| 56 | + # Register early so Runtime could start in parallel |
| 57 | + self.agent_id = self.extensions_api_client.register(self.agent_name, registration_body) |
| 58 | + |
| 59 | + # Start listening before Logs API registration |
| 60 | +# print(f"extension.logs_api_http_extension: Starting HTTP Server {agent_name}") |
| 61 | + http_server_init(self.queue) |
| 62 | + self.logs_api_client.subscribe(self.agent_id, subscription_body) |
| 63 | + |
| 64 | + def run_forever(self): |
| 65 | + # Configuring S3 Connection |
| 66 | + s3_bucket = (os.environ['S3_BUCKET_NAME']) |
| 67 | + s3 = boto3.resource('s3') |
| 68 | + print(f"extension.logs_api_http_extension: Receiving Logs {self.agent_name}") |
| 69 | + while True: |
| 70 | + resp = self.extensions_api_client.next(self.agent_id) |
| 71 | + # Process the received batches if any. |
| 72 | + while not self.queue.empty(): |
| 73 | + batch = self.queue.get_nowait() |
| 74 | + # This following line logs the events received to CloudWatch. |
| 75 | + # Replace it to send logs to elsewhere. |
| 76 | + # If you've subscribed to extension logs, e.g. "types": ["platform", "function", "extension"], |
| 77 | + # you'll receive the logs of this extension back from Logs API. |
| 78 | + # And if you log it again with the line below, it will create a cycle since you receive it back again. |
| 79 | + # Use `extension` log type if you'll egress it to another endpoint, |
| 80 | + # or make sure you've implemented a protocol to handle this case. |
| 81 | +# print(f"Log Batch Received from Lambda: {batch}", flush=True) |
| 82 | + |
| 83 | +# There are two options illustrated: |
| 84 | +# 1. Sending the entire log batch to S3 |
| 85 | +# 2. Parsing the batch and sending individual log lines. |
| 86 | +# This could be used to parse the log lines and only selectively send logs to S3, or amend for any other destination. |
| 87 | + |
| 88 | +# 1. The following line writes the entire batch to S3 |
| 89 | + s3_filename = (os.environ['AWS_LAMBDA_FUNCTION_NAME'])+'-'+(datetime.now().strftime('%Y-%m-%d-%H:%M:%S.%f'))+'.log' |
| 90 | + try: |
| 91 | + response = s3.Bucket(s3_bucket).put_object(Key=s3_filename, Body=str(batch)) |
| 92 | + except Exception as e: |
| 93 | + raise Exception(f"Error sending log to S3 {e}") from e |
| 94 | +# 2. The following parses the batch and sends individual log line |
| 95 | +# try: |
| 96 | +# for item in range(len(batch)): |
| 97 | +# s3_filename = (os.environ['AWS_LAMBDA_FUNCTION_NAME'])+'-'+(datetime.now().strftime('%Y-%m-%d-%H:%M:%S.%f'))+'.'+str(item)+'.log' |
| 98 | +# content = str(batch[item]) |
| 99 | +# response = s3.Bucket(s3_bucket).put_object(Key=s3_filename, Body=content) |
| 100 | +# except Exception as e: |
| 101 | +# raise Exception(f"Error sending log to S3 {e}") from e |
| 102 | + |
| 103 | +# Register for the INVOKE events from the EXTENSIONS API |
| 104 | +_REGISTRATION_BODY = { |
| 105 | + "events": ["INVOKE", "SHUTDOWN"], |
| 106 | +} |
| 107 | + |
| 108 | +# Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol. |
| 109 | + |
| 110 | +TIMEOUT_MS = 1000 # Maximum time (in milliseconds) that a batch is buffered. |
| 111 | +MAX_BYTES = 262144 # Maximum size in bytes that the logs are buffered in memory. |
| 112 | +MAX_ITEMS = 10000 # Maximum number of events that are buffered in memory. |
| 113 | + |
| 114 | +_SUBSCRIPTION_BODY = { |
| 115 | + "destination":{ |
| 116 | + "protocol": "HTTP", |
| 117 | + "URI": f"http://sandbox:{RECEIVER_PORT}", |
| 118 | + }, |
| 119 | + "types": ["platform", "function"], |
| 120 | + "buffering": { |
| 121 | + "timeoutMs": TIMEOUT_MS, |
| 122 | + "maxBytes": MAX_BYTES, |
| 123 | + "maxItems": MAX_ITEMS |
| 124 | + } |
| 125 | +} |
| 126 | + |
| 127 | +def main(): |
| 128 | +# print(f"extension.logs_api_http_extension: Starting Extension {_REGISTRATION_BODY} {_SUBSCRIPTION_BODY}") |
| 129 | + # Note: Agent name has to be file name to register as an external extension |
| 130 | + ext = LogsAPIHTTPExtension(os.path.basename(__file__), _REGISTRATION_BODY, _SUBSCRIPTION_BODY) |
| 131 | + ext.run_forever() |
| 132 | + |
| 133 | +if __name__ == "__main__": |
| 134 | + main() |
0 commit comments