19
19
20
20
import botocore
21
21
22
+ from sagemaker .experiments ._utils import is_already_exist_error
23
+
22
24
logger = logging .getLogger (__name__ )
23
25
24
26
@@ -171,12 +173,20 @@ def create_artifact(self, sagemaker_session):
171
173
if self .etag :
172
174
source_ids .append ({"SourceIdType" : "S3ETag" , "Value" : self .etag })
173
175
174
- response = sagemaker_session .sagemaker_client .create_artifact (
175
- ArtifactName = self .name ,
176
- ArtifactType = self .artifact_type ,
177
- Source = {"SourceUri" : self .source_uri , "SourceTypes" : source_ids },
178
- )
179
- self .artifact_arn = response ["ArtifactArn" ]
176
+ try :
177
+ response = sagemaker_session .sagemaker_client .create_artifact (
178
+ ArtifactName = self .name ,
179
+ ArtifactType = self .artifact_type ,
180
+ Source = {"SourceUri" : self .source_uri , "SourceTypes" : source_ids },
181
+ )
182
+ self .artifact_arn = response ["ArtifactArn" ]
183
+ except botocore .exceptions .ClientError as err :
184
+ err_info = err .response ["Error" ]
185
+ if not is_already_exist_error (err_info ):
186
+ raise
187
+ logger .warning (
188
+ "Skip creating the artifact since it already exists: %s" , err_info ["Message" ]
189
+ )
180
190
181
191
def add_association (self , sagemaker_session ):
182
192
"""Associate the artifact with a source/destination ARN (e.g. trial component arn)
@@ -191,9 +201,17 @@ def add_association(self, sagemaker_session):
191
201
# if the trial component (job) is the source then it produced the artifact,
192
202
# otherwise the artifact contributed to the trial component (job)
193
203
association_edge_type = "Produced" if self .source_arn else "ContributedTo"
194
- sagemaker_session .sagemaker_client .add_association (
195
- SourceArn = source_arn , DestinationArn = dest_arn , AssociationType = association_edge_type
196
- )
204
+ try :
205
+ sagemaker_session .sagemaker_client .add_association (
206
+ SourceArn = source_arn , DestinationArn = dest_arn , AssociationType = association_edge_type
207
+ )
208
+ except botocore .exceptions .ClientError as err :
209
+ err_info = err .response ["Error" ]
210
+ if not is_already_exist_error (err_info ):
211
+ raise
212
+ logger .warning (
213
+ "Skip associating since the association already exists: %s" , err_info ["Message" ]
214
+ )
197
215
198
216
199
217
class _LineageArtifactTracker (object ):
@@ -246,87 +264,3 @@ def save(self):
246
264
for artifact in self .artifacts :
247
265
artifact .create_artifact (self .sagemaker_session )
248
266
artifact .add_association (self .sagemaker_session )
249
-
250
-
251
- class _ArtifactConverter (object ):
252
- """Converts data to easily consumed by Studio."""
253
-
254
- @classmethod
255
- def convert_dict_to_fields (cls , values ):
256
- """Converts a dictionary to list of field types.
257
-
258
- Args:
259
- values (dict): The values of the dictionary.
260
-
261
- Returns:
262
- dict: Dictionary of fields.
263
- """
264
- fields = []
265
- for key in values :
266
- fields .append ({"name" : key , "type" : "string" })
267
- return fields
268
-
269
- @classmethod
270
- def convert_data_frame_to_values (cls , data_frame ):
271
- """Converts a pandas data frame to a dictionary in the table artifact format.
272
-
273
- Args:
274
- data_frame (DataFrame): The pandas data frame to convert.
275
-
276
- Returns:
277
- dict: dictionary of values in the format needed to log the artifact.
278
- """
279
- df_dict = data_frame .to_dict ()
280
- new_df = {}
281
- for key in df_dict :
282
- col_value = df_dict [key ]
283
- values = []
284
-
285
- for row_key in col_value :
286
- values .append (col_value [row_key ])
287
-
288
- new_df [key ] = values
289
-
290
- return new_df
291
-
292
- @classmethod
293
- def convert_data_frame_to_fields (cls , data_frame ):
294
- """Converts a dataframe to a dictionary describing the type of fields.
295
-
296
- Args:
297
- data_frame(DataFrame): The data frame to convert.
298
-
299
- Returns:
300
- dict: Dictionary of fields.
301
- """
302
- fields = []
303
-
304
- for key in data_frame :
305
- col_type = data_frame .dtypes [key ]
306
- fields .append (
307
- {"name" : key , "type" : _ArtifactConverter .convert_df_type_to_simple_type (col_type )}
308
- )
309
- return fields
310
-
311
- @classmethod
312
- def convert_df_type_to_simple_type (cls , data_frame_type ):
313
- """Converts a dataframe type to a type for rendering a table in Studio.
314
-
315
- Args:
316
- data_frame_type (str): The pandas type.
317
-
318
- Returns:
319
- str: The type of the table field.
320
- """
321
-
322
- type_pairs = [
323
- ("datetime" , "datetime" ),
324
- ("float" , "number" ),
325
- ("int" , "number" ),
326
- ("uint" , "number" ),
327
- ("boolean" , "boolean" ),
328
- ]
329
- for pair in type_pairs :
330
- if str (data_frame_type ).lower ().startswith (pair [0 ]):
331
- return pair [1 ]
332
- return "string"
0 commit comments