Skip to content

Commit 69dbd27

Browse files
author
Rohan Gujarathi
committed
feature: add LambdaStep support for sagemaker pipelines
1 parent 5b7e1b1 commit 69dbd27

File tree

8 files changed

+1039
-7
lines changed

8 files changed

+1039
-7
lines changed

src/sagemaker/lambda_helper.py

+253
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
# Copyright 2019-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""This module contains helper methods related to Lambda."""
14+
from __future__ import print_function, absolute_import
15+
16+
from io import BytesIO
17+
import zipfile
18+
from botocore.exceptions import ClientError
19+
from sagemaker.session import Session
20+
21+
22+
class Lambda:
23+
"""Contains lambda boto3 wrappers to Create, Update, Delete and Invoke Lambda functions."""
24+
25+
def __init__(
26+
self,
27+
function_arn: str = None,
28+
function_name: str = None,
29+
execution_role_arn: str = None,
30+
zipped_code_dir: str = None,
31+
s3_bucket: str = None,
32+
script: str = None,
33+
handler: str = None,
34+
session: Session = Session(),
35+
timeout: int = 120,
36+
memory_size: int = 128,
37+
runtime: str = "python3.8",
38+
):
39+
"""Constructs a Lambda instance.
40+
41+
This instance represents a Lambda function and provides methods for updating,
42+
deleting and invoking the function.
43+
44+
This class can be used either for creating a new Lambda function or using an existing one.
45+
When using an existing Lambda function, only the function_arn argument is required.
46+
When creating a new one the function_name, execution_role_arn and handler arguments
47+
are required, as well as either script or zipped_code_dir.
48+
49+
Args:
50+
function_arn (str): The arn of the Lambda function.
51+
function_name (str): The name of the Lambda function.
52+
Function name must be provided to create a Lambda function.
53+
execution_role_arn (str): The role to be attached to Lambda function.
54+
zipped_code_dir (str): The path of the zipped code package of the Lambda function.
55+
s3_bucket (str): The bucket where zipped code is uploaded.
56+
If not provided, default session bucket is used to upload zipped_code_dir.
57+
script (str): The path of Lambda function script for direct zipped upload
58+
handler (str): The Lambda handler. The format for handler should be
59+
file_name.function_name. For ex: if the name of the Lambda script is
60+
hello_world.py and Lambda function definition in that script is
61+
lambda_handler(event, context), the handler should be hello_world.lambda_handler
62+
session (sagemaker.session.Session): Session object which manages interactions
63+
with Amazon SageMaker APIs and any other AWS services needed.
64+
If not specified, new session is created.
65+
timeout (int): Timeout of the Lambda function in seconds. Default is 120 seconds.
66+
memory_size (int): Memory of the Lambda function in megabytes. Default is 128 MB.
67+
runtime (str): Runtime of the Lambda function. Default is set to python3.8.
68+
"""
69+
self.function_arn = function_arn
70+
self.function_name = function_name
71+
self.zipped_code_dir = zipped_code_dir
72+
self.s3_bucket = s3_bucket
73+
self.script = script
74+
self.handler = handler
75+
self.execution_role_arn = execution_role_arn
76+
self.session = session
77+
self.timeout = timeout
78+
self.memory_size = memory_size
79+
self.runtime = runtime
80+
81+
if function_arn is None and function_name is None:
82+
raise ValueError("Either function_arn or function_name must be provided.")
83+
84+
if function_name is not None:
85+
if execution_role_arn is None:
86+
raise ValueError("execution_role_arn must be provided.")
87+
if zipped_code_dir is None and script is None:
88+
raise ValueError("Either zipped_code_dir or script must be provided.")
89+
if zipped_code_dir and script:
90+
raise ValueError("Provide either script or zipped_code_dir, not both.")
91+
if handler is None:
92+
raise ValueError("Lambda handler must be provided.")
93+
94+
def create(self):
95+
"""Method to create a lambda function.
96+
97+
Returns: boto3 response from Lambda's create_function method.
98+
"""
99+
lambda_client = _get_lambda_client(self.session)
100+
101+
if self.function_name is None:
102+
raise ValueError("FunctionName must be provided to create a Lambda function.")
103+
104+
if self.script is not None:
105+
code = {"ZipFile": _zip_lambda_code(self.script)}
106+
else:
107+
bucket = self.s3_bucket or self.session.default_bucket()
108+
key = _upload_to_s3(
109+
s3_client=_get_s3_client(self.session),
110+
function_name=self.function_name,
111+
zipped_code_dir=self.zipped_code_dir,
112+
s3_bucket=bucket,
113+
)
114+
code = {"S3Bucket": bucket, "S3Key": key}
115+
116+
try:
117+
response = lambda_client.create_function(
118+
FunctionName=self.function_name,
119+
Runtime=self.runtime,
120+
Handler=self.handler,
121+
Role=self.execution_role_arn,
122+
Code=code,
123+
Timeout=self.timeout,
124+
MemorySize=self.memory_size,
125+
)
126+
return response
127+
except ClientError as e:
128+
error = e.response["Error"]
129+
raise ValueError(error)
130+
131+
def update(self):
132+
"""Method to update a lambda function.
133+
134+
Returns: boto3 response from Lambda's update_function method.
135+
"""
136+
lambda_client = _get_lambda_client(self.session)
137+
138+
if self.script is not None:
139+
try:
140+
response = lambda_client.update_function_code(
141+
FunctionName=self.function_name, ZipFile=_zip_lambda_code(self.script)
142+
)
143+
return response
144+
except ClientError as e:
145+
error = e.response["Error"]
146+
raise ValueError(error)
147+
else:
148+
try:
149+
response = lambda_client.update_function_code(
150+
FunctionName=(self.function_name or self.function_arn),
151+
S3Bucket=self.s3_bucket,
152+
S3Key=_upload_to_s3(
153+
s3_client=_get_s3_client(self.session),
154+
function_name=self.function_name,
155+
zipped_code_dir=self.zipped_code_dir,
156+
s3_bucket=self.s3_bucket,
157+
),
158+
)
159+
return response
160+
except ClientError as e:
161+
error = e.response["Error"]
162+
raise ValueError(error)
163+
164+
def invoke(self):
165+
"""Method to invoke a lambda function.
166+
167+
Returns: boto3 response from Lambda's invoke method.
168+
"""
169+
lambda_client = _get_lambda_client(self.session)
170+
try:
171+
response = lambda_client.invoke(
172+
FunctionName=self.function_name or self.function_arn,
173+
InvocationType="RequestResponse",
174+
)
175+
return response
176+
except ClientError as e:
177+
error = e.response["Error"]
178+
raise ValueError(error)
179+
180+
def delete(self):
181+
"""Method to delete a lambda function.
182+
183+
Returns: boto3 response from Lambda's delete_function method.
184+
"""
185+
lambda_client = _get_lambda_client(self.session)
186+
try:
187+
response = lambda_client.delete_function(
188+
FunctionName=self.function_name or self.function_arn
189+
)
190+
return response
191+
except ClientError as e:
192+
error = e.response["Error"]
193+
raise ValueError(error)
194+
195+
196+
def _get_s3_client(session):
197+
"""Method to get a boto3 s3 client.
198+
199+
Returns: a s3 client.
200+
"""
201+
sagemaker_session = session or Session()
202+
if sagemaker_session.s3_client is None:
203+
s3_client = sagemaker_session.boto_session.client(
204+
"s3", region_name=sagemaker_session.boto_region_name
205+
)
206+
else:
207+
s3_client = sagemaker_session.s3_client
208+
return s3_client
209+
210+
211+
def _get_lambda_client(session):
212+
"""Method to get a boto3 lambda client.
213+
214+
Returns: a lambda client.
215+
"""
216+
sagemaker_session = session or Session()
217+
if sagemaker_session.lambda_client is None:
218+
lambda_client = sagemaker_session.boto_session.client(
219+
"lambda", region_name=sagemaker_session.boto_region_name
220+
)
221+
else:
222+
lambda_client = sagemaker_session.lambda_client
223+
return lambda_client
224+
225+
226+
def _upload_to_s3(s3_client, function_name, zipped_code_dir, s3_bucket):
227+
"""Upload the zipped code to S3 bucket provided in the Lambda instance.
228+
229+
Lambda instance must have a path to the zipped code folder and a S3 bucket to upload
230+
the code. The key will lambda/function_name/code and the S3 URI where the code is
231+
uploaded is in this format: s3://bucket_name/lambda/function_name/code.
232+
233+
Returns: the S3 key where the code is uploaded.
234+
"""
235+
key = "{}/{}/{}".format("lambda", function_name, "code")
236+
s3_client.upload_file(zipped_code_dir, s3_bucket, key)
237+
return key
238+
239+
240+
def _zip_lambda_code(script):
241+
"""This method zips the lambda function script.
242+
243+
Lambda function script is provided in the lambda instance and reads that zipped file.
244+
245+
Returns: A buffer of zipped lambda function script.
246+
"""
247+
buffer = BytesIO()
248+
code_dir = script.split("/")[-1]
249+
250+
with zipfile.ZipFile(buffer, "w") as z:
251+
z.write(script, code_dir)
252+
buffer.seek(0)
253+
return buffer.read()

src/sagemaker/session.py

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def __init__(
116116
self.s3_resource = None
117117
self.s3_client = None
118118
self.config = None
119+
self.lambda_client = None
119120

120121
self._initialize(
121122
boto_session=boto_session,

0 commit comments

Comments
 (0)