fix: use pathlib.PurePosixPath for S3 URLs and Unix paths #1763

Merged (2 commits, Aug 5, 2020)
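Background for the change: os.path.join uses the client platform's separator, so when the SDK runs on Windows it builds S3 URIs and Linux container paths with backslashes. pathlib.PurePosixPath always joins with forward slashes, which is what this PR switches the in-container paths to; the S3 URIs themselves go through the new s3.s3_path_join helper instead, presumably because PurePosixPath would collapse the double slash after the "s3:" scheme. A minimal stdlib-only sketch of the failure mode, not taken from the PR (ntpath and posixpath stand in for os.path on Windows and Linux, so it runs on any OS; bucket and path values are illustrative):

import ntpath
import posixpath
from pathlib import PurePosixPath

# os.path.join on a Windows client inserts backslashes, yielding an invalid S3 URI.
print(ntpath.join("s3://", "my-bucket", "prefix", "job-name"))
# -> s3://my-bucket\prefix\job-name

# posixpath always joins with "/", which is what S3 expects.
print(posixpath.join("s3://", "my-bucket", "prefix", "job-name"))
# -> s3://my-bucket/prefix/job-name

# PurePosixPath is safe for Linux container paths regardless of the client OS ...
print(str(PurePosixPath("/opt/ml/processing", "input", "baseline_dataset_input")))  # illustrative values
# -> /opt/ml/processing/input/baseline_dataset_input

# ... but it collapses the "//" of an S3 scheme, so it is not used for full URIs here.
print(str(PurePosixPath("s3://my-bucket", "prefix")))
# -> s3:/my-bucket/prefix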
22 changes: 4 additions & 18 deletions src/sagemaker/local/utils.py
@@ -19,6 +19,8 @@
from distutils.dir_util import copy_tree
from six.moves.urllib.parse import urlparse

from sagemaker import s3


def copy_directory_structure(destination_directory, relative_path):
"""Create all the intermediate directories required for relative_path to
@@ -62,8 +64,8 @@ def move_to_destination(source, destination, job_name, sagemaker_session):
final_uri = destination
elif parsed_uri.scheme == "s3":
bucket = parsed_uri.netloc
path = _create_s3_prefix(parsed_uri.path, job_name)
final_uri = "s3://%s/%s" % (bucket, path)
path = s3.s3_path_join(parsed_uri.path, job_name)
final_uri = s3.s3_path_join("s3://", bucket, path)
sagemaker_session.upload_data(source, bucket, path)
else:
raise ValueError("Invalid destination URI, must be s3:// or file://, got: %s" % destination)
@@ -72,22 +74,6 @@ def move_to_destination(source, destination, job_name, sagemaker_session):
return final_uri


def _create_s3_prefix(path, job_name):
"""Constructs a path out of the given path and job name to be
used as an S3 prefix.

Args:
path (str): the original path. If the path is only ``"/"``,
then it is ignored.
job_name (str): the job name to be appended to the path.

Returns:
str: an S3 prefix of the form ``"path/job_name"``
"""
path = path.strip("/")
return job_name if path == "" else "/".join((path, job_name))


def recursive_copy(source, destination):
"""A wrapper around distutils.dir_util.copy_tree but won't throw any
exception when the source directory does not exist.
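For comparison, the removed _create_s3_prefix helper above stripped the surrounding slashes and joined with "/"; the replacement s3.s3_path_join call is assumed to normalize the leading "/" carried by urlparse(destination).path in the same way (the helper's implementation is not part of this diff). A quick check of the old behavior:

from urllib.parse import urlparse

def _create_s3_prefix(path, job_name):  # copied from the helper removed above
    path = path.strip("/")
    return job_name if path == "" else "/".join((path, job_name))

parsed = urlparse("s3://my-bucket/output/data")   # hypothetical destination URI
print(parsed.path)                                # -> /output/data
print(_create_s3_prefix(parsed.path, "my-job"))   # -> output/data/my-job
print(_create_s3_prefix("/", "my-job"))           # -> my-job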
5 changes: 2 additions & 3 deletions src/sagemaker/model_monitor/data_capture_config.py
@@ -16,8 +16,7 @@
"""
from __future__ import print_function, absolute_import

import os

from sagemaker import s3
from sagemaker.session import Session

_MODEL_MONITOR_S3_PATH = "model-monitor"
@@ -67,7 +66,7 @@ def __init__(
self.destination_s3_uri = destination_s3_uri
if self.destination_s3_uri is None:
sagemaker_session = sagemaker_session or Session()
self.destination_s3_uri = os.path.join(
self.destination_s3_uri = s3.s3_path_join(
"s3://",
sagemaker_session.default_bucket(),
_MODEL_MONITOR_S3_PATH,
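With this hunk, the default destination_s3_uri for data capture is composed via s3.s3_path_join("s3://", default bucket, "model-monitor"). Assuming the helper joins segments with single forward slashes, the result has the following shape (the bucket name is hypothetical):

# Expected shape of the default data-capture destination after this change.
bucket = "sagemaker-us-east-1-111122223333"   # hypothetical default bucket
print("s3://" + "/".join([bucket, "model-monitor"]))
# -> s3://sagemaker-us-east-1-111122223333/model-monitor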
85 changes: 50 additions & 35 deletions src/sagemaker/model_monitor/model_monitoring.py
@@ -19,19 +19,19 @@
import copy
import json
import os
import pathlib
import logging
import uuid

from six import string_types
from six.moves.urllib.parse import urlparse
from botocore.exceptions import ClientError

from sagemaker import image_uris
from sagemaker import image_uris, s3
from sagemaker.exceptions import UnexpectedStatusException
from sagemaker.model_monitor.monitoring_files import Constraints, ConstraintViolations, Statistics
from sagemaker.network import NetworkConfig
from sagemaker.processing import Processor, ProcessingInput, ProcessingJob, ProcessingOutput
from sagemaker.s3 import S3Uploader
from sagemaker.session import Session
from sagemaker.utils import name_from_base, retries

@@ -809,8 +809,10 @@ def _normalize_endpoint_input(self, endpoint_input):
if isinstance(endpoint_input, string_types):
endpoint_input = EndpointInput(
endpoint_name=endpoint_input,
destination=os.path.join(
_CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _CONTAINER_ENDPOINT_INPUT_PATH
destination=str(
pathlib.PurePosixPath(
_CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _CONTAINER_ENDPOINT_INPUT_PATH
)
),
)

@@ -842,13 +844,13 @@ def _normalize_baseline_inputs(self, baseline_inputs=None):
# and save the S3 uri in the ProcessingInput source.
parse_result = urlparse(file_input.source)
if parse_result.scheme != "s3":
s3_uri = os.path.join(
s3_uri = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
self.latest_baselining_job_name,
file_input.input_name,
)
S3Uploader.upload(
s3.S3Uploader.upload(
local_path=file_input.source,
desired_s3_uri=s3_uri,
sagemaker_session=self.sagemaker_session,
@@ -869,7 +871,7 @@ def _normalize_processing_output(self, output=None):
"""
# If the output is a string, turn it into a ProcessingOutput object.
if isinstance(output, string_types):
s3_uri = os.path.join(
s3_uri = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
self.latest_baselining_job_name,
@@ -893,7 +895,7 @@ def _normalize_monitoring_output(self, output=None):
"""
# If the output is a string, turn it into a ProcessingOutput object.
if output.destination is None:
output.destination = os.path.join(
output.destination = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
self.monitoring_schedule_name,
@@ -914,7 +916,7 @@ def _s3_uri_from_local_path(self, path):
"""
parse_result = urlparse(path)
if parse_result.scheme != "s3":
s3_uri = os.path.join(
s3_uri = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
_MODEL_MONITOR_S3_PATH,
Expand All @@ -923,10 +925,10 @@ def _s3_uri_from_local_path(self, path):
_INPUT_S3_PATH,
str(uuid.uuid4()),
)
S3Uploader.upload(
s3.S3Uploader.upload(
local_path=path, desired_s3_uri=s3_uri, sagemaker_session=self.sagemaker_session
)
path = os.path.join(s3_uri, os.path.basename(path))
path = s3.s3_path_join(s3_uri, os.path.basename(path))
return path

def _wait_for_schedule_changes_to_apply(self):
@@ -1074,8 +1076,10 @@ def suggest_baseline(

normalized_baseline_dataset_input = self._upload_and_convert_to_processing_input(
source=baseline_dataset,
destination=os.path.join(
_CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _BASELINE_DATASET_INPUT_NAME
destination=str(
pathlib.PurePosixPath(
_CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _BASELINE_DATASET_INPUT_NAME
)
),
name=_BASELINE_DATASET_INPUT_NAME,
)
@@ -1085,34 +1089,44 @@

normalized_record_preprocessor_script_input = self._upload_and_convert_to_processing_input(
source=record_preprocessor_script,
destination=os.path.join(
_CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH, _RECORD_PREPROCESSOR_SCRIPT_INPUT_NAME
destination=str(
pathlib.PurePosixPath(
_CONTAINER_BASE_PATH,
_CONTAINER_INPUT_PATH,
_RECORD_PREPROCESSOR_SCRIPT_INPUT_NAME,
)
),
name=_RECORD_PREPROCESSOR_SCRIPT_INPUT_NAME,
)

record_preprocessor_script_container_path = None
if normalized_record_preprocessor_script_input is not None:
record_preprocessor_script_container_path = os.path.join(
normalized_record_preprocessor_script_input.destination,
os.path.basename(record_preprocessor_script),
record_preprocessor_script_container_path = str(
pathlib.PurePosixPath(
normalized_record_preprocessor_script_input.destination,
os.path.basename(record_preprocessor_script),
)
)

normalized_post_processor_script_input = self._upload_and_convert_to_processing_input(
source=post_analytics_processor_script,
destination=os.path.join(
_CONTAINER_BASE_PATH,
_CONTAINER_INPUT_PATH,
_POST_ANALYTICS_PROCESSOR_SCRIPT_INPUT_NAME,
destination=str(
pathlib.PurePosixPath(
_CONTAINER_BASE_PATH,
_CONTAINER_INPUT_PATH,
_POST_ANALYTICS_PROCESSOR_SCRIPT_INPUT_NAME,
)
),
name=_POST_ANALYTICS_PROCESSOR_SCRIPT_INPUT_NAME,
)

post_processor_script_container_path = None
if normalized_post_processor_script_input is not None:
post_processor_script_container_path = os.path.join(
normalized_post_processor_script_input.destination,
os.path.basename(post_analytics_processor_script),
post_processor_script_container_path = str(
pathlib.PurePosixPath(
normalized_post_processor_script_input.destination,
os.path.basename(post_analytics_processor_script),
)
)

normalized_baseline_output = self._normalize_baseline_output(output_s3_uri=output_s3_uri)
@@ -1631,7 +1645,7 @@ def _normalize_baseline_output(self, output_s3_uri=None):
sagemaker.processing.ProcessingOutput: The normalized ProcessingOutput object.

"""
s3_uri = output_s3_uri or os.path.join(
s3_uri = output_s3_uri or s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
_MODEL_MONITOR_S3_PATH,
@@ -1640,7 +1654,7 @@
_RESULTS_S3_PATH,
)
return ProcessingOutput(
source=os.path.join(_CONTAINER_BASE_PATH, _CONTAINER_OUTPUT_PATH),
source=str(pathlib.PurePosixPath(_CONTAINER_BASE_PATH, _CONTAINER_OUTPUT_PATH)),
destination=s3_uri,
output_name=_DEFAULT_OUTPUT_NAME,
)
@@ -1655,7 +1669,7 @@ def _normalize_monitoring_output(self, output_s3_uri=None):
sagemaker.model_monitor.MonitoringOutput: The normalized MonitoringOutput object.

"""
s3_uri = output_s3_uri or os.path.join(
s3_uri = output_s3_uri or s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
_MODEL_MONITOR_S3_PATH,
@@ -1664,7 +1678,8 @@
_RESULTS_S3_PATH,
)
output = MonitoringOutput(
source=os.path.join(_CONTAINER_BASE_PATH, _CONTAINER_OUTPUT_PATH), destination=s3_uri
source=str(pathlib.PurePosixPath(_CONTAINER_BASE_PATH, _CONTAINER_OUTPUT_PATH)),
destination=s3_uri,
)

return output
@@ -1741,7 +1756,7 @@ def _upload_and_convert_to_processing_input(self, source, destination, name):
parse_result = urlparse(url=source)

if parse_result.scheme != "s3":
s3_uri = os.path.join(
s3_uri = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
_MODEL_MONITOR_S3_PATH,
@@ -1750,7 +1765,7 @@ def _upload_and_convert_to_processing_input(self, source, destination, name):
_INPUT_S3_PATH,
name,
)
S3Uploader.upload(
s3.S3Uploader.upload(
local_path=source, desired_s3_uri=s3_uri, sagemaker_session=self.sagemaker_session
)
source = s3_uri
@@ -1839,7 +1854,7 @@ def baseline_statistics(self, file_name=STATISTICS_JSON_DEFAULT_FILE_NAME, kms_k
try:
baselining_job_output_s3_path = self.outputs[0].destination
return Statistics.from_s3_uri(
statistics_file_s3_uri=os.path.join(baselining_job_output_s3_path, file_name),
statistics_file_s3_uri=s3.s3_path_join(baselining_job_output_s3_path, file_name),
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
)
@@ -1877,7 +1892,7 @@ def suggested_constraints(self, file_name=CONSTRAINTS_JSON_DEFAULT_FILE_NAME, km
try:
baselining_job_output_s3_path = self.outputs[0].destination
return Constraints.from_s3_uri(
constraints_file_s3_uri=os.path.join(baselining_job_output_s3_path, file_name),
constraints_file_s3_uri=s3.s3_path_join(baselining_job_output_s3_path, file_name),
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
)
@@ -1993,7 +2008,7 @@ def statistics(self, file_name=STATISTICS_JSON_DEFAULT_FILE_NAME, kms_key=None):
try:
baselining_job_output_s3_path = self.outputs[0].destination
return Statistics.from_s3_uri(
statistics_file_s3_uri=os.path.join(baselining_job_output_s3_path, file_name),
statistics_file_s3_uri=s3.s3_path_join(baselining_job_output_s3_path, file_name),
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
)
@@ -2033,7 +2048,7 @@ def constraint_violations(
try:
baselining_job_output_s3_path = self.outputs[0].destination
return ConstraintViolations.from_s3_uri(
constraint_violations_file_s3_uri=os.path.join(
constraint_violations_file_s3_uri=s3.s3_path_join(
baselining_job_output_s3_path, file_name
),
kms_key=kms_key,
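Two patterns recur throughout the model_monitoring.py changes above: in-container destinations are built with pathlib.PurePosixPath (with the script's container path appending the basename of the local source), while S3 URIs are built with s3.s3_path_join. A sketch of the container-path side, using illustrative stand-ins since only the constant names appear in the diff:

import os
from pathlib import PurePosixPath

# Illustrative stand-ins for _CONTAINER_BASE_PATH, _CONTAINER_INPUT_PATH and the
# input name; the real values live in model_monitoring.py.
container_base = "/opt/ml/processing"
destination = str(PurePosixPath(container_base, "input", "record_preprocessor_script_input"))

# The script's path inside the container: the input destination plus the basename
# of the local file supplied by the user (hypothetical path).
local_script = "/home/user/work/preprocessor.py"
container_path = str(PurePosixPath(destination, os.path.basename(local_script)))
print(container_path)
# -> /opt/ml/processing/input/record_preprocessor_script_input/preprocessor.py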
23 changes: 11 additions & 12 deletions src/sagemaker/model_monitor/monitoring_files.py
@@ -22,9 +22,8 @@

from botocore.exceptions import ClientError

from sagemaker import s3
from sagemaker.session import Session
from sagemaker.s3 import S3Downloader
from sagemaker.s3 import S3Uploader

NO_SUCH_KEY_CODE = "NoSuchKey"

@@ -68,7 +67,7 @@ def save(self, new_save_location_s3_uri=None):
if new_save_location_s3_uri is not None:
self.file_s3_uri = new_save_location_s3_uri

return S3Uploader.upload_string_as_file_body(
return s3.S3Uploader.upload_string_as_file_body(
body=json.dumps(self.body_dict),
desired_s3_uri=self.file_s3_uri,
kms_key=self.kms_key,
@@ -119,7 +118,7 @@ def from_s3_uri(cls, statistics_file_s3_uri, kms_key=None, sagemaker_session=Non
"""
try:
body_dict = json.loads(
S3Downloader.read_file(
s3.S3Downloader.read_file(
s3_uri=statistics_file_s3_uri, sagemaker_session=sagemaker_session
)
)
@@ -158,10 +157,10 @@ def from_string(
"""
sagemaker_session = sagemaker_session or Session()
file_name = file_name or "statistics.json"
desired_s3_uri = os.path.join(
desired_s3_uri = s3.s3_path_join(
"s3://", sagemaker_session.default_bucket(), "monitoring", str(uuid.uuid4()), file_name
)
s3_uri = S3Uploader.upload_string_as_file_body(
s3_uri = s3.S3Uploader.upload_string_as_file_body(
body=statistics_file_string,
desired_s3_uri=desired_s3_uri,
kms_key=kms_key,
@@ -245,7 +244,7 @@ def from_s3_uri(cls, constraints_file_s3_uri, kms_key=None, sagemaker_session=No
"""
try:
body_dict = json.loads(
S3Downloader.read_file(
s3.S3Downloader.read_file(
s3_uri=constraints_file_s3_uri, sagemaker_session=sagemaker_session
)
)
@@ -287,10 +286,10 @@ def from_string(
"""
sagemaker_session = sagemaker_session or Session()
file_name = file_name or "constraints.json"
desired_s3_uri = os.path.join(
desired_s3_uri = s3.s3_path_join(
"s3://", sagemaker_session.default_bucket(), "monitoring", str(uuid.uuid4()), file_name
)
s3_uri = S3Uploader.upload_string_as_file_body(
s3_uri = s3.S3Uploader.upload_string_as_file_body(
body=constraints_file_string,
desired_s3_uri=desired_s3_uri,
kms_key=kms_key,
@@ -399,7 +398,7 @@ def from_s3_uri(cls, constraint_violations_file_s3_uri, kms_key=None, sagemaker_
"""
try:
body_dict = json.loads(
S3Downloader.read_file(
s3.S3Downloader.read_file(
s3_uri=constraint_violations_file_s3_uri, sagemaker_session=sagemaker_session
)
)
@@ -442,10 +441,10 @@ def from_string(
"""
sagemaker_session = sagemaker_session or Session()
file_name = file_name or "constraint_violations.json"
desired_s3_uri = os.path.join(
desired_s3_uri = s3.s3_path_join(
"s3://", sagemaker_session.default_bucket(), "monitoring", str(uuid.uuid4()), file_name
)
s3_uri = S3Uploader.upload_string_as_file_body(
s3_uri = s3.S3Uploader.upload_string_as_file_body(
body=constraint_violations_file_string,
desired_s3_uri=desired_s3_uri,
kms_key=kms_key,
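The from_string helpers in monitoring_files.py now compose their default upload location as s3.s3_path_join("s3://", default bucket, "monitoring", uuid, file_name). Assuming single "/" separators, the resulting URI looks like this (bucket name hypothetical):

import uuid

bucket = "sagemaker-us-east-1-111122223333"   # hypothetical default bucket
file_name = "statistics.json"
print("s3://" + "/".join([bucket, "monitoring", str(uuid.uuid4()), file_name]))
# -> e.g. s3://sagemaker-us-east-1-111122223333/monitoring/0b1c.../statistics.json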