
upserting a pipeline kills caching #2736

Closed
neilmcguigan opened this issue Oct 27, 2021 · 8 comments
Labels
component: pipelines (Relates to the SageMaker Pipeline Platform), type: feature request

Comments

@neilmcguigan

Describe the bug
Upserting a pipeline always causes cache misses

To reproduce
Create a pipeline with caching enabled. Run the pipeline. Upsert the pipeline. Run the pipeline again. You'll get a cache miss every time

cache_config = CacheConfig(
    enable_caching=True,
    expire_after = "P30M"
)

# define the pre-processor:
processor = PySparkProcessor(
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    framework_version="3.0"
)

step1 = ProcessingStep(
    name="step1",
    processor=processor,
    code="processor1.py",
    cache_config=cache_config
)

# define the pipeline
pipeline = Pipeline(
    name="pipeline1",
    steps=[step1]
)

pipeline.upsert(role_arn="...")

pipeline.start()

Expected behavior
I expect a cache hit, even if I upsert a pipeline

System information
A description of your system. Please provide:

  • SageMaker Python SDK version: 2.66.2 (latest)
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): any
  • Framework version: any
  • Python version: 3.8.10
  • CPU or GPU: any
  • Custom Docker image (Y/N): no


@staubhp (Contributor) commented Oct 28, 2021

This is a result of Processors in the SDK appending timestamps to their output paths. This changes the input arguments to the ProcessingStep and results in a cache miss when running the pipeline.

You can override the automatically generated output path by setting the destination arg on the ProcessingOutput, like this:

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination="s3://myBucket/myStaticOutputPath"),
    ],
    code="abalone/preprocessing.py",
)

Note that this will fix your caching issue, but it introduces a new problem: all executions of this pipeline will write to the same S3 path. A new execution will overwrite the output of an old execution (generally OK), but concurrent pipeline executions will write to the same S3 path at the same time (which can break things if your processing jobs output statically named paths and files).

To address that, you could parameterize the S3 path like this:

processor_output_path = ParameterString(
    name="processor_output_path",
    default_value="s3://myBucket/myStaticOutputPath",
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=processor_output_path),
    ],
    code="abalone/preprocessing.py",
)

At least this way you have some control over the behavior when starting an execution. If you want caching to work, call pipeline.start() with no parameters. If you want a run to write to a different path (for example, because you're running it concurrently with another execution), specify one: pipeline.start(parameters={'processor_output_path': 's3://myBucket/myDifferentOutputPath'})
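
For concreteness, a minimal sketch of the two start calls described above (the alternate output path is just a placeholder):

# Default run: the parameter keeps its default value, so the step's arguments
# are unchanged and caching can hit.
pipeline.start()

# Isolated or concurrent run: give this execution its own output path,
# accepting a cache miss for the affected step.
pipeline.start(
    parameters={"processor_output_path": "s3://myBucket/myDifferentOutputPath"}
)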

staubhp added the component: pipelines label on Oct 28, 2021
@purak24 commented Nov 8, 2021

This still results in a cache miss. I believe the reason is that even the code script ("abalone/preprocessing.py") becomes a ProcessingInput behind the scenes. The S3 key the code script is uploaded to also gets a timestamp appended, resulting in different inputs and hence a cache miss. Note that this happens even if we provide the name in ProcessingStep, as that is treated as a base_job_name instead of _current_job_name. Is there any way we can set the _current_job_name from the ProcessingStep to circumvent this?

@staubhp (Contributor) commented Nov 30, 2021

You're right; it turns out we are not passing the job_name from the processor when normalizing the args in the ProcessingStep, which results in a newly generated job name each time.

This should be a simple fix, which we'll prioritize. The only workaround I can offer for now is to provide an S3 URI for your code arg instead of a local file, so the SDK doesn't upload it under the generated job name.
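
In case it helps, here is a minimal sketch of that workaround, assuming you upload the script to a fixed S3 key yourself (bucket and prefix names below are placeholders) and reuse the processor and cache_config from the repro above:

import sagemaker

session = sagemaker.Session()

# Upload the script once to a stable key so the SDK does not re-upload it
# under a freshly generated job name on every upsert.
code_uri = session.upload_data(
    path="processor1.py",
    bucket="myBucket",
    key_prefix="pipeline1/code",
)

step1 = ProcessingStep(
    name="step1",
    processor=processor,
    code=code_uri,  # S3 URI instead of a local path
    cache_config=cache_config,
)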

@giovannirescia-upwork commented Dec 3, 2021

@staubhp what about the training step? I am trying the Abalone example and still getting cache misses.

@staubhp (Contributor) commented Dec 6, 2021

@giovannirescia-upwork Training has a different problem: the SDK appends a timestamp to the default profiler rule name:

{'Name': 'AbaloneTrain',
 'Type': 'Training',
 ...
 'ProfilerRuleConfigurations': [{'RuleConfigurationName': 'ProfilerReport-1638820982',
                                 'RuleEvaluatorImage': '895741380848.dkr.ecr.us-west-2.amazonaws.com/sagemaker-debugger-rules:latest',
 ...
}

If you're not using profiling, you can disable this in your estimator to get rid of that dynamic element:

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    disable_profiler=True
)

If you need profiling, you'll need to set the profiler_config on the Estimator to avoid the timestamped default name the SDK generates.
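
For example, a minimal sketch of what that could look like, assuming a fixed S3 output path is enough to remove the dynamic element in your case (the bucket path is a placeholder):

from sagemaker.debugger import ProfilerConfig

# Static profiler output path, so the step arguments stay stable across upserts.
profiler_config = ProfilerConfig(s3_output_path="s3://myBucket/profiler-output")

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    profiler_config=profiler_config,
)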

@jerrypeng7773 (Contributor)

@neilmcguigan Looks like the fix was merged, so I'm closing this now; feel free to re-open if you have any other concerns.

@eugeneyarovoi

If you want caching to work then you can call pipeline.start() with no parameters. If you want a run to write to a different path (because you're running it concurrently with another execution for example) you can specify a different path

I'm wondering if there's going to be any better solution than this. Sometimes we don't know in advance whether an execution will run concurrently with another execution or not.

My preferred behavior, which would solve this problem, would be something like this: the output paths of a step shouldn't be part of the cache key; rather, they should be copied from the cache hit when applicable. So, if the step is a cache miss and it executes, its output path will be what was assigned, for example a path with a timestamp or unique identifier, so we're not conflicting with any other path. If, however, the step is a cache hit, it should inherit the output paths from the step that was cached - after all, its outputs are obviously the same since the step isn't being executed. This seems more in line with how a cache should work, a sort of key-value store. In the current implementation, the cache isn't used to retrieve any outputs, which I'd argue is counter-intuitive.

Consider now how this would handle 2 concurrent executions of a step. If both executions are a cache miss, by this logic, both executions complete and both write to their own unique location. Both are then valid for any downstream pipeline steps. Future executions of the step will cache hit and retrieve one of these executions, the one that finished last. That execution's outputs will now be the (skipped due to caching) step's outputs. For example, if we need this step's output as the input to another step, the information will be propagated.

Right now, this kind of logic doesn't work because setting a unique output location (or omitting the explicit output location) for a step causes it to always cache miss. Is there any way to exclude the outputs from the cache key so that caching can work like this?

@omarkahwaji

@staubhp For the training step, the suggested fix does not work; the step is not cached when the pipeline is upserted.
