Skip to content

Custom s3 path for uploading spark configuration files #3200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
nebur395 opened this issue Jun 28, 2022 · 4 comments
Closed

Custom s3 path for uploading spark configuration files #3200

nebur395 opened this issue Jun 28, 2022 · 4 comments
Labels
component: pipelines Relates to the SageMaker Pipeline Platform type: feature request

Comments

@nebur395
Copy link

Describe the feature you'd like
In order to be able to organise our S3 bucket in which the input and the outputs are being uploaded for our Sagemaker pipelines, we want to be able to specify a custom spark configuration path. To improve our data lineage we are trying to store these files by pipeline name and execution ids.

Currently there is no possibility to do so and those spark config files are being uploaded automatically following this convention:
https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/spark/processing.py#L391-L394.

So Ideally we would like to be able to specify a custom s3 path or at least a prefix for the uploaded files in S3. It's important to be able to use ExecutionVariables in that s3 path. Something like this:

serialized_configuration = BytesIO(json.dumps(configuration["content"]).encode("utf-8"))
if configuration["s3_uri"] != None:
    s3_uri = (
        f"{configuration["s3_uri"]}/{self._conf_file_name}"
    )
else:
    s3_uri = (
        f"s3://{self.sagemaker_session.default_bucket()}/{self._current_job_name}/"
        f"input/{self._conf_container_input_name}/{self._conf_file_name}"
    )

How would this feature be used? Please describe.

import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

boto_session = boto3.Session(region_name="eu-west-1")

sagemaker_client = boto_session.client("sagemaker")

default_bucket = "XYZ"

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket
)

bucket_pipeline_execution_prefix = [
    "s3:/", default_bucket, ExecutionVariables.PIPELINE_NAME, ExecutionVariables.PIPELINE_EXECUTION_ID
]

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    role="XYZ",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    image_uri="XYZ",
    sagemaker_session=sagemaker_session
)

spark_config = {}

run_args = spark_processor.get_run_args(
    submit_app="./processing.py",
    configuration={ 
        content: spark_config,
        s3_uri: Join(on="/", values=[
            *bucket_pipeline_execution_prefix, "spark-config"
        ])
    },
    inputs=[],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            output_name='demo-output',
            destination=Join(on="/", values=[
                *bucket_pipeline_execution_prefix, "preprocessed"
            ])
        )
    ],
    spark_event_logs_s3_uri=Join(on="/", values=[
                *bucket_pipeline_execution_prefix, "spark-event-logs"
            ]) # work in progress: https://github.com/aws/sagemaker-python-sdk/pull/3167
)

step_process = ProcessingStep(
    name="demo-processing",
    processor=spark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    code=run_args.code,
)

pipeline = Pipeline(
    name='demo-pipeline',
    parameters=[],
    sagemaker_session=sagemaker_session,
    steps=[step_process],
)

pipeline.upsert(role_arn="XYZ")

execution = pipeline.start()

In the end the s3 bucket should look like something similar to this:

|- XYZ 
|--- demo-pipeline
|----- execution-1
|------- spark-config
|--------- configuration.json
|------- spark-event-logs
|--------- application_0001
|------- preprocessed
|--------- *.* # processing output files

Therefore now you have everything related with that demo-processing (spark app logs, spark app config files, spark outputs...) under s3://XYZ/demo-pipeline/execution-1/, increasing data lineage, and reproducibility

@navaj0 navaj0 added type: feature request component: pipelines Relates to the SageMaker Pipeline Platform labels Jun 29, 2022
@jmahlik
Copy link
Contributor

jmahlik commented Jun 30, 2022

I also ran in to this when using the spark processors for a normal job where certain prefixes in a bucket are locked down. There's no way to write the files to a different prefix other than the root of the bucket.

I ended up patching around it with the hack below. Ideally, this could either be a direct input to the spark processors or specified in the conf like @nebur395 was thinking. It seems like specifying the url in the config might fail _validate_configuration though.

from sagemaker.spark.processing import _SparkProcessorBase, PySparkProcessor
from io import BytesIO
from sagemaker.s3 import S3Uploader

def _stage_configuration(self, configuration):
    """Serializes and uploads the user-provided EMR application configuration to S3.

    This method prepares an input channel.

    Args:
        configuration (Dict): the configuration dict for the EMR application configuration.
    """

    serialized_configuration = BytesIO(json.dumps(configuration).encode("utf-8"))
    s3_uri = (
        # Patch
        f"{my_custom_prefix}"
        f"input/{self._conf_container_input_name}/{self._conf_file_name}"
    )

    S3Uploader.upload_string_as_file_body(
        body=serialized_configuration,
        desired_s3_uri=s3_uri,
        sagemaker_session=self.sagemaker_session,
    )

    conf_input = ProcessingInput(
        source=s3_uri,
        destination=f"{self._conf_container_base_path}{self._conf_container_input_name}",
        input_name=_SparkProcessorBase._conf_container_input_name,
    )
    return conf_input

_SparkProcessorBase._stage_configuration = _stage_configuration

spark_processor = PySparkProcessor(
    base_job_name="my-job",
    role=role,
    instance_count=1,
    instance_type=instance_type,
    image_uri=image,
    sagemaker_session=sagemaker_session,
)

@DmitriyLamzin
Copy link

Yes. One more reason to have this feature is that with every pipeline deployment operation a configuration file is uploaded to a new directory, which breaks a step caching functionality.

So I would use this feature in the opposite way. I would set a static path for the configuration file (or based on its hash, whatever). This would allow using the step caching.

@jmahlik
Copy link
Contributor

jmahlik commented Aug 23, 2023

I believe this is fixed?

@martinRenou
Copy link
Collaborator

Closing as fixed by #3486

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component: pipelines Relates to the SageMaker Pipeline Platform type: feature request
Projects
None yet
Development

No branches or pull requests

5 participants