diff --git a/tests/integ/test_feature_store.py b/tests/integ/test_feature_store.py index 644735c65e..84aed11d48 100644 --- a/tests/integ/test_feature_store.py +++ b/tests/integ/test_feature_store.py @@ -39,6 +39,7 @@ ) from sagemaker.session import get_execution_role, Session from tests.integ.timeout import timeout +from urllib.parse import urlparse BUCKET_POLICY = { "Version": "2012-10-17", @@ -635,8 +636,8 @@ def test_create_dataset_with_feature_group_base( ) with timeout(minutes=10) and cleanup_offline_store( - base_table_name, feature_store_session - ) and cleanup_offline_store(feature_group_table_name, feature_store_session): + base, feature_store_session + ) and cleanup_offline_store(feature_group, feature_store_session): feature_store = FeatureStore(sagemaker_session=feature_store_session) df, query_string = ( feature_store.create_dataset(base=base, output_path=offline_store_s3_uri) @@ -663,7 +664,7 @@ def test_create_dataset_with_feature_group_base( assert sorted_df.equals(expect_df) assert ( - query_string + query_string.strip() == "WITH fg_base AS (WITH table_base AS (\n" + "SELECT *\n" + "FROM (\n" @@ -817,8 +818,8 @@ def test_create_dataset_with_feature_group_base_with_additional_params( ) with timeout(minutes=10) and cleanup_offline_store( - base_table_name, feature_store_session - ) and cleanup_offline_store(feature_group_table_name, feature_store_session): + base, feature_store_session + ) and cleanup_offline_store(feature_group, feature_store_session): feature_store = FeatureStore(sagemaker_session=feature_store_session) df, query_string = ( feature_store.create_dataset(base=base, output_path=offline_store_s3_uri) @@ -850,7 +851,7 @@ def test_create_dataset_with_feature_group_base_with_additional_params( assert sorted_df.equals(expect_df) assert ( - query_string + query_string.strip() == "WITH fg_base AS (WITH table_base AS (\n" + "SELECT *\n" + "FROM (\n" @@ -1068,25 +1069,29 @@ def cleanup_feature_group(feature_group: FeatureGroup): @contextmanager -def cleanup_offline_store(table_name: str, feature_store_session: Session): +def cleanup_offline_store(feature_group: FeatureGroup, feature_store_session: Session): try: yield finally: + feature_group_metadata = feature_group.describe() + feature_group_name = feature_group_metadata["FeatureGroupName"] try: + s3_uri = feature_group_metadata["OfflineStoreConfig"]["S3StorageConfig"][ + "ResolvedOutputS3Uri" + ] + parsed_uri = urlparse(s3_uri) + bucket_name, prefix = parsed_uri.netloc, parsed_uri.path + prefix = prefix.strip("/") + prefix = prefix[:-5] if prefix.endswith("/data") else prefix region_name = feature_store_session.boto_session.region_name s3_client = feature_store_session.boto_session.client( service_name="s3", region_name=region_name ) - account_id = feature_store_session.account_id() - bucket_name = f"sagemaker-test-featurestore-{region_name}-{account_id}" - response = s3_client.list_objects_v2( - Bucket=bucket_name, - Prefix=f"{account_id}/sagemaker/{region_name}/offline-store/{table_name}/", - ) + response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) files_in_folder = response["Contents"] files_to_delete = [] for f in files_in_folder: files_to_delete.append({"Key": f["Key"]}) s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": files_to_delete}) - except Exception: - raise RuntimeError(f"Failed to delete data under {table_name}") + except Exception as e: + raise RuntimeError(f"Failed to delete data for feature_group {feature_group_name}", e)