Skip to content

Commit c951271

Browse files
authored
Merge branch 'master' into feat/s3-prefix-model-data-for-jumpstart-model
2 parents 2235bcc + 65ca6ba commit c951271

File tree

7 files changed

+562
-5
lines changed

7 files changed

+562
-5
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
build
33
src/*.egg-info
44
.cache
5-
.coverage
5+
.coverage*
66
sagemaker_venv*
77
*.egg-info
88
.tox
+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
FeatureGroup Utilities
2+
----------------------
3+
4+
.. automodule:: sagemaker.feature_store.feature_utils
5+
:members:
6+
:undoc-members:
7+
:show-inheritance:

src/sagemaker/feature_store/feature_group.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,14 @@ def get_query_execution(self) -> Dict[str, Any]:
138138
query_execution_id=self._current_query_execution_id
139139
)
140140

141-
def as_dataframe(self) -> DataFrame:
141+
def as_dataframe(self, **kwargs) -> DataFrame:
142142
"""Download the result of the current query and load it into a DataFrame.
143143
144+
Args:
145+
**kwargs (object): key arguments used for the method pandas.read_csv to be able to
146+
have a better tuning on data. For more info read:
147+
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
148+
144149
Returns:
145150
A pandas DataFrame contains the query result.
146151
"""
@@ -161,7 +166,9 @@ def as_dataframe(self) -> DataFrame:
161166
query_execution_id=self._current_query_execution_id,
162167
filename=output_filename,
163168
)
164-
return pd.read_csv(output_filename, delimiter=",")
169+
170+
kwargs.pop("delimiter", None)
171+
return pd.read_csv(filepath_or_buffer=output_filename, delimiter=",", **kwargs)
165172

166173

167174
@attr.s
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""Utilities for working with FeatureGroups and FeatureStores."""
14+
from __future__ import absolute_import
15+
16+
import re
17+
import logging
18+
19+
from typing import Union
20+
from pathlib import Path
21+
22+
import pandas
23+
import boto3
24+
from pandas import DataFrame, Series, read_csv
25+
26+
from sagemaker.feature_store.feature_group import FeatureGroup
27+
from sagemaker.session import Session
28+
29+
logger = logging.getLogger(__name__)
30+
31+
32+
def get_session_from_role(region: str, assume_role: str = None) -> Session:
33+
"""Method used to get the :class:`sagemaker.session.Session` from a region and/or a role.
34+
35+
Description:
36+
If invoked from a session with a role that lacks permissions, it can temporarily
37+
assume another role to perform certain tasks.
38+
If `assume_role` is not specified it will attempt to use the default sagemaker
39+
execution role to get the session to use the Feature Store runtime client.
40+
41+
Args:
42+
assume_role (str): (Optional) role name to be assumed
43+
region (str): region name
44+
45+
Returns:
46+
:class:`sagemaker.session.Session`
47+
"""
48+
boto_session = boto3.Session(region_name=region)
49+
50+
# It will try to assume the role specified
51+
if assume_role:
52+
sts = boto_session.client("sts", region_name=region)
53+
54+
credentials = sts.assume_role(
55+
RoleArn=assume_role, RoleSessionName="SagemakerExecution"
56+
).get("Credentials", {})
57+
58+
access_key_id = credentials.get("AccessKeyId", None)
59+
secret_access_key = credentials.get("SecretAccessKey", None)
60+
session_token = credentials.get("SessionToken", None)
61+
62+
boto_session = boto3.session.Session(
63+
region_name=region,
64+
aws_access_key_id=access_key_id,
65+
aws_secret_access_key=secret_access_key,
66+
aws_session_token=session_token,
67+
)
68+
69+
sagemaker_session = Session(
70+
boto_session=boto_session,
71+
sagemaker_client=boto_session.client("sagemaker"),
72+
sagemaker_runtime_client=boto_session.client("sagemaker-runtime"),
73+
sagemaker_featurestore_runtime_client=boto_session.client(
74+
service_name="sagemaker-featurestore-runtime"
75+
),
76+
)
77+
78+
return sagemaker_session
79+
80+
81+
def get_feature_group_as_dataframe(
82+
feature_group_name: str,
83+
athena_bucket: str,
84+
query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}"
85+
WHERE is_deleted=False """,
86+
role: str = None,
87+
region: str = None,
88+
session=None,
89+
event_time_feature_name: str = None,
90+
latest_ingestion: bool = True,
91+
verbose: bool = True,
92+
**kwargs,
93+
) -> DataFrame:
94+
""":class:`sagemaker.feature_store.feature_group.FeatureGroup` as :class:`pandas.DataFrame`
95+
96+
Examples:
97+
>>> from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe
98+
>>>
99+
>>> region = "eu-west-1"
100+
>>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group",
101+
>>> athena_bucket="s3://bucket/athena_queries",
102+
>>> region=region,
103+
>>> event_time_feature_name="EventTimeId"
104+
>>> )
105+
>>>
106+
>>> type(fg_data)
107+
<class 'pandas.core.frame.DataFrame'>
108+
109+
Description:
110+
Method to run an athena query over a
111+
:class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store
112+
to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role
113+
or the region and/or role used to work with Feature Stores (it uses the module
114+
`sagemaker.feature_store.feature_utils.get_session_from_role`
115+
to get the session).
116+
117+
Args:
118+
region (str): region of the target Feature Store
119+
feature_group_name (str): feature store name
120+
query (str): query to run. By default, it will take the latest ingest with data that
121+
wasn't deleted. If latest_ingestion is False it will take all the data
122+
in the feature group that wasn't deleted. It needs to use the keyword
123+
"#{table}" to refer to the FeatureGroup name. e.g.:
124+
'SELECT * FROM "sagemaker_featurestore"."#{table}"'
125+
It must not end by ';'.
126+
athena_bucket (str): Amazon S3 bucket for running the query
127+
role (str): role to be assumed to extract data from feature store. If not specified
128+
the default sagemaker execution role will be used.
129+
session (str): :class:`sagemaker.session.Session`
130+
of SageMaker used to work with the feature store. Optional, with
131+
role and region parameters it will infer the session.
132+
event_time_feature_name (str): eventTimeId feature. Mandatory only if the
133+
latest ingestion is True.
134+
latest_ingestion (bool): if True it will get the data only from the latest ingestion.
135+
If False it will take whatever is specified in the query, or
136+
if not specify it, it will get all the data that wasn't deleted.
137+
verbose (bool): if True show messages, if False is silent.
138+
**kwargs (object): key arguments used for the method pandas.read_csv to be able to
139+
have a better tuning on data. For more info read:
140+
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
141+
Returns:
142+
:class:`pandas.DataFrame`: dataset with the data retrieved from feature group
143+
"""
144+
145+
logger.setLevel(logging.WARNING)
146+
if verbose:
147+
logger.setLevel(logging.INFO)
148+
149+
if latest_ingestion:
150+
if event_time_feature_name is not None:
151+
query += str(
152+
f"AND {event_time_feature_name}=(SELECT "
153+
f"MAX({event_time_feature_name}) FROM "
154+
'"sagemaker_featurestore"."#{table}")'
155+
)
156+
else:
157+
exc = Exception(
158+
"Argument event_time_feature_name must be specified "
159+
"when using latest_ingestion=True."
160+
)
161+
logger.exception(exc)
162+
raise exc
163+
164+
query += ";"
165+
166+
if session is not None:
167+
sagemaker_session = session
168+
elif region is not None:
169+
sagemaker_session = get_session_from_role(region=region, assume_role=role)
170+
else:
171+
exc = Exception("Argument Session or role and region must be specified.")
172+
logger.exception(exc)
173+
raise exc
174+
175+
msg = f"Feature Group used: {feature_group_name}"
176+
logger.info(msg)
177+
178+
fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
179+
180+
sample_query = fg.athena_query()
181+
query_string = re.sub(r"#\{(table)\}", sample_query.table_name, query)
182+
183+
msg = f"Running query:\n\t{sample_query} \n\n\t-> Save on bucket {athena_bucket}\n"
184+
logger.info(msg)
185+
186+
sample_query.run(query_string=query_string, output_location=athena_bucket)
187+
188+
sample_query.wait()
189+
190+
# run Athena query. The output is loaded to a Pandas dataframe.
191+
dataset = sample_query.as_dataframe(**kwargs)
192+
193+
msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}"
194+
logger.info(msg)
195+
196+
return dataset
197+
198+
199+
def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame:
200+
"""Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup`
201+
202+
Description:
203+
Module to format correctly the name of the columns of a DataFrame
204+
to later generate the features names of a Feature Group
205+
206+
Args:
207+
data (:class:`pandas.DataFrame`): dataframe used
208+
209+
Returns:
210+
:class:`pandas.DataFrame`
211+
"""
212+
data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True)
213+
return data
214+
215+
216+
def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame:
217+
"""Cast properly pandas object types to strings
218+
219+
Description:
220+
Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to
221+
a valid string type recognized by Feature Groups.
222+
223+
Args:
224+
data_frame: dataframe used
225+
Returns:
226+
pandas.DataFrame
227+
"""
228+
for label in data_frame.select_dtypes(["object", "O"]).columns.tolist():
229+
data_frame[label] = data_frame[label].astype("str").astype("string")
230+
return data_frame
231+
232+
233+
def prepare_fg_from_dataframe_or_file(
234+
dataframe_or_path: Union[str, Path, pandas.DataFrame],
235+
feature_group_name: str,
236+
role: str = None,
237+
region: str = None,
238+
session=None,
239+
record_id: str = "record_id",
240+
event_id: str = "data_as_of_date",
241+
verbose: bool = False,
242+
**kwargs,
243+
) -> FeatureGroup:
244+
"""Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`
245+
246+
Description:
247+
Function to prepare a :class:`pandas.DataFrame` read from a path to a csv file or pass it
248+
directly to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`.
249+
The path to the file needs proper dtypes, feature names and mandatory features (record_id,
250+
event_id).
251+
It needs the :class:`sagemaker.session.Session` linked to a role
252+
or the region and/or role used to work with Feature Stores (it uses the module
253+
`sagemaker.feature_store.feature_utils.get_session_from_role`
254+
to get the session).
255+
If record_id or event_id are not specified it will create ones
256+
by default with the names 'record_id' and 'data_as_of_date'.
257+
258+
Args:
259+
feature_group_name (str): feature group name
260+
dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data
261+
verbose (bool) : True for displaying messages, False for silent method.
262+
record_id (str, 'record_id'): (Optional) Feature identifier of the rows. If specified each
263+
value of that feature has to be unique. If not specified or
264+
record_id='record_id', then it will create a new feature from
265+
the index of the pandas.DataFrame.
266+
event_id (str) : (Optional) Feature with the time of the creation of data rows.
267+
If not specified it will create one with the current time
268+
called `data_as_of_date`
269+
role (str) : role used to get the session.
270+
region (str) : region used to get the session.
271+
session (str): session of SageMaker used to work with the feature store
272+
**kwargs (object): key arguments used for the method pandas.read_csv to be able to
273+
have a better tuning on data. For more info read:
274+
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
275+
276+
Returns:
277+
:class:`sagemaker.feature_store.feature_group.FeatureGroup`:
278+
FG prepared with all the methods and definitions properly defined
279+
"""
280+
281+
logger.setLevel(logging.WARNING)
282+
if verbose:
283+
logger.setLevel(logging.INFO)
284+
285+
if isinstance(dataframe_or_path, DataFrame):
286+
data = dataframe_or_path
287+
elif isinstance(dataframe_or_path, str):
288+
kwargs.pop("filepath_or_buffer", None)
289+
data = read_csv(filepath_or_buffer=dataframe_or_path, **kwargs)
290+
else:
291+
exc = Exception(
292+
str(
293+
f"Invalid type {type(dataframe_or_path)} for "
294+
"argument dataframe_or_path. \nParameter must be"
295+
" of type pandas.DataFrame or string"
296+
)
297+
)
298+
logger.exception(exc)
299+
raise exc
300+
301+
# Formatting cols
302+
data = _format_column_names(data=data)
303+
data = _cast_object_to_string(data_frame=data)
304+
305+
if record_id == "record_id" and record_id not in data.columns:
306+
data[record_id] = data.index
307+
308+
lg_uniq = len(data[record_id].unique())
309+
lg_id = len(data[record_id])
310+
311+
if lg_id != lg_uniq:
312+
exc = Exception(
313+
str(
314+
f"Record identifier {record_id} have {abs(lg_id - lg_uniq)} "
315+
"duplicated rows. \nRecord identifier must be unique"
316+
" in each row."
317+
)
318+
)
319+
logger.exception(exc)
320+
raise exc
321+
322+
if event_id not in data.columns:
323+
import time
324+
325+
current_time_sec = int(round(time.time()))
326+
data[event_id] = Series([current_time_sec] * lg_id, dtype="float64")
327+
328+
if session is not None:
329+
sagemaker_session = session
330+
elif role is not None and region is not None:
331+
sagemaker_session = get_session_from_role(region=region)
332+
else:
333+
exc = Exception("Argument Session or role and region must be specified.")
334+
logger.exception(exc)
335+
raise exc
336+
337+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
338+
339+
feature_group.load_feature_definitions(data_frame=data)
340+
341+
return feature_group

0 commit comments

Comments
 (0)