@@ -82,6 +82,11 @@ def feature_group_name():
     return f"my-feature-group-{int(time.time() * 10 ** 7)}"
 
 
+@pytest.fixture
+def base_name():
+    return f"my-base-{int(time.time() * 10 ** 7)}"
+
+
 @pytest.fixture
 def offline_store_s3_uri(feature_store_session, region_name):
     bucket = f"sagemaker-test-featurestore-{region_name}-{feature_store_session.account_id()}"
@@ -109,6 +114,32 @@ def pandas_data_frame():
     return df
 
 
+@pytest.fixture
+def base_dataframe():
+    base_data = [
+        [1, 187512346.0, 123, 128],
+        [2, 187512347.0, 168, 258],
+        [3, 187512348.0, 125, 184],
+        [1, 187512349.0, 195, 206],
+    ]
+    return pd.DataFrame(
+        base_data, columns=["base_id", "base_time", "base_feature_1", "base_feature_2"]
+    )
+
+
+@pytest.fixture
+def feature_group_dataframe():
+    feature_group_data = [
+        [1, 187512246.0, 456, 325],
+        [2, 187512247.0, 729, 693],
+        [3, 187512348.0, 129, 901],
+        [1, 187512449.0, 289, 286],
+    ]
+    return pd.DataFrame(
+        feature_group_data, columns=["fg_id", "fg_time", "fg_feature_1", "fg_feature_2"]
+    )
+
+
 @pytest.fixture
 def pandas_data_frame_without_string():
     df = pd.DataFrame(
@@ -527,6 +558,135 @@ def test_ingest_multi_process(
     assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}")
 
 
+def test_create_dataset_with_feature_group_base(
+    feature_store_session,
+    region_name,
+    role,
+    base_name,
+    feature_group_name,
+    offline_store_s3_uri,
+    base_dataframe,
+    feature_group_dataframe,
+):
+    base = FeatureGroup(name=base_name, sagemaker_session=feature_store_session)
+    feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
+    with cleanup_feature_group(base), cleanup_feature_group(feature_group):
+        _create_feature_group_and_ingest_data(
+            base, base_dataframe, offline_store_s3_uri, "base_id", "base_time", role
+        )
+        _create_feature_group_and_ingest_data(
+            feature_group, feature_group_dataframe, offline_store_s3_uri, "fg_id", "fg_time", role
+        )
+        base_table_name = _get_athena_table_name_after_data_replication(
+            feature_store_session, base, offline_store_s3_uri
+        )
+        feature_group_table_name = _get_athena_table_name_after_data_replication(
+            feature_store_session, feature_group, offline_store_s3_uri
+        )
+
+        with timeout(minutes=10), cleanup_offline_store(
+            base_table_name, feature_store_session
+        ), cleanup_offline_store(feature_group_table_name, feature_store_session):
+            feature_store = FeatureStore(sagemaker_session=feature_store_session)
+            df, query_string = (
+                feature_store.create_dataset(base=base, output_path=offline_store_s3_uri)
+                .with_feature_group(feature_group)
+                .to_dataframe()
+            )
+            sorted_df = df.sort_values(by=list(df.columns)).reset_index(drop=True)
+            merged_df = base_dataframe.merge(
+                feature_group_dataframe, left_on="base_id", right_on="fg_id"
+            )
+            expect_df = merged_df.sort_values(by=list(merged_df.columns)).reset_index(drop=True)
+            assert sorted_df.equals(expect_df)
+            assert (
+                query_string
+                == 'WITH fg_base AS (SELECT table_base."base_id", table_base."base_time", '
+                + 'table_base."base_feature_1", table_base."base_feature_2"\n'
+                + "FROM (\n"
+                + "SELECT *, row_number() OVER (\n"
+                + 'PARTITION BY dedup_base."base_id", dedup_base."base_feature_1", '
+                + 'dedup_base."base_feature_2"\n'
+                + 'ORDER BY dedup_base."base_time" DESC, dedup_base."api_invocation_time" DESC, '
+                + 'dedup_base."write_time" DESC\n'
+                + ") AS row_base\n"
+                + f'FROM "sagemaker_featurestore"."{base_table_name}" dedup_base\n'
+                + ") AS table_base\n"
+                + "WHERE row_base = 1\n"
+                + "AND NOT is_deleted),\n"
+                + 'fg_0 AS (SELECT table_0."fg_id", table_0."fg_time", table_0."fg_feature_1", '
+                + 'table_0."fg_feature_2"\n'
+                + "FROM (\n"
+                + "SELECT *, row_number() OVER (\n"
+                + 'PARTITION BY dedup_0."fg_id", dedup_0."fg_feature_1", dedup_0."fg_feature_2"\n'
+                + 'ORDER BY dedup_0."fg_time" DESC, dedup_0."api_invocation_time" DESC, '
+                + 'dedup_0."write_time" DESC\n'
+                + ") AS row_0\n"
+                + f'FROM "sagemaker_featurestore"."{feature_group_table_name}" dedup_0\n'
+                + ") AS table_0\n"
+                + "WHERE row_0 = 1\n"
+                + "AND NOT is_deleted)\n"
+                + "SELECT *\n"
+                + "FROM fg_base\n"
+                + "JOIN fg_0\n"
+                + 'ON fg_base."base_id" = fg_0."fg_id"'
+            )
+
+
+def _create_feature_group_and_ingest_data(
+    feature_group: FeatureGroup,
+    dataframe: DataFrame,
+    offline_store_s3_uri: str,
+    record_identifier_name: str,
+    event_time_name: str,
+    role: str,
+):
+    feature_group.load_feature_definitions(data_frame=dataframe)
+    feature_group.create(
+        s3_uri=offline_store_s3_uri,
+        record_identifier_name=record_identifier_name,
+        event_time_feature_name=event_time_name,
+        role_arn=role,
+        enable_online_store=True,
+    )
+    _wait_for_feature_group_create(feature_group)
+
+    ingestion_manager = feature_group.ingest(data_frame=dataframe, max_workers=3, wait=False)
+    ingestion_manager.wait()
+    assert 0 == len(ingestion_manager.failed_rows)
+
+
+def _get_athena_table_name_after_data_replication(
+    feature_store_session, feature_group: FeatureGroup, offline_store_s3_uri
+):
+    feature_group_metadata = feature_group.describe()
+    resolved_output_s3_uri = (
+        feature_group_metadata.get("OfflineStoreConfig", None)
+        .get("S3StorageConfig", None)
+        .get("ResolvedOutputS3Uri", None)
+    )
+    s3_prefix = resolved_output_s3_uri.replace(f"{offline_store_s3_uri}/", "")
+    region_name = feature_store_session.boto_session.region_name
+    s3_client = feature_store_session.boto_session.client(
+        service_name="s3", region_name=region_name
+    )
+    while True:
+        objects_in_bucket = s3_client.list_objects(
+            Bucket=offline_store_s3_uri.replace("s3://", ""), Prefix=s3_prefix
+        )
+        if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
+            break
+        else:
+            print(f"Waiting for {feature_group.name} data in offline store...")
+            time.sleep(60)
+    print(f"{feature_group.name} data available.")
+    return (
+        feature_group_metadata.get("OfflineStoreConfig", None)
+        .get("DataCatalogConfig", None)
+        .get("TableName", None)
+    )
+
+
 def _wait_for_feature_group_create(feature_group: FeatureGroup):
     status = feature_group.describe().get("FeatureGroupStatus")
     while status == "Creating":
@@ -560,3 +720,28 @@ def cleanup_feature_group(feature_group: FeatureGroup):
             feature_group.delete()
         except Exception:
             raise RuntimeError(f"Failed to delete feature group with name {feature_group.name}")
+
+
+@contextmanager
+def cleanup_offline_store(table_name: str, feature_store_session: Session):
+    try:
+        yield
+    finally:
+        try:
+            region_name = feature_store_session.boto_session.region_name
+            s3_client = feature_store_session.boto_session.client(
+                service_name="s3", region_name=region_name
+            )
+            account_id = feature_store_session.account_id()
+            bucket_name = f"sagemaker-test-featurestore-{region_name}-{account_id}"
+            response = s3_client.list_objects_v2(
+                Bucket=bucket_name,
+                Prefix=f"{account_id}/sagemaker/{region_name}/offline-store/{table_name}/",
+            )
+            files_in_folder = response["Contents"]
+            files_to_delete = []
+            for f in files_in_folder:
+                files_to_delete.append({"Key": f["Key"]})
+            s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": files_to_delete})
+        except Exception:
+            raise RuntimeError(f"Failed to delete data under {table_name}")