39
39
)
40
40
from sagemaker .session import get_execution_role , Session
41
41
from tests .integ .timeout import timeout
42
+ from urllib .parse import urlparse
42
43
43
44
BUCKET_POLICY = {
44
45
"Version" : "2012-10-17" ,
@@ -635,8 +636,8 @@ def test_create_dataset_with_feature_group_base(
635
636
)
636
637
637
638
with timeout (minutes = 10 ) and cleanup_offline_store (
638
- base_table_name , feature_store_session
639
- ) and cleanup_offline_store (feature_group_table_name , feature_store_session ):
639
+ base , feature_store_session
640
+ ) and cleanup_offline_store (feature_group , feature_store_session ):
640
641
feature_store = FeatureStore (sagemaker_session = feature_store_session )
641
642
df , query_string = (
642
643
feature_store .create_dataset (base = base , output_path = offline_store_s3_uri )
@@ -663,7 +664,7 @@ def test_create_dataset_with_feature_group_base(
663
664
664
665
assert sorted_df .equals (expect_df )
665
666
assert (
666
- query_string
667
+ query_string . strip ()
667
668
== "WITH fg_base AS (WITH table_base AS (\n "
668
669
+ "SELECT *\n "
669
670
+ "FROM (\n "
@@ -817,8 +818,8 @@ def test_create_dataset_with_feature_group_base_with_additional_params(
817
818
)
818
819
819
820
with timeout (minutes = 10 ) and cleanup_offline_store (
820
- base_table_name , feature_store_session
821
- ) and cleanup_offline_store (feature_group_table_name , feature_store_session ):
821
+ base , feature_store_session
822
+ ) and cleanup_offline_store (feature_group , feature_store_session ):
822
823
feature_store = FeatureStore (sagemaker_session = feature_store_session )
823
824
df , query_string = (
824
825
feature_store .create_dataset (base = base , output_path = offline_store_s3_uri )
@@ -850,7 +851,7 @@ def test_create_dataset_with_feature_group_base_with_additional_params(
850
851
851
852
assert sorted_df .equals (expect_df )
852
853
assert (
853
- query_string
854
+ query_string . strip ()
854
855
== "WITH fg_base AS (WITH table_base AS (\n "
855
856
+ "SELECT *\n "
856
857
+ "FROM (\n "
@@ -1068,25 +1069,29 @@ def cleanup_feature_group(feature_group: FeatureGroup):
1068
1069
1069
1070
1070
1071
@contextmanager
def cleanup_offline_store(feature_group: FeatureGroup, feature_store_session: Session):
    """Context manager that deletes a feature group's offline-store S3 data on exit.

    Args:
        feature_group: feature group whose offline-store objects should be purged;
            its ``describe()`` output supplies the resolved S3 location.
        feature_store_session: Session whose ``boto_session`` is used to build the
            S3 client in the feature group's region.

    Raises:
        RuntimeError: if the offline-store objects cannot be deleted; the original
            exception is attached as the cause.
    """
    try:
        yield
    finally:
        feature_group_metadata = feature_group.describe()
        feature_group_name = feature_group_metadata["FeatureGroupName"]
        try:
            s3_uri = feature_group_metadata["OfflineStoreConfig"]["S3StorageConfig"][
                "ResolvedOutputS3Uri"
            ]
            parsed_uri = urlparse(s3_uri)
            bucket_name, prefix = parsed_uri.netloc, parsed_uri.path
            prefix = prefix.strip("/")
            # ResolvedOutputS3Uri points at the ".../data" folder; trim it so the
            # whole offline-store prefix for this feature group is removed.
            prefix = prefix[:-5] if prefix.endswith("/data") else prefix
            region_name = feature_store_session.boto_session.region_name
            s3_client = feature_store_session.boto_session.client(
                service_name="s3", region_name=region_name
            )
            # list_objects_v2 returns at most 1000 keys per call; paginate so large
            # offline stores are fully enumerated. page.get(...) tolerates an empty
            # prefix (no "Contents" key) instead of raising KeyError.
            paginator = s3_client.get_paginator("list_objects_v2")
            files_to_delete = [
                {"Key": obj["Key"]}
                for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
                for obj in page.get("Contents", [])
            ]
            # delete_objects accepts at most 1000 keys per request, so delete in
            # batches; an empty store results in zero requests (not an error).
            for start in range(0, len(files_to_delete), 1000):
                s3_client.delete_objects(
                    Bucket=bucket_name,
                    Delete={"Objects": files_to_delete[start : start + 1000]},
                )
        except Exception as e:
            # Chain the original exception so the root cause survives in tracebacks,
            # instead of smuggling it into the RuntimeError's argument tuple.
            raise RuntimeError(
                f"Failed to delete data for feature_group {feature_group_name}"
            ) from e
0 commit comments