10
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
11
# ANY KIND, either express or implied. See the License for the specific
12
12
# language governing permissions and limitations under the License.
13
- """The FeatureGroup entity for FeatureStore."""
13
+ """The FeatureGroup entity for FeatureStore.
14
+
15
+ A feature group is a logical grouping of features, defined in the Feature Store,
16
+ to describe records. A feature group definition is composed of a list of feature definitions,
17
+ a record identifier name, and configurations for its online and offline store.
18
+ Create feature group, describe feature group, update feature groups, delete feature group and
19
+ list feature groups APIs can be used to manage feature groups.
20
+ """
21
+
14
22
from __future__ import absolute_import
15
23
16
24
import logging
45
53
46
54
@attr .s
47
55
class AthenaQuery :
48
- """Class to manager querying of feature store data with AWS Athena
56
+ """Class to manage querying of feature store data with AWS Athena.
57
+
58
+ This class instantiates a AthenaQuery object that is used to retrieve data from feature store
59
+ via standard SQL queries.
49
60
50
61
Attributes:
51
62
catalog (str): name of the data catalog.
@@ -63,12 +74,15 @@ class AthenaQuery:
63
74
_result_file_prefix : str = attr .ib (init = False , default = None )
64
75
65
76
def run (self , query_string : str , output_location : str , kms_key : str = None ) -> str :
66
- """Run athena query with the given query_string
77
+ """Execute a SQL query given a query string, output location and kms key.
78
+
79
+ This method executes the SQL query using Athena and outputs the results to output_location
80
+ and returns the execution id of the query.
67
81
68
82
Args:
69
83
query_string: SQL query string.
70
- output_location: s3 uri of the query result.
71
- kms_key: KMS key id, if set will be used to encrypt the query result file.
84
+ output_location: S3 URI of the query result.
85
+ kms_key: KMS key id. If set, will be used to encrypt the query result file.
72
86
73
87
Returns:
74
88
Execution id of the query.
@@ -103,7 +117,7 @@ def get_query_execution(self) -> Dict[str, Any]:
103
117
)
104
118
105
119
def as_dataframe (self ) -> DataFrame :
106
- """Download the result of the current query and load it into a DataFrame
120
+ """Download the result of the current query and load it into a DataFrame.
107
121
108
122
Returns:
109
123
A pandas DataFrame contains the query result.
@@ -132,10 +146,12 @@ def as_dataframe(self) -> DataFrame:
132
146
class IngestionManagerPandas :
133
147
"""Class to manage the multi-threaded data ingestion process.
134
148
149
+ This class will manage the data ingestion process which is multi-threaded.
150
+
135
151
Attributes:
136
152
feature_group_name (str): name of the Feature Group.
137
153
sagemaker_session (Session): instance of the Session class to perform boto calls.
138
- data_frame (DataFrame): pandas data_frame to be ingested to the given feature group.
154
+ data_frame (DataFrame): pandas DataFrame to be ingested to the given feature group.
139
155
max_works (int): number of threads to create.
140
156
"""
141
157
@@ -201,9 +217,8 @@ def run(self, wait=True, timeout=None):
201
217
Args:
202
218
wait (bool): whether to wait for the ingestion to finish or not.
203
219
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
204
- if timeout is reached.
220
+ if timeout is reached.
205
221
"""
206
-
207
222
executor = ThreadPoolExecutor (max_workers = self .max_workers )
208
223
batch_size = math .ceil (self .data_frame .shape [0 ] / self .max_workers )
209
224
@@ -230,7 +245,10 @@ def run(self, wait=True, timeout=None):
230
245
231
246
@attr .s
232
247
class FeatureGroup :
233
- """FeatureGroup for FeatureStore
248
+ """FeatureGroup definition.
249
+
250
+ This class instantiates a FeatureGroup object that comprises of a name for the FeatureGroup,
251
+ session instance, and a list of feature definition objects i.e., FeatureDefinition.
234
252
235
253
Attributes:
236
254
name (str): name of the FeatureGroup instance.
@@ -282,7 +300,7 @@ def create(
282
300
description : str = None ,
283
301
tags : List [Dict [str , str ]] = None ,
284
302
) -> Dict [str , Any ]:
285
- """Creates a SageMaker FeatureStore FeatureGroup
303
+ """Create a SageMaker FeatureStore FeatureGroup.
286
304
287
305
Args:
288
306
s3_uri (str): S3 URI of the offline store.
@@ -300,7 +318,6 @@ def create(
300
318
Returns:
301
319
Response dict from service.
302
320
"""
303
-
304
321
create_feature_store_args = dict (
305
322
feature_group_name = self .name ,
306
323
record_identifier_name = record_identifier_name ,
@@ -336,7 +353,7 @@ def create(
336
353
return self .sagemaker_session .create_feature_group (** create_feature_store_args )
337
354
338
355
def delete (self ):
339
- """Deletes a FeatureGroup"""
356
+ """Delete a FeatureGroup. """
340
357
self .sagemaker_session .delete_feature_group (feature_group_name = self .name )
341
358
342
359
def describe (self , next_token : str = None ) -> Dict [str , Any ]:
@@ -354,7 +371,7 @@ def load_feature_definitions(
354
371
self ,
355
372
data_frame : DataFrame ,
356
373
) -> Sequence [FeatureDefinition ]:
357
- """Loads feature definitions from a Pandas DataFrame
374
+ """Load feature definitions from a Pandas DataFrame.
358
375
359
376
Column name is used as feature name. Feature type is inferred from the dtype
360
377
of the column. Dtype int_, int8, int16, int32, int64, uint8, uint16, uint32
@@ -389,7 +406,7 @@ def load_feature_definitions(
389
406
return self .feature_definitions
390
407
391
408
def put_record (self , record : Sequence [FeatureValue ]):
392
- """Puts a single record in the FeatureGroup
409
+ """Put a single record in the FeatureGroup.
393
410
394
411
Args:
395
412
record (Sequence[FeatureValue]): a list contains feature values.
@@ -430,7 +447,7 @@ def ingest(
430
447
return manager
431
448
432
449
def athena_query (self ) -> AthenaQuery :
433
- """Creates an AthenaQuery instance
450
+ """Create an AthenaQuery instance.
434
451
435
452
Returns:
436
453
An instance of AthenaQuery initialized with data catalog configurations.
@@ -449,10 +466,11 @@ def athena_query(self) -> AthenaQuery:
449
466
raise RuntimeError ("No metastore is configured with this feature group." )
450
467
451
468
def as_hive_ddl (self , database : str = "sagemaker_featurestore" , table_name : str = None ) -> str :
452
- """Generate DDL can be used to create Hive table
469
+ """Generate Hive DDL commands that can be used to define or change structure of tables or
470
+ databases in Hive.
453
471
454
472
Schema of the table is generated based on the feature definitions. Columns are named
455
- after feature name and data-type are infered based on feature type. Integral feature
473
+ after feature name and data-type are inferred based on feature type. Integral feature
456
474
type is mapped to INT data-type. Fractional feature type is mapped to FLOAT data-type.
457
475
String feature type is mapped to STRING data-type.
458
476
@@ -464,7 +482,6 @@ def as_hive_ddl(self, database: str = "sagemaker_featurestore", table_name: str
464
482
Returns:
465
483
Generated create table DDL string.
466
484
"""
467
-
468
485
if not table_name :
469
486
table_name = self .name
470
487
0 commit comments