ClientError when executing SM Pipeline with multiple Spark processing steps #3477

Closed
jpcpereira opened this issue Nov 21, 2022 · 7 comments
Labels: component: pipelines (Relates to the SageMaker Pipeline Platform), type: bug

Comments

@jpcpereira

Describe the bug
When executing a SageMaker Pipeline that includes several PySpark Processing steps, a ClientError occurs at the start of the pipeline execution. This appears to happen because the Spark settings passed to the PySparkProcessor via the configuration argument are handled by the SageMaker Python SDK as a ProcessingInput, and the SDK uses conf as the input name of the ProcessingInput it creates for every one of the Spark Processing jobs. This leads to the ClientError below, since input names have to be globally unique.

To reproduce
Run a PySparkProcessor making use of the configuration argument to pass Spark configurations such as the following:

pyspark_config = [{
    "Classification": "spark-defaults",
    "Properties": {
        "spark.executor.memory": "15g",
        "spark.driver.memory": "40g",
    },
}]
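
For completeness, a minimal sketch of the run call used to pass this configuration (the role ARN, script path, and step name are placeholders, and the processor is assumed to be created with a PipelineSession so that run() returns the step arguments for the pipeline instead of starting a job):

from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

spark_processor = PySparkProcessor(
    base_job_name="spark-step",
    framework_version="3.1",
    role="arn:aws:iam::111111111111:role/DummyRole",  # placeholder role
    instance_count=2,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
)

# Passing the spark-defaults settings via `configuration` is what triggers the error
# when several such steps are part of one pipeline.
step_args = spark_processor.run(
    submit_app="scripts/job.py",   # placeholder script
    configuration=pyspark_config,
)

processing_step = ProcessingStep(name="spark-step", step_args=step_args)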

Expected behavior
SageMaker Pipeline with multiple Spark Processing steps executes successfully.

Screenshots or logs
The following ClientError is displayed in the SageMaker Studio UI:

ClientError: Failed to invoke sagemaker:CreateProcessingJob. Error Details: Input and Output names must be globally unique: [conf]

The ProcessingInput created by SageMaker always has the same input name (conf) across the different jobs:

{
    "AppManaged": false,
    "InputName": "conf",
    "S3Input": {
        "LocalPath": "/opt/ml/processing/input/conf",
        "S3CompressionType": "None",
        "S3DataDistributionType": "FullyReplicated",
        "S3DataType": "S3Prefix",
        "S3InputMode": "File",
        "S3Uri": "XXXXX/configuration.json"
    }
}
  • S3Uri masked with XXXXX to hide private information.

System information
A description of your system. Please provide:

  • SageMaker Python SDK version: 2.117
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): Spark
  • Framework version: Spark 3.1
  • Python version: 3.7
  • CPU or GPU: CPU
  • Custom Docker image (Y/N): No, using the AWS pre-built image (657317673100.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-spark-processing:3.1-v1)

Additional context
Let me know if you need further information about the reported behavior.


wsykala commented Nov 23, 2022

Looks like the error is not really about multiple jobs having the conf input name. The name can be the same across different jobs; you can't, however, have the same input name twice within a single job. This is most likely related to the behaviour I described in #3484.

@jpcpereira Downgrading the sagemaker version to 2.115.0 should make the error disappear. It's not ideal, but it can serve as a workaround until this is fixed.

@jpcpereira

Hi @wsykala ,

Ok, that might be the case indeed, but then I would think that the fix we implemented would not make it work:
We simply created the ProcessingInput with the Spark configuration ourselves (with an input_name that is unique across jobs) and stopped passing it via the configuration argument, i.e.,

processing_step_args = spark_processor.run(
    submit_app=f"scripts/{submit_app}",
    job_name=spark_processor.base_job_name,
    arguments=[str(i) for i in spark_arguments],
    inputs=[
        ProcessingInput(
            source="scripts",
            input_name="scripts",
            destination="/opt/ml/processing/input/code/scripts",
        ),
        # Spark configuration passed explicitly, with an input name that is
        # unique across the pipeline's Spark processing steps
        ProcessingInput(
            source=spark_config_path,
            input_name=f"conf-{step_name}",
            destination="/opt/ml/processing/input/conf",
        ),
    ],
    # configuration=spark_config,  # no longer passed here
)

This runs successfully with SageMaker 2.117.0.
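
As a side note, spark_config_path above is assumed to point to a local JSON file holding the same list of spark-defaults settings that we previously passed via configuration; a hypothetical way to produce it (directory and file name are assumptions based on the S3Uri in the log above):

import json
import os

# Hypothetical helper: write the spark-defaults settings (the spark_config list from
# the commented-out `configuration` argument) to the local file uploaded by the
# manually created "conf" ProcessingInput. Path and file name are assumptions.
os.makedirs("conf", exist_ok=True)
spark_config_path = "conf/configuration.json"
with open(spark_config_path, "w") as f:
    json.dump(spark_config, f)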


wsykala commented Nov 23, 2022

@jpcpereira I reproduced your example: I created a dummy PySparkProcessor with some input and passed a spark_config object. Then I printed out the arguments property two times. This triggered the same behaviour I described in #3484, meaning that I ended up with two conf inputs. Interestingly, if the configuration is not passed, this behaviour does not happen (which is effectively what your fix does).

My guess is that somewhere in your code you are accessing the arguments property - maybe taking the output of one job as an input to another job - and this, together with passing the configuration argument, was the reason for your error.
I believe that you can safely name your input conf instead of conf-{step_name}: as long as you don't pass spark_config to run, your processing job will still work.
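
A rough sketch of that check, assuming a ProcessingStep built as in the sketch under "To reproduce" above (the exact input names and request shape may differ slightly between SDK versions):

# On sagemaker 2.117.0, with `configuration` passed to run(), every access of the
# step's arguments appended another internally generated "conf" input:
print([i["InputName"] for i in processing_step.arguments["ProcessingInputs"]])
# e.g. ['code', 'conf']
print([i["InputName"] for i in processing_step.arguments["ProcessingInputs"]])
# e.g. ['code', 'conf', 'conf']  -> duplicate input name within a single job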

@jpcpereira

Hi @wsykala ,

Ok, I see - so we really just need to pass the Spark config directly as a ProcessingInput, even with the input name conf, and it should work.
We will stick to this solution until the issue you've described is fixed.

Thanks,
João P.


brockwade633 commented Nov 28, 2022

Hi @wsykala and @jpcpereira, thanks for the info in this thread. I'll leave a little more context here: this issue is due to idempotency problems with certain Processor subclasses, and the root issue is being tracked here. The configuration input is another example of this. In the meantime, as you stated, removing the configuration param altogether mitigates the problem by preventing the user's inputs from being modified.
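
To illustrate what the idempotency problem means here (purely illustrative pseudocode, not the SDK's actual implementation): if building the request arguments mutates the processor's own inputs list, every extra evaluation appends another copy of the internally generated conf input, whereas working on a copy keeps repeated evaluations identical:

# Illustrative only - not sagemaker SDK code.
class NonIdempotentProcessor:
    def __init__(self, user_inputs):
        self.inputs = user_inputs

    def to_request_args(self):
        # Bug pattern: the generated "conf" input is appended to the stored list,
        # so each call changes the processor's state.
        self.inputs.append({"InputName": "conf"})
        return {"ProcessingInputs": self.inputs}


class IdempotentProcessor:
    def __init__(self, user_inputs):
        self.inputs = user_inputs

    def to_request_args(self):
        inputs = list(self.inputs)  # copy; the user's inputs stay untouched
        inputs.append({"InputName": "conf"})
        return {"ProcessingInputs": inputs}


bad = NonIdempotentProcessor([{"InputName": "scripts"}])
print(len(bad.to_request_args()["ProcessingInputs"]))   # 2
print(len(bad.to_request_args()["ProcessingInputs"]))   # 3 -> duplicated "conf"

good = IdempotentProcessor([{"InputName": "scripts"}])
print(len(good.to_request_args()["ProcessingInputs"]))  # 2
print(len(good.to_request_args()["ProcessingInputs"]))  # 2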

brockwade633 self-assigned this Nov 28, 2022
@brockwade633

Hi @wsykala and @jpcpereira, a new sagemaker package version (2.120.0) is available that includes the fix for the idempotency issues. Are you able to confirm that you are no longer seeing the duplicate conf inputs behavior? Thanks.

@jpcpereira

Hi @brockwade633 & @wsykala, we have tested running the SageMaker Pipeline with the Processing job definitions that were failing before and confirmed that the fix works. We are now able to pass the Spark configuration without issues.
Thanks for the effort,
João Pereira
