feature: add LambdaStep support for sagemaker pipelines #2544
Closed
26 commits
47cef38
change: use sagemaker_session in workflow tests
8a5dbfa
remove unused imports
c9c30bd
Merge branch 'master' into master
metrizable f2d9f45
Merge branch 'master' into master
metrizable a50f41c
Merge branch 'master' into master
metrizable 6f68232
Merge branch 'master' into master
metrizable 5cf09d4
Merge remote-tracking branch 'upstream/master' into master
585de94
Merge branch 'aws:master' into master
rohangujarathi 0fb9a1d
Merge branch 'aws:master' into master
rohangujarathi 1ec09da
Merge branch 'aws:master' into master
rohangujarathi 65ebea3
Merge branch 'aws:master' into master
rohangujarathi c9e563d
Merge branch 'aws:master' into master
rohangujarathi 5b7e1b1
Merge branch 'aws:master' into master
rohangujarathi 69dbd27
feature: add LambdaStep support for sagemaker pipelines
08b10fb
fix: flake8 error
db7c577
fix: unit tests
05a33d0
fix: unit tests
93a907f
fix: another unit tests fix
fe0bf03
add session in integ tests
8e53226
fix integ test
9ce3fbe
fix black error
ea671dc
doc update for lambda step
9c9bfda
add new line at the end
34fd20a
fix doc8 error
3af7a39
Merge branch 'master' into master
rohangujarathi 67be628
Merge branch 'master' into master
ahsan-z-khan
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
Lambda Utilities | ||
---------------- | ||
|
||
.. automodule:: sagemaker.lambda_helper | ||
:members: | ||
:undoc-members: | ||
:show-inheritance: |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
# Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"). You | ||
# may not use this file except in compliance with the License. A copy of | ||
# the License is located at | ||
# | ||
# http://aws.amazon.com/apache2.0/ | ||
# | ||
# or in the "license" file accompanying this file. This file is | ||
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF | ||
# ANY KIND, either express or implied. See the License for the specific | ||
# language governing permissions and limitations under the License. | ||
"""This module contains helper methods related to Lambda.""" | ||
from __future__ import print_function, absolute_import | ||
|
||
from io import BytesIO | ||
import zipfile | ||
from botocore.exceptions import ClientError | ||
from sagemaker.session import Session | ||
|
||
|
||
class Lambda: | ||
"""Contains lambda boto3 wrappers to Create, Update, Delete and Invoke Lambda functions.""" | ||
|
||
def __init__( | ||
self, | ||
function_arn: str = None, | ||
function_name: str = None, | ||
execution_role_arn: str = None, | ||
zipped_code_dir: str = None, | ||
s3_bucket: str = None, | ||
script: str = None, | ||
handler: str = None, | ||
session: Session = None, | ||
timeout: int = 120, | ||
memory_size: int = 128, | ||
runtime: str = "python3.8", | ||
): | ||
"""Constructs a Lambda instance. | ||
|
||
This instance represents a Lambda function and provides methods for updating, | ||
deleting and invoking the function. | ||
|
||
This class can be used either for creating a new Lambda function or using an existing one. | ||
When using an existing Lambda function, only the function_arn argument is required. | ||
When creating a new one the function_name, execution_role_arn and handler arguments | ||
are required, as well as either script or zipped_code_dir. | ||
|
||
Args: | ||
function_arn (str): The ARN of the Lambda function. | ||
function_name (str): The name of the Lambda function. | ||
Function name must be provided to create a Lambda function. | ||
execution_role_arn (str): The role to be attached to Lambda function. | ||
zipped_code_dir (str): The path of the zipped code package of the Lambda function. | ||
s3_bucket (str): The bucket where zipped code is uploaded. | ||
If not provided, default session bucket is used to upload zipped_code_dir. | ||
script (str): The path of the Lambda function script for direct zipped upload. | ||
handler (str): The Lambda handler. The format for handler should be | ||
file_name.function_name. For example, if the name of the Lambda script is | ||
hello_world.py and the Lambda function definition in that script is | ||
lambda_handler(event, context), the handler should be hello_world.lambda_handler. | ||
session (sagemaker.session.Session): Session object which manages interactions | ||
with Amazon SageMaker APIs and any other AWS services needed. | ||
If not specified, new session is created. | ||
timeout (int): Timeout of the Lambda function in seconds. Default is 120 seconds. | ||
memory_size (int): Memory of the Lambda function in megabytes. Default is 128 MB. | ||
runtime (str): Runtime of the Lambda function. Default is set to python3.8. | ||
""" | ||
self.function_arn = function_arn | ||
self.function_name = function_name | ||
self.zipped_code_dir = zipped_code_dir | ||
self.s3_bucket = s3_bucket | ||
self.script = script | ||
self.handler = handler | ||
self.execution_role_arn = execution_role_arn | ||
self.session = session if session is not None else Session() | ||
self.timeout = timeout | ||
self.memory_size = memory_size | ||
self.runtime = runtime | ||
|
||
if function_arn is None and function_name is None: | ||
raise ValueError("Either function_arn or function_name must be provided.") | ||
|
||
if function_name is not None: | ||
if execution_role_arn is None: | ||
raise ValueError("execution_role_arn must be provided.") | ||
if zipped_code_dir is None and script is None: | ||
raise ValueError("Either zipped_code_dir or script must be provided.") | ||
if zipped_code_dir and script: | ||
raise ValueError("Provide either script or zipped_code_dir, not both.") | ||
if handler is None: | ||
raise ValueError("Lambda handler must be provided.") | ||
|
||
def create(self): | ||
"""Method to create a lambda function. | ||
|
||
Returns: boto3 response from Lambda's create_function method. | ||
""" | ||
lambda_client = _get_lambda_client(self.session) | ||
|
||
if self.function_name is None: | ||
raise ValueError("FunctionName must be provided to create a Lambda function.") | ||
|
||
if self.script is not None: | ||
code = {"ZipFile": _zip_lambda_code(self.script)} | ||
else: | ||
bucket = self.s3_bucket or self.session.default_bucket() | ||
key = _upload_to_s3( | ||
s3_client=_get_s3_client(self.session), | ||
function_name=self.function_name, | ||
zipped_code_dir=self.zipped_code_dir, | ||
s3_bucket=bucket, | ||
) | ||
code = {"S3Bucket": bucket, "S3Key": key} | ||
|
||
try: | ||
response = lambda_client.create_function( | ||
FunctionName=self.function_name, | ||
Runtime=self.runtime, | ||
Handler=self.handler, | ||
Role=self.execution_role_arn, | ||
Code=code, | ||
Timeout=self.timeout, | ||
MemorySize=self.memory_size, | ||
) | ||
return response | ||
except ClientError as e: | ||
error = e.response["Error"] | ||
raise ValueError(error) | ||
|
||
def update(self): | ||
"""Method to update a lambda function. | ||
|
||
Returns: boto3 response from Lambda's update_function_code method. | ||
""" | ||
lambda_client = _get_lambda_client(self.session) | ||
|
||
if self.script is not None: | ||
try: | ||
response = lambda_client.update_function_code( | ||
FunctionName=(self.function_name or self.function_arn), | ||
ZipFile=_zip_lambda_code(self.script), | ||
) | ||
return response | ||
except ClientError as e: | ||
error = e.response["Error"] | ||
raise ValueError(error) | ||
else: | ||
# Fall back to the session's default bucket, as create() does. | ||
bucket = self.s3_bucket or self.session.default_bucket() | ||
try: | ||
response = lambda_client.update_function_code( | ||
FunctionName=(self.function_name or self.function_arn), | ||
S3Bucket=bucket, | ||
S3Key=_upload_to_s3( | ||
s3_client=_get_s3_client(self.session), | ||
function_name=self.function_name, | ||
zipped_code_dir=self.zipped_code_dir, | ||
s3_bucket=bucket, | ||
), | ||
) | ||
return response | ||
except ClientError as e: | ||
error = e.response["Error"] | ||
raise ValueError(error) | ||
|
||
def invoke(self): | ||
"""Method to invoke a lambda function. | ||
|
||
Returns: boto3 response from Lambda's invoke method. | ||
""" | ||
lambda_client = _get_lambda_client(self.session) | ||
try: | ||
response = lambda_client.invoke( | ||
FunctionName=self.function_name or self.function_arn, | ||
InvocationType="RequestResponse", | ||
) | ||
return response | ||
except ClientError as e: | ||
error = e.response["Error"] | ||
raise ValueError(error) | ||
|
||
def delete(self): | ||
"""Method to delete a lambda function. | ||
|
||
Returns: boto3 response from Lambda's delete_function method. | ||
""" | ||
lambda_client = _get_lambda_client(self.session) | ||
try: | ||
response = lambda_client.delete_function( | ||
FunctionName=self.function_name or self.function_arn | ||
) | ||
return response | ||
except ClientError as e: | ||
error = e.response["Error"] | ||
raise ValueError(error) | ||
|
||
|
||
def _get_s3_client(session): | ||
"""Method to get a boto3 s3 client. | ||
|
||
Returns: an S3 client. | ||
""" | ||
sagemaker_session = session or Session() | ||
if sagemaker_session.s3_client is None: | ||
s3_client = sagemaker_session.boto_session.client( | ||
"s3", region_name=sagemaker_session.boto_region_name | ||
) | ||
else: | ||
s3_client = sagemaker_session.s3_client | ||
return s3_client | ||
|
||
|
||
def _get_lambda_client(session): | ||
"""Method to get a boto3 lambda client. | ||
|
||
Returns: a lambda client. | ||
""" | ||
sagemaker_session = session or Session() | ||
if sagemaker_session.lambda_client is None: | ||
lambda_client = sagemaker_session.boto_session.client( | ||
"lambda", region_name=sagemaker_session.boto_region_name | ||
) | ||
else: | ||
lambda_client = sagemaker_session.lambda_client | ||
return lambda_client | ||
|
||
|
||
def _upload_to_s3(s3_client, function_name, zipped_code_dir, s3_bucket): | ||
"""Upload the zipped code to S3 bucket provided in the Lambda instance. | ||
|
||
Lambda instance must have a path to the zipped code folder and an S3 bucket to upload | ||
the code. The key will be lambda/function_name/code and the S3 URI where the code is | ||
uploaded is in this format: s3://bucket_name/lambda/function_name/code. | ||
|
||
Returns: the S3 key where the code is uploaded. | ||
""" | ||
key = "{}/{}/{}".format("lambda", function_name, "code") | ||
s3_client.upload_file(zipped_code_dir, s3_bucket, key) | ||
return key | ||
|
||
|
||
def _zip_lambda_code(script): | ||
"""This method zips the lambda function script. | ||
|
||
The script path comes from the Lambda instance. The script is zipped into an | ||
in-memory buffer, and the bytes of that archive are returned. | ||
|
||
Returns: A buffer of zipped lambda function script. | ||
""" | ||
buffer = BytesIO() | ||
code_dir = script.split("/")[-1] | ||
|
||
with zipfile.ZipFile(buffer, "w") as z: | ||
z.write(script, code_dir) | ||
buffer.seek(0) | ||
return buffer.read() |
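The in-memory zipping done by `_zip_lambda_code` and the `file_name.function_name` handler format described in the constructor docstring can be tried locally without any AWS calls. The following is a minimal sketch, not the code from this PR: `zip_single_script`, `handler_for` and `demo` are hypothetical names, and `os.path.basename` is an illustrative substitute for the `script.split("/")[-1]` call in the diff.

```python
import os
import tempfile
import zipfile
from io import BytesIO


def zip_single_script(script_path):
    """Zip one script into an in-memory archive and return the raw bytes.

    Hypothetical stand-in for the PR's _zip_lambda_code helper.
    """
    buffer = BytesIO()
    # Store the file at the archive root under its base name.
    archive_name = os.path.basename(script_path)
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.write(script_path, archive_name)
    # The archive is finalized once the with-block exits.
    return buffer.getvalue()


def handler_for(script_path, function_name="lambda_handler"):
    """Build a handler string in the file_name.function_name format."""
    module_name = os.path.splitext(os.path.basename(script_path))[0]
    return "{}.{}".format(module_name, function_name)


def demo():
    """Write a throwaway script, zip it, and inspect the result."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        script_path = os.path.join(tmp_dir, "hello_world.py")
        with open(script_path, "w") as script_file:
            script_file.write(
                "def lambda_handler(event, context):\n"
                "    return {'statusCode': 200}\n"
            )
        payload = zip_single_script(script_path)
        with zipfile.ZipFile(BytesIO(payload)) as archive:
            names = archive.namelist()
        return names, handler_for(script_path)


if __name__ == "__main__":
    print(demo())
```

The archive member name and the module part of the handler come from the same base file name, which is why the script is written at the archive root rather than under its original directory path.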
Please remove the years from the copyright header in this public PR, per the new guideline.