FrameworkProcessor is broken with SageMaker Pipelines #2656
Comments
Thanks for raising this @dgallitelli. As discussed offline and detailed further on the linked PR, the integration between the FrameworkProcessor and SageMaker Pipelines is currently broken. We're actively working on a solution, but here are some possible things I could suggest trying in the interim if you need:
UPDATE 1: Adding a shebang to the processing script does not currently force the container to use python3.
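For reference, the shebang in question would be a line like the following at the top of the processing script (an illustrative sketch, not from the original comment):
#!/usr/bin/env python3
# Per UPDATE 1 above, this shebang alone does not currently make the processing
# container execute the script with python3; the ScriptProcessor workaround below
# passes command=['python3'] explicitly instead.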
UPDATE 2: For those who need some directions on how to change from the TensorFlowProcessor to a ScriptProcessor:
##### COMMENT THE TENSORFLOWPROCESSOR
# from sagemaker.tensorflow import TensorFlowProcessor
# tp = TensorFlowProcessor(
# framework_version='2.3',
# role = get_execution_role(),
# instance_count=1,
# instance_type='ml.m5.large',
# base_job_name='DSM-TF-Demo-Process',
# py_version='py37'
# )
##### AND REPLACE WITH
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role
image_uri = retrieve(
framework='tensorflow',
region='eu-west-1',
version='2.3',
py_version='py37',
image_scope='training',
instance_type='ml.m5.xlarge'
)
sp = ScriptProcessor(
role=get_execution_role(),
image_uri=image_uri,
command=['python3'],
instance_count=1,
instance_type='ml.m5.xlarge'
)
# Now, either run sp.run() or create a sagemaker.workflow.steps.ProcessingStep(), as needed.
A very short example of a
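A minimal sketch along those lines (my reconstruction rather than the original example; the step name, script name, and S3 paths are placeholders):
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="MyProcessingStep",
    processor=sp,  # the ScriptProcessor defined above
    code="processing.py",  # single entry-point script; ScriptProcessor has no source_dir
    inputs=[ProcessingInput(source="s3://my-bucket/raw/", destination="/opt/ml/processing/input")],
    outputs=[ProcessingOutput(source="/opt/ml/processing/output", output_name="processed")],
)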
Any updates on this?
Is there an ETA on this fix?
Is this issue fixed?
This is still not fixed as of today (May 10th 2022).
Any update on this issue? Facing the same problem.
Still the case for now. However, there is now a possibility to use the new PipelineSession to make the FrameworkProcessor work with SageMaker Pipelines:
from sagemaker import get_execution_role
from sagemaker.sklearn import SKLearn, SKLearnProcessor
from sagemaker.processing import FrameworkProcessor # or change with any other FrameworkProcessor like HuggingFaceProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
session = PipelineSession()
skpv2 = FrameworkProcessor(
estimator_cls=SKLearn,
framework_version='0.23-1',
role = get_execution_role(),
instance_count=1,
instance_type='ml.m5.large',
sagemaker_session = session
)
step_args = skpv2.run(
code='processing.py',
source_dir="code", # add processing.py and requirements.txt here
inputs=[...], outputs=[...]
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
processing_step = ProcessingStep(
name="MyProcessingStep",
step_args=step_args
)
# [ define the other steps if any ]
pipeline = Pipeline(steps=[...])
Just make sure to update the SageMaker Python SDK to the latest version :)
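A hedged usage sketch (not part of the original comment): once defined, the pipeline would typically be registered and started like this, assuming the execution role has the required permissions:
pipeline.upsert(role_arn=get_execution_role())  # create or update the pipeline definition
execution = pipeline.start()                    # kick off a pipeline execution
execution.wait()                                # block until the execution finishes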
Thanks @dgallitelli. We would encourage users to adopt this new way to construct the ProcessingStep. We have a
Here is a simplified code example that helps to connect the dots between the FrameworkProcessor, the ProcessingStep, and the Pipeline:
import os
from sagemaker.processing import (
ProcessingInput,
ProcessingOutput,
FrameworkProcessor
)
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
ProcessingStep
)
from sagemaker.tensorflow import TensorFlow
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
preprocessing_processor = FrameworkProcessor(
estimator_cls=TensorFlow,
framework_version='2.4.3',
role=role,
instance_count=1,
instance_type="ml.m5.xlarge",
py_version='py37',
command=["python3"],
base_job_name="some-preprocessing-step"
)
train_data_in_s3 = ProcessingOutput(
source="/opt/ml/processing/output/train/",
destination=Join(
on="/",
values=[
"s3:/",
data_s3_bucket,
os.environ["SAGEMAKER_PROJECT_NAME"],
data_s3_key,
'train/'
],
),
output_name='train',
s3_upload_mode='Continuous',
)
test_data_in_s3 = ProcessingOutput(
source="/opt/ml/processing/output/test/",
destination=Join(
on="/",
values=[
"s3:/",
data_s3_bucket,
os.environ["SAGEMAKER_PROJECT_NAME"],
data_s3_key,
'test/'
],
),
output_name='test',
s3_upload_mode='Continuous',
)
data_s3_key_in_project = Join(
on="/",
values=[
os.environ["SAGEMAKER_PROJECT_NAME"],
data_s3_key
],
)
preprocessing_run_args = preprocessing_processor.get_run_args(
code="preprocess.py",
source_dir=BASE_DIR,
inputs=[],
outputs=[train_data_in_s3, test_data_in_s3],
arguments=[
'--data-s3-bucket', "your bucket name",
'--data-s3-key', "your key"
]
)
preprocessing_step = ProcessingStep(
name="your-preprocessing-step-name",
processor=preprocessing_processor,
inputs=preprocessing_run_args.inputs,
outputs=preprocessing_run_args.outputs,
job_arguments=preprocessing_run_args.arguments,
code=preprocessing_run_args.code
)
pipeline_name = "your-pipeline-name"
distributed_ml_training_pipeline = Pipeline(
name=pipeline_name,
parameters=[
# your pipeline parameters here
],
steps=[preprocessing_step, ...]
)
If you are using this inside a SageMaker Studio MLOps Project, make sure to declare your requirements.txt inside a MANIFEST.in file to be shipped with the library: https://packaging.python.org/en/latest/guides/using-manifest-in/.
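For illustration, a minimal MANIFEST.in along those lines (an assumption based on the linked packaging guide, not taken from the original comment) could contain just:
include requirements.txt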
Is there any running example for a ProcessingStep with PyTorch that allows source_dir?
Any update on when this updated
I tried the ModelStep example from the docs here: https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step
Assuming that the
Also, this example creates the following error for me:
So it seems like this example does not work. I'm using version 2.94.0 of the SageMaker SDK from a local PC (not a SageMaker notebook) to start the pipeline. Any ideas how this is supposed to work?
Can you please confirm
Should MANIFEST.in be located in source_dir or somewhere else?
It should be at the same level as the setup.py.
@jerrypeng7773 The additional suffix added to the step name surprised me. I guess the reason is the underlying implementation, but the other step types don't seem to add such a suffix, so this looks inconsistent to me. Is there a specific reason to do this for this step type?
Hi @morfaer, both
Compared with the
This can give users a clear hint about what each sub-step is doing. In addition, we recently pushed this PR: #3240 to apply this naming convention.
This method works... under a PipelineSession, the processor's run() call produces step arguments that the ProcessingStep accepts in the step_args parameter. The method proposed by mohamed-ali did not work for me... it creates a list of arguments for the job_arguments parameter. It may be worth noting that each method works through a different parameter. When using step_args, you cannot also use the processor argument, because the step arguments already carry the processor for the ProcessingStep. When using job_arguments, there were no conflicts with the processor parameter, but the job still failed citing the missing 'code' without any reference to the missing 'source_dir'... so it may have solved the source-directory issue, but there was still a problem retrieving the .py file from it. From this experience, it appears that using the step_args method via the PipelineSession is a good idea.
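A compressed sketch of the two call patterns contrasted above, reusing the names from the earlier comments (skpv2, preprocessing_processor, BASE_DIR); treat it as an illustration rather than a verified snippet:
# Pattern A (PipelineSession / step_args): run() returns step arguments that
# ProcessingStep consumes; do not also pass processor=.
step_args = skpv2.run(code='processing.py', source_dir='code')
step_a = ProcessingStep(name="StepA", step_args=step_args)

# Pattern B (get_run_args / job_arguments): the run args are unpacked into the
# individual ProcessingStep parameters alongside processor=.
run_args = preprocessing_processor.get_run_args(code="preprocess.py", source_dir=BASE_DIR)
step_b = ProcessingStep(
    name="StepB",
    processor=preprocessing_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    job_arguments=run_args.arguments,
    code=run_args.code,
)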
Is there any update on this issue? Has it been fixed? I am currently running into the same problem.
Yes, using a PipelineSession() will work... follow the example from dgallitelli posted 5/16.
I am using PipelineSession(), but I am getting the error below:
Describe the bug
Trying to use any Processor derived from FrameworkProcessor is bugged with SageMaker Pipelines. There is a problem with the command and entrypoint parameters, where command does not pass python3, causing the following error:
To reproduce
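The reproduction code did not survive in this copy of the issue; a sketch reconstructed from the workaround above (not the reporter's original snippet) would look roughly like this:
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlowProcessor
from sagemaker.workflow.steps import ProcessingStep

# FrameworkProcessor-derived processor, as in the commented-out block above
tp = TensorFlowProcessor(
    framework_version='2.3',
    role=get_execution_role(),
    instance_count=1,
    instance_type='ml.m5.large',
    base_job_name='DSM-TF-Demo-Process',
    py_version='py37'
)

# Using it in a Pipelines ProcessingStep surfaces the command/entrypoint problem
step = ProcessingStep(
    name="TFProcessingStep",
    processor=tp,
    code="processing.py"
)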
Expected behavior
The pipeline should go through.
Screenshots or logs
Screenshot from Pipelines:

Logs from CloudWatch:
System information
A description of your system. Please provide:
Additional context
N/A