##############################
Amazon SageMaker Feature Store
##############################

.. rubric:: **Create Feature Groups**
   :name: bCe9CAXalwH

This guide shows you how to create and use
`Amazon SageMaker Feature Store <https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-getting-started.html>`__.
The example code in this guide uses the SageMaker Python SDK. The
underlying APIs are also available to developers using other languages.

.. rubric:: Features
   :name: bCe9CAtWHPP

Before using a feature store, you typically load your dataset, run
transformations, and set up your features for ingestion. This step
varies widely and depends heavily on your data. The code blocks that
follow often reference an example notebook,
`Fraud Detection with Amazon SageMaker Feature Store
<https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-featurestore/sagemaker_featurestore_fraud_detection_python_sdk.html>`__.
We recommend that you run that notebook in SageMaker Studio and work
from its code, because the code in this guide is conceptual and not
fully functional if copied directly.

.. rubric:: Feature store data types and schema
   :name: bCe9CAr4kIT

Feature Store supports three feature types: ``String``, ``Fractional``,
and ``Integral``. The default type is ``String``. This means that if a
column in your dataset is not a ``float`` or ``long`` type, it defaults
to ``String`` in your feature store.

You may use a schema to describe your data's columns and data types. You
pass this schema into ``FeatureDefinitions``, a required parameter for a
``FeatureGroup``. However, for Python developers, the SageMaker Python
SDK offers automatic data type detection when you use the
``load_feature_definitions`` function.

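The detection rule can be sketched as follows. This is a simplified
illustration of the behavior described above, not the SDK's actual
implementation:

.. code:: python

    # Simplified sketch: float columns map to Fractional, integer columns
    # map to Integral, and everything else falls back to the default, String.
    def infer_feature_type(dtype_name: str) -> str:
        if dtype_name.startswith("float"):
            return "Fractional"
        if dtype_name.startswith("int"):
            return "Integral"
        return "String"
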
.. rubric:: Feature store setup
   :name: bCe9CAgy6IH

To start using Feature Store, first create a SageMaker session, a boto3
session, and a Feature Store session. Also, set up the bucket you will
use for your features; this is your offline store. The following code
uses the SageMaker default bucket and adds a custom prefix to it.

.. note::

    The role that you use requires these managed policies:
    ``AmazonSageMakerFullAccess`` and ``AmazonSageMakerFeatureStoreAccess``.

.. code:: python

    import boto3
    import sagemaker
    from sagemaker.session import Session

    sagemaker_session = sagemaker.Session()
    region = sagemaker_session.boto_region_name
    boto_session = boto3.Session(region_name=region)
    role = sagemaker.get_execution_role()
    default_bucket = sagemaker_session.default_bucket()
    prefix = 'sagemaker-featurestore'
    offline_feature_store_bucket = 's3://{}/{}'.format(default_bucket, prefix)

    sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
    featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

    feature_store_session = Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_featurestore_runtime_client=featurestore_runtime
    )

.. rubric:: Load datasets and partition data into feature groups
   :name: bCe9CA31y9f

Load your data into data frames for each of your features. You will use
these data frames after you set up the feature group. In the fraud
detection example, you can see these steps in the following code.

.. code:: python

    import io
    import pandas as pd

    s3_client = boto_session.client(service_name='s3', region_name=region)

    fraud_detection_bucket_name = 'sagemaker-featurestore-fraud-detection'
    identity_file_key = 'sampled_identity.csv'
    transaction_file_key = 'sampled_transactions.csv'

    identity_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=identity_file_key)
    transaction_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=transaction_file_key)

    identity_data = pd.read_csv(io.BytesIO(identity_data_object['Body'].read()))
    transaction_data = pd.read_csv(io.BytesIO(transaction_data_object['Body'].read()))

    identity_data = identity_data.round(5)
    transaction_data = transaction_data.round(5)

    identity_data = identity_data.fillna(0)
    transaction_data = transaction_data.fillna(0)

    # Feature transformations for this dataset are applied before ingestion
    # into Feature Store. One-hot encode card4 and card6.
    encoded_card_bank = pd.get_dummies(transaction_data['card4'], prefix='card_bank')
    encoded_card_type = pd.get_dummies(transaction_data['card6'], prefix='card_type')

    transformed_transaction_data = pd.concat([transaction_data, encoded_card_type, encoded_card_bank], axis=1)
    transformed_transaction_data = transformed_transaction_data.rename(columns={"card_bank_american express": "card_bank_american_express"})

.. rubric:: Feature group setup
   :name: bCe9CARx8h9

Give each feature group a unique name, then set it up with the
``FeatureGroup`` class.

.. code:: python

    from sagemaker.feature_store.feature_group import FeatureGroup

    feature_group_name = "some string for a name"
    feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)

|
| 128 | +For example, in the fraud detection example, the two feature groups are |
| 129 | +“identity” and “transaction”. In the following code you can see how the |
| 130 | +names are customized with a timestamp, then each group is setup by |
| 131 | +passing in the name and the session. |
| 132 | +
|
.. code:: python

    import time
    from time import gmtime, strftime
    from sagemaker.feature_store.feature_group import FeatureGroup

    identity_feature_group_name = 'identity-feature-group-' + strftime('%d-%H-%M-%S', gmtime())
    transaction_feature_group_name = 'transaction-feature-group-' + strftime('%d-%H-%M-%S', gmtime())

    identity_feature_group = FeatureGroup(name=identity_feature_group_name, sagemaker_session=feature_store_session)
    transaction_feature_group = FeatureGroup(name=transaction_feature_group_name, sagemaker_session=feature_store_session)

.. rubric:: Record identifier and event time feature
   :name: bCe9CA17VV7

Next, you need a record identifier name and an event time feature name.
These match the columns of the corresponding features in your data. For
example, in the fraud detection example, the column of interest is
“TransactionID”. An “EventTime” feature can be appended to your data
when no timestamp is available. In the following code, you can see how
these variables are set, and then ``EventTime`` is appended to the data
for both feature groups.

.. code:: python

    record_identifier_name = "TransactionID"
    event_time_feature_name = "EventTime"

    current_time_sec = int(round(time.time()))
    identity_data[event_time_feature_name] = pd.Series([current_time_sec] * len(identity_data), dtype="float64")
    transformed_transaction_data[event_time_feature_name] = pd.Series([current_time_sec] * len(transformed_transaction_data), dtype="float64")

.. rubric:: Feature definitions
   :name: bCe9CA4yUcO

You can now load the feature definitions by passing a data frame
containing the feature data. In the following code for the fraud
detection example, the identity features and transaction features are
each loaded by using ``load_feature_definitions``, which automatically
detects the data type of each column of data. For developers using a
schema rather than automatic detection, refer to the
`Creating Feature Groups with Data Wrangler example <https://docs.aws.amazon.com/sagemaker/latest/dg/data-wrangler-data-export.html#data-wrangler-data-export-feature-store>`__
for code that shows how to load the schema, map it, and add it as a
``FeatureDefinition`` used when you create the ``FeatureGroup``. That
example also covers a boto3 implementation, instead of using the
SageMaker Python SDK.

.. code:: python

    identity_feature_group.load_feature_definitions(data_frame=identity_data); # output is suppressed
    transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data); # output is suppressed

.. rubric:: Create a feature group
   :name: bCe9CAwMEgY

The last step for creating the feature group is to use the ``create``
function. The following code shows the available parameters. The online
store is not created by default, so you must set
``enable_online_store=True`` if you want to enable it. The ``s3_uri`` is
the location of your offline store.

.. code:: python

    # create a FeatureGroup. The group's name and feature definitions are
    # taken from the FeatureGroup object itself.
    feature_group.create(
        description="Some info about the feature group",
        record_identifier_name=record_identifier_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        s3_uri=offline_feature_store_bucket,
        enable_online_store=True,
        online_store_kms_key_id=None,
        offline_store_kms_key_id=None,
        disable_glue_table_creation=False,
        data_catalog_config=None,
        tags=[{"Key": "tag1", "Value": "value1"}]
    )


The following code from the fraud detection example shows a minimal
``create`` call for each of the two feature groups being created.

.. code:: python

    identity_feature_group.create(
        s3_uri=offline_feature_store_bucket,
        record_identifier_name=record_identifier_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True
    )

    transaction_feature_group.create(
        s3_uri=offline_feature_store_bucket,
        record_identifier_name=record_identifier_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=role,
        enable_online_store=True
    )

Creating a feature group takes time. You must wait until creation has
finished before you can use the feature group. You can check the status
with the following method.

.. code:: python

    status = feature_group.describe().get("FeatureGroupStatus")

While the feature group is being created, the response is ``Creating``.
When creation has finished successfully, the response is ``Created``.
The other possible statuses are ``CreateFailed``, ``Deleting``, and
``DeleteFailed``.

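A small polling helper can block until creation finishes. This is a
sketch, not part of the SDK:

.. code:: python

    import time

    def wait_for_feature_group_creation(feature_group, poll_seconds=5):
        """Poll describe() until the group leaves the Creating state."""
        status = feature_group.describe().get("FeatureGroupStatus")
        while status == "Creating":
            time.sleep(poll_seconds)
            status = feature_group.describe().get("FeatureGroupStatus")
        if status != "Created":
            raise RuntimeError("Feature group creation failed with status: " + status)
        return status
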
.. rubric:: Describe a feature group
   :name: bCe9CA2TNON

You can retrieve information about your feature group with the
``describe`` function.

.. code:: python

    feature_group.describe()

.. rubric:: List feature groups
   :name: bCe9CA2wPF2

You can list all of your feature groups with the
``list_feature_groups`` function.

.. code:: python

    sagemaker_client.list_feature_groups()

.. rubric:: Put records in a feature group
   :name: bCe9CAymRdA

You can use the ``ingest`` function to load your feature data. You pass
in a data frame of feature data, set the number of workers, and choose
whether to wait for it to return. The following example demonstrates
using the ``ingest`` function.

.. code:: python

    feature_group.ingest(
        data_frame=feature_data, max_workers=3, wait=True
    )

For each feature group you have, run the ``ingest`` function on the
feature data you want to load.

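In the fraud detection example there are two group/data-frame pairs to
load, which a small helper can iterate over. The helper is hypothetical,
not part of the SDK:

.. code:: python

    def ingest_all(pairs, max_workers=3):
        """Ingest each (feature_group, data_frame) pair, waiting for completion."""
        for feature_group, data_frame in pairs:
            feature_group.ingest(data_frame=data_frame, max_workers=max_workers, wait=True)

    # e.g., with the groups and frames created earlier:
    # ingest_all([
    #     (identity_feature_group, identity_data),
    #     (transaction_feature_group, transformed_transaction_data),
    # ])
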
.. rubric:: Get records from a feature group
   :name: bCe9CA25xj5

You can use the ``get_record`` function to retrieve the data for a
specific feature by its record identifier. The following example uses an
example identifier to retrieve the record.

.. code:: python

    record_identifier_value = str(2990130)
    featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)

An example response from the fraud detection example:

.. code:: python

    ...
    'Record': [{'FeatureName': 'TransactionID', 'ValueAsString': '2990130'},
     {'FeatureName': 'isFraud', 'ValueAsString': '0'},
     {'FeatureName': 'TransactionDT', 'ValueAsString': '152647'},
     {'FeatureName': 'TransactionAmt', 'ValueAsString': '75.0'},
     {'FeatureName': 'ProductCD', 'ValueAsString': 'H'},
     {'FeatureName': 'card1', 'ValueAsString': '4577'},
    ...

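The ``Record`` field is a list of name/value pairs. A one-line helper
(illustrative, not part of the SDK) can flatten it into a plain
dictionary:

.. code:: python

    def record_to_dict(record):
        """Flatten a GetRecord 'Record' list into {feature_name: value_string}."""
        return {feature["FeatureName"]: feature["ValueAsString"] for feature in record}
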
.. rubric:: Hive DDL commands
   :name: bCe9CA30nHn

The SageMaker Python SDK's ``FeatureStore`` class also provides the
functionality to generate Hive DDL commands. The schema of the table is
generated from the feature definitions: columns are named after the
feature names, and column data types are inferred from the feature
types.

.. code:: python

    print(feature_group.as_hive_ddl())

An example output:

.. code:: sql

    CREATE EXTERNAL TABLE IF NOT EXISTS sagemaker_featurestore.identity-feature-group-27-19-33-00 (
      TransactionID INT
      id_01 FLOAT
      id_02 FLOAT
      id_03 FLOAT
      id_04 FLOAT
    ...

.. rubric:: Build a Training Dataset
   :name: bCe9CAVnDLV

Feature Store automatically builds an AWS Glue Data Catalog when feature
groups are created; this can optionally be turned off. The following
shows how to create a single training dataset with feature values from
both the identity and transaction feature groups created above, by
running an Amazon Athena query that joins the data stored in the offline
store for the two feature groups.

To start, create an Athena query using ``athena_query()`` for both the
identity and transaction feature groups. The ``table_name`` is the Glue
table that is auto-generated by Feature Store.

.. code:: python

    identity_query = identity_feature_group.athena_query()
    transaction_query = transaction_feature_group.athena_query()

    identity_table = identity_query.table_name
    transaction_table = transaction_query.table_name

.. rubric:: Writing and Executing your Athena Query
   :name: bCe9CArSR5J

Write your query using SQL on these feature groups, then execute the
query with the ``run`` command, specifying the Amazon S3 location where
the dataset should be saved.

.. code:: python

    # Athena query that left-joins the two feature group tables on TransactionID
    query_string = 'SELECT * FROM "'+transaction_table+'" LEFT JOIN "'+identity_table+'" ON "'+transaction_table+'".transactionid = "'+identity_table+'".transactionid'

    # Run the Athena query. The output is loaded into a pandas dataframe.
    identity_query.run(query_string=query_string, output_location='s3://'+default_bucket+'/query_results/')
    identity_query.wait()
    dataset = identity_query.as_dataframe()

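The join this query performs has the same shape as a pandas left merge.
The following toy frames (invented for illustration) show the behavior:

.. code:: python

    import pandas as pd

    transactions = pd.DataFrame({"transactionid": [1, 2], "amount": [75.0, 20.0]})
    identities = pd.DataFrame({"transactionid": [1], "device": ["mobile"]})

    # Every transaction row is kept; identity columns are NaN where no match exists.
    joined = transactions.merge(identities, on="transactionid", how="left")
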
From here you can train a model using this dataset and then perform
inference.

.. rubric:: Delete a feature group
   :name: bCe9CA61b78

You can delete a feature group with the ``delete`` function.

.. code:: python

    feature_group.delete()

The following code example is from the fraud detection example.

.. code:: python

    identity_feature_group.delete()
    transaction_feature_group.delete()