multiple calls to pipeline.definition() causes extra "code" ProcessingInput to be appended to ProcessingStep #3451


Closed
nmadan opened this issue Nov 3, 2022 · 8 comments
Labels: component: pipelines (Relates to the SageMaker Pipeline Platform), type: bug

Comments

@nmadan
Member

nmadan commented Nov 3, 2022

Describe the bug
I have multiple calls to pipeline.definition() in my notebook. Every time I make this call, an extra "code" ProcessingInput gets appended to my ProcessingStep's argument definition.

To reproduce

prepare_labeled_data_processor = PyTorchProcessor(
    code_location=code_location(),
    instance_count=1,
    instance_type="ml.m5.large",
    **common_attributes,
)

step_args = prepare_labeled_data_processor.run(
    source_dir='.',
    code='prepare_labeled_data.py',
    inputs=[ProcessingInput(
        source=p['env/locations/prepared_reviews_labeled_data'],
        destination='/opt/ml/processing/input/data')],
    outputs=[
        ProcessingOutput(
            output_name='labeled-data-train',
            source='/opt/ml/processing/output/train',
            destination=join_expr('labeled-data-train')
        ),
        ProcessingOutput(
            output_name='labeled-data-valid',
            source='/opt/ml/processing/output/valid',
            destination=join_expr('labeled-valid-train')
        )
    ], 
    arguments=[
       '--max-lines-per-output-file', '3000',
       '--max-output-files', '20',
       '--train-valid-ratio', train_valid_split_ratio, 
       '--seed', seed,
    ], 
)

prepare_labeled_data_step = ProcessingStep(
    name='prepare-labeled-data',
    step_args=step_args,
)

After one call to pipeline.definition(), my step JSON looks like:

{
   "Arguments":{
      "AppSpecification":{
         "ContainerArguments":[
            "--max-lines-per-output-file",
            "3000",
            "--max-output-files",
            "20",
            "--train-valid-ratio",
            {
               "Get":"Parameters.train_valid_split_ratio"
            },
            "--seed",
            {
               "Get":"Parameters.seed"
            }
         ],
         "ContainerEntrypoint":[
            "/bin/bash",
            "/opt/ml/processing/input/entrypoint/runproc.sh"
         ],
         "ImageUri":"763104351884.dkr.ecr.eu-west-1.amazonaws.com/pytorch-training:1.10-cpu-py38"
      },
      "ProcessingInputs":[
         {
            "AppManaged":false,
            "InputName":"input-1",
            "S3Input":{
               "LocalPath":"/opt/ml/processing/input/data",
               "S3CompressionType":"None",
               "S3DataDistributionType":"FullyReplicated",
               "S3DataType":"S3Prefix",
               "S3InputMode":"File",
               "S3Uri":"s3://stackoverflow-dataset/labeled-data/train"
            }
         },
         {
            "AppManaged":false,
            "InputName":"code",
            "S3Input":{
               "LocalPath":"/opt/ml/processing/input/code/",
               "S3CompressionType":"None",
               "S3DataDistributionType":"FullyReplicated",
               "S3DataType":"S3Prefix",
               "S3InputMode":"File",
               "S3Uri":"s3://sagemaker-eu-west-1-426334991426/pipeline-code/stackoverflow-clf-small/code/7bf6d0f7251c2ec38f3c2f7b991a1acb/sourcedir.tar.gz"
            }
         },
         {
            "AppManaged":false,
            "InputName":"code",
            "S3Input":{
               "LocalPath":"/opt/ml/processing/input/code/",
               "S3CompressionType":"None",
               "S3DataDistributionType":"FullyReplicated",
               "S3DataType":"S3Prefix",
               "S3InputMode":"File",
               "S3Uri":"s3://sagemaker-eu-west-1-426334991426/pipeline-code/stackoverflow-clf-small/code/7bf6d0f7251c2ec38f3c2f7b991a1acb/sourcedir.tar.gz"
            }
         },
         {
            "AppManaged":false,
            "InputName":"entrypoint",
            "S3Input":{
               "LocalPath":"/opt/ml/processing/input/entrypoint",
               "S3CompressionType":"None",
               "S3DataDistributionType":"FullyReplicated",
               "S3DataType":"S3Prefix",
               "S3InputMode":"File",
               "S3Uri":"s3://sagemaker-eu-west-1-426334991426/pipeline-code/stackoverflow-clf-small/code/7bf6d0f7251c2ec38f3c2f7b991a1acb/runproc.sh"
            }
         }
      ],
      "ProcessingOutputConfig":{
         "Outputs":[
            {
               "AppManaged":false,
               "OutputName":"labeled-data-train",
               "S3Output":{
                  "LocalPath":"/opt/ml/processing/output/train",
                  "S3UploadMode":"EndOfJob",
                  "S3Uri":{
                     "Std:Join":{
                        "On":"/",
                        "Values":[
                           "s3://nlp-article/pipeline-runs",
                           "2022",
                           "11",
                           {
                              "Get":"Execution.PipelineExecutionId"
                           },
                           "labeled-data-train"
                        ]
                     }
                  }
               }
            }

After every subsequent call, another "code" ProcessingInput gets added.

Expected behavior
No extra ProcessingInputs are appended

Additional context
This issue was noticed in SDK version 2.116. It does not exist in v2.115.

@nmadan added the type: bug and component: pipelines labels Nov 3, 2022
@nmadan
Member Author

nmadan commented Nov 3, 2022

We are seeing this issue not only when pipeline.definition() is called but also whenever step.arguments is accessed: https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/workflow/steps.py#L862
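For anyone reproducing this, the growth is visible from repeated accesses alone (a sketch reusing the prepare_labeled_data_step from the report above; behavior as observed under sdk 2.116):

for _ in range(3):
    inputs = prepare_labeled_data_step.arguments["ProcessingInputs"]
    code_inputs = [i for i in inputs if i["InputName"] == "code"]
    print(len(code_inputs))  # the count keeps growing on every access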

@brockwade633
Contributor

brockwade633 commented Nov 7, 2022

More Context

This issue stems from a lack of idempotency in the FrameworkProcessor, PySparkProcessor, and SparkJarProcessor classes, so the problem is limited to those scenarios. These subclasses modify the original user inputs, often appending to them, in their run() methods before calling the parent class's run(). The user inputs therefore grow every time run() is called, which means the method is not idempotent, and any upstream method that calls run() aggravates the issue, including pipeline.definition() and the step.arguments access mentioned above. A toy illustration follows.
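(Hypothetical code, not the actual SDK implementation, showing why mutating the caller's list makes run() non-idempotent.)

user_inputs = []

def run(inputs):
    inputs.append("code")  # appends to the caller's list in place
    return inputs

run(user_inputs)
run(user_inputs)
print(user_inputs)  # ['code', 'code'] -- one duplicate "code" input per call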

Before the caching improvements were implemented, run() methods were usually executed only once within a notebook: the step arguments were created directly from those function calls, and pipeline.definition() simply built up the step request structures from the passed args. The bug surfaced more prominently after the caching improvements because every call to pipeline.definition() (and to other upstream methods like pipeline.upsert() and pipeline.update()) now directly executes the run() methods. The same applies to accessing step.arguments.

Work Around

The issue can be avoided by building pipelines and running notebooks in the previous fashion, using SDK version <= 2.115. On SDK version 2.116, the issue can be mitigated by re-running the cell that first defines the processing input list before re-building the pipeline. For example:

cell 1:
prepare_labeled_data_processor = PyTorchProcessor(...)

cell 2:
step_args = prepare_labeled_data_processor.run(inputs=[...])

cell 3:
processing_step = ProcessingStep(step_args=step_args)
pipeline = Pipeline(steps=[processing_step])

cell 4:
pipeline.definition()

cell 5:
pipeline.create()

cell 6:
execution = pipeline.start()

Run cells 1-4, then re-run cell 2. This re-defines the original, correct inputs. Then move on to cells 5-6.
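To check whether duplicates crept in, you can inspect a captured definition (a sketch; note that the definition() call below itself triggers run() once more, so re-run cell 2 again afterwards before creating the pipeline):

import json

definition = json.loads(pipeline.definition())
code_inputs = [i for i in definition["Steps"][0]["Arguments"]["ProcessingInputs"]
               if i["InputName"] == "code"]
print(len(code_inputs))  # more than one means duplicated "code" inputs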

Another iteration of this issue occurs with PySparkProcessor configurations, where configuration inputs are duplicated instead of code inputs. That specific case, along with potential workarounds, is discussed in more detail here.

Fix

In the fix PR (#3460), existing unit tests have been extended to test for idempotency in Processing, Training, Transform, and Tuning steps. The FrameworkProcessor, PySparkProcessor, and SparkJarProcessor in particular have been adjusted so that their run() methods are idempotent: they no longer modify the user inputs directly, but instead work on a copy of them.
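In the same toy terms as the sketch above (hypothetical code, not the actual PR diff), the shape of the fix is to copy the user's list before extending it, so every call starts from the original inputs:

user_inputs = []

def run(inputs):
    patched = list(inputs)  # work on a copy; the caller's list stays untouched
    patched.append("code")
    return patched

run(user_inputs)
run(user_inputs)
print(user_inputs)  # [] -- run() is now idempotent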

@harshraj22

The mentioned workaround doesn't seem to work when using a script instead of Colab notebooks.

@knikure
Contributor

knikure commented Dec 5, 2022

Closing this issue as PR #3460 has been merged.

@knikure closed this as completed Dec 5, 2022
@brockwade633
Contributor

Hi @harshraj22, yes, the workaround will be a little more difficult in the context of a regular Python script rather than a Jupyter notebook. Do you have multiple calls to pipeline.definition() or accesses to step.arguments in your script? One thing you can try is to limit those to a single call. I will provide an update as soon as I have more info to share.
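One way to do that in a script is to capture the definition once and reuse the parsed result (a sketch):

import json

definition_json = pipeline.definition()  # the only call that triggers run()
definition = json.loads(definition_json)
print(definition["Steps"][0]["Name"])    # reuse the parsed dict as needed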

@harshraj22

No, there are no multiple calls.

import json
from pprint import pprint

step_args = tp.run(
    inputs=inputs,
    outputs=outputs,
    code='src/code/evaluate.py',
    # source_dir='src/code'
)
step_eval = ProcessingStep(
    name=evaluate_step_name,
    step_args=step_args,
    # property_files=[evaluation_report]
)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        training_epochs,
        accuracy_mse_threshold,
    ],
    steps=[step_preprocess_data, step_train_model, step_eval],  # , step_cond
)

definition = json.loads(pipeline.definition())
pprint(definition)

# re-define step_args to work around the duplicated inputs:
# https://github.com/aws/sagemaker-python-sdk/issues/3451
step_args = tp.run(
    inputs=inputs,
    outputs=outputs,
    code='src/code/evaluate.py',
)

pipeline.upsert(role_arn=role)

execution = pipeline.start()

@Cayd3Nine

Cayd3Nine commented Dec 14, 2022

Hi @harshraj22,

I had the same issue; I tried upgrading and it was still there when running locally from a script. It looks like inside pipeline.start() the step.arguments are accessed multiple times. I found an if self.step_args: check there that then calls execute_job_functions, which uploads the code.

To mitigate the issue, one workaround that works for me is to not create step_args at all, i.e. not call run(). You can pass the processor, inputs, outputs, arguments, and code directly to the ProcessingStep. In your case, something like this:

step_eval = ProcessingStep(
    name=evaluate_step_name,
    processor=your_processor,
    inputs=inputs,
    outputs=outputs,
    code="src/code/evaluate.py",
    # step_args=step_args  -- don't do this
    # property_files=[evaluation_report]
)

This uploaded the code only once.

@brockwade633
Contributor

Hi @harshraj22 and @Cayd3Nine, thanks for your comments. There have been several fixes included in the latest package version 2.122.0. Can you try again with this version? If you are still experiencing issues, can you describe what behavior you are seeing? Thanks.
