SM Studio: Pipeline.definition() or Pipeline.upsert() crashes with a dictionary deepcopy error from within Python 3.7: "TypeError: can't pickle _thread.lock objects" #2478

Closed
dkruijs opened this issue Jun 21, 2021 · 3 comments
Labels
component: pipelines Relates to the SageMaker Pipeline Platform

Comments

@dkruijs

dkruijs commented Jun 21, 2021

Describe the bug

UPDATE: see comment below.

In SageMaker Studio, calling Pipeline.definition() or Pipeline.upsert() fails with "TypeError: can't pickle _thread.lock objects". The error is raised inside Python 3.7's copy module, during the deepcopy of the request dictionary that sagemaker/workflow/pipeline.py performs.

To reproduce

Run a SageMaker Studio notebook on an ml.t3.medium instance with the Data Science kernel (Python 3.7) and the following package versions installed:

boto        2.49.0
boto3       1.17.97
botocore    1.20.97
sagemaker   2.46.0

Create a pipeline with any definition and call either of the methods above.

Expected behavior
I expected the pipeline to be described or upserted, or at least to receive an error relating to my input.

Screenshots or logs

import json

definition = json.loads(pipeline.definition())
definition

stack trace:

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-52-da30710216f5> in <module>
      2 
      3 
----> 4 definition = json.loads(pipeline.definition())
      5 definition
      6 

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in definition(self)
    243             request_dict["PipelineExperimentConfig"]
    244         )
--> 245         request_dict["Steps"] = interpolate(request_dict["Steps"])
    246 
    247         return json.dumps(request_dict)

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in interpolate(request_obj)
    273         RequestType: The request dict with Parameter values replaced by their expression.
    274     """
--> 275     request_obj_copy = deepcopy(request_obj)
    276     return _interpolate(request_obj_copy)
    277 

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_list(x, memo, deepcopy)
    214     append = y.append
    215     for a in x:
--> 216         append(deepcopy(a, memo))
    217     return y
    218 d[list] = _deepcopy_list

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_list(x, memo, deepcopy)
    214     append = y.append
    215     for a in x:
--> 216         append(deepcopy(a, memo))
    217     return y
    218 d[list] = _deepcopy_list

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    178                     y = x
    179                 else:
--> 180                     y = _reconstruct(x, memo, *rv)
    181 
    182     # If is its own copy, don't memoize.

/opt/conda/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    279     if state is not None:
    280         if deep:
--> 281             state = deepcopy(state, memo)
    282         if hasattr(y, '__setstate__'):
    283             y.__setstate__(state)

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    178                     y = x
    179                 else:
--> 180                     y = _reconstruct(x, memo, *rv)
    181 
    182     # If is its own copy, don't memoize.

/opt/conda/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    279     if state is not None:
    280         if deep:
--> 281             state = deepcopy(state, memo)
    282         if hasattr(y, '__setstate__'):
    283             y.__setstate__(state)

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    178                     y = x
    179                 else:
--> 180                     y = _reconstruct(x, memo, *rv)
    181 
    182     # If is its own copy, don't memoize.

/opt/conda/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    279     if state is not None:
    280         if deep:
--> 281             state = deepcopy(state, memo)
    282         if hasattr(y, '__setstate__'):
    283             y.__setstate__(state)

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    178                     y = x
    179                 else:
--> 180                     y = _reconstruct(x, memo, *rv)
    181 
    182     # If is its own copy, don't memoize.

/opt/conda/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    279     if state is not None:
    280         if deep:
--> 281             state = deepcopy(state, memo)
    282         if hasattr(y, '__setstate__'):
    283             y.__setstate__(state)

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    178                     y = x
    179                 else:
--> 180                     y = _reconstruct(x, memo, *rv)
    181 
    182     # If is its own copy, don't memoize.

/opt/conda/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    279     if state is not None:
    280         if deep:
--> 281             state = deepcopy(state, memo)
    282         if hasattr(y, '__setstate__'):
    283             y.__setstate__(state)

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    178                     y = x
    179                 else:
--> 180                     y = _reconstruct(x, memo, *rv)
    181 
    182     # If is its own copy, don't memoize.

/opt/conda/lib/python3.7/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    279     if state is not None:
    280         if deep:
--> 281             state = deepcopy(state, memo)
    282         if hasattr(y, '__setstate__'):
    283             y.__setstate__(state)

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    148     copier = _deepcopy_dispatch.get(cls)
    149     if copier:
--> 150         y = copier(x, memo)
    151     else:
    152         try:

/opt/conda/lib/python3.7/copy.py in _deepcopy_dict(x, memo, deepcopy)
    239     memo[id(x)] = y
    240     for key, value in x.items():
--> 241         y[deepcopy(key, memo)] = deepcopy(value, memo)
    242     return y
    243 d[dict] = _deepcopy_dict

/opt/conda/lib/python3.7/copy.py in deepcopy(x, memo, _nil)
    167                     reductor = getattr(x, "__reduce_ex__", None)
    168                     if reductor:
--> 169                         rv = reductor(4)
    170                     else:
    171                         reductor = getattr(x, "__reduce__", None)

TypeError: can't pickle _thread.lock objects
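The last frame of the trace shows deepcopy falling back to `__reduce_ex__` on an object that cannot be pickled. As a minimal sketch of that failure mode, independent of SageMaker, the snippet below buries an object holding a `threading.Lock` inside a request-like dictionary. `FakeSession` and the dictionary contents are hypothetical stand-ins, not SDK objects, and the exact wording of the error message differs between Python versions.

```python
import copy
import threading


class FakeSession:
    """Hypothetical stand-in for any object that holds a thread lock."""

    def __init__(self):
        self._lock = threading.Lock()


# A nested, request-like structure with a live object buried inside it.
request = {"Steps": [{"Name": "Train", "DependsOn": [FakeSession()]}]}

error_message = None
try:
    copy.deepcopy(request)
except TypeError as exc:
    # deepcopy recursed through dict -> list -> dict -> list -> object -> lock
    error_message = str(exc)
    print(f"deepcopy failed: {error_message}")
```

The recursion pattern (dict, list, object state, dict, ...) mirrors the alternating `_deepcopy_dict`, `_deepcopy_list` and `_reconstruct` frames in the trace above.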

System information
A description of your system. Please provide:

  • SageMaker Python SDK version: 2.46.0
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): N/A
  • Framework version: N/A
  • Python version: 3.7.10
  • CPU or GPU: CPU
  • Custom Docker image (Y/N): Y

Additional context
I've attached the notebook file and a pip list output for reference. This notebook is based on the 'Abalone' example.

pip-list.txt
car-data-end-to-end.zip

@dkruijs
Author

dkruijs commented Jun 21, 2021

Update: I've found the cause in my environment: adding a step dependency. In one of my final cells, I added a custom dependency between the first (processing) step and the second (training) step:

from sagemaker.workflow.pipeline import Pipeline

# Since there is no data dependency between the processing and training steps visible to SageMaker, we make this dependency explicit:
step_train.add_depends_on([step_process])


pipeline_name = "car-data-pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[
        step_process,
        step_train,
        step_eval,
        step_cond,
    ],
)

If I comment out the line step_train.add_depends_on([step_process]), my pipeline.definition() call works successfully. So it seems this is a bug...

I found the way to add custom dependencies on this documentation page.
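For anyone hitting a similar deepcopy failure with a different cause, one generic way to narrow it down is to walk the structure and report where copying stops working. The helper below is a rough sketch, not part of the SageMaker SDK: `find_uncopyable` and `FakeStep` are hypothetical names, only dicts, lists, tuples and plain instance attributes are walked, and the structure is assumed to contain no reference cycles.

```python
import copy
import threading


def find_uncopyable(obj, path="root"):
    """Return the paths of values that copy.deepcopy cannot handle.

    Assumes an acyclic structure; a cycle would recurse without end.
    """
    try:
        copy.deepcopy(obj)
        return []  # this subtree copies fine
    except TypeError:
        pass

    if isinstance(obj, dict):
        children = [(f"{path}[{key!r}]", value) for key, value in obj.items()]
    elif isinstance(obj, (list, tuple)):
        children = [(f"{path}[{i}]", value) for i, value in enumerate(obj)]
    elif hasattr(obj, "__dict__"):
        children = [(f"{path}.{name}", value) for name, value in vars(obj).items()]
    else:
        children = []

    found = []
    for child_path, child in children:
        found.extend(find_uncopyable(child, child_path))
    # If no child is to blame, this object itself is the culprit.
    return found or [path]


class FakeStep:
    """Hypothetical step-like object that holds a thread lock."""

    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()


request = {"Steps": [{"Name": "Train", "DependsOn": [FakeStep("Process")]}]}
culprits = find_uncopyable(request)
print(culprits)
```

The helper re-runs deepcopy on every failing subtree, so it is only suitable for small debugging sessions, not for production code.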

@EthanShouhanCheng EthanShouhanCheng added the component: pipelines Relates to the SageMaker Pipeline Platform label Jun 23, 2021
@kirit93

kirit93 commented Jun 25, 2021

@dkruijs - can you try changing the add_depends_on call to step_train.add_depends_on([step_process.name])

add_depends_on expects the step's name attribute, not the step object itself, when creating the dependency.
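If several dependencies need converting, a small helper can normalise a mixed list to names before it is handed to add_depends_on. This is only a sketch: `to_step_names` and `FakeStep` are hypothetical and not part of the SDK, and the only assumption about real steps is the `.name` attribute mentioned above.

```python
def to_step_names(steps):
    """Convert a mix of step objects and plain strings to a list of step names."""
    names = []
    for step in steps:
        if isinstance(step, str):
            names.append(step)  # already a name
        else:
            names.append(step.name)  # assumed: steps expose `.name`
    return names


class FakeStep:
    """Hypothetical stand-in for a pipeline step that exposes `.name`."""

    def __init__(self, name):
        self.name = name


depends_on = to_step_names([FakeStep("ProcessCarData"), "TrainCarModel"])
print(depends_on)
```

In the notebook from this issue, the call would then read step_train.add_depends_on(to_step_names([step_process])).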

@jerrypeng7773
Contributor

This is fixed by PR #2504.

add_depends_on now accepts either a step name or a step instance.
