SageMaker pipeline parallelism_config doesn't work #4017

Closed
jrevuelta-chwy opened this issue Jul 21, 2023 · 1 comment · Fixed by #4066
Labels
component: pipelines (Relates to the SageMaker Pipeline Platform) · type: bug

Comments

@jrevuelta-chwy

Describe the bug
I tried to set the parallelism_config parameter for a SageMaker pipeline, but the pipeline isn't honoring this config and still starts all steps that are eligible to run.

doc: https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-pipeline.html#build-and-manage-pipeline-execution

To reproduce
Run this script on SageMaker Studio:

import json

import sagemaker
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import TrainingStep


role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
instance_type = 'ml.m5.large'

pytorch_config1 = dict(
    entry_point='train.py',
    source_dir='training',
    instance_type=instance_type,
    instance_count=1,
    role=role,
    framework_version='2.0.0',
    py_version='py310',
    hyperparameters={'epochs': 2, 'batch_size': 64, 'learning_rate': 0.1},
    sagemaker_session=pipeline_session,
)
estimator1 = PyTorch(**pytorch_config1)
train_args = estimator1.fit()
step_train1 = TrainingStep(
    name="ExampleTrain1",
    step_args=train_args,
)

pytorch_config2 = dict(
    entry_point='train.py',
    source_dir='training',
    instance_type=instance_type,
    instance_count=1,
    role=role,
    framework_version='2.0.0',
    py_version='py310',
    hyperparameters={'epochs': 3, 'batch_size': 64, 'learning_rate': 0.2},
    sagemaker_session=pipeline_session,
)
estimator2 = PyTorch(**pytorch_config2)
train_args2 = estimator2.fit()
step_train2 = TrainingStep(
    name="ExampleTrain2",
    step_args=train_args2,
)


pipeline = Pipeline(
    name="MyPipeline",
    steps=[step_train1, step_train2],
    sagemaker_session=pipeline_session,
)

pipeline_config = dict(role_arn=role, parallelism_config=dict(MaxParallelExecutionSteps=1))
print(pipeline_config)

pipeline.upsert(**pipeline_config)

execution = pipeline.start()

print('execution describe:')
print(execution.describe())

execution.wait()

steps = execution.list_steps()
print(f'steps: {steps}')
print(f'len(steps): {len(steps)}')


definition = json.loads(pipeline.definition())
print(f'json definition: {definition}')
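
To double-check what the service actually stored, the pipeline can also be described with boto3; a minimal sketch, assuming default credentials and region, and that DescribePipeline returns the stored parallelism settings:

import boto3

sm_client = boto3.client('sagemaker')
desc = sm_client.describe_pipeline(PipelineName='MyPipeline')
# The response should contain the ParallelismConfiguration that was stored, if any
print(desc.get('ParallelismConfiguration'))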

Then place this training script at training/train.py:

import os
import json

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor


print(os.environ)
config = json.loads(os.environ.get('SM_HPS'))
print(config)
print(type(config))

epochs = config.get('epochs', 5)
batch_size = config.get('batch_size', 64)
lr = config.get('learning_rate', 0.1)
print(epochs)
print(batch_size)
print(lr)


# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# batch_size already comes from the SM_HPS hyperparameters read above

# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Get cpu or gpu device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "cpu"
)
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)  # use the learning_rate hyperparameter


def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


#epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch Model State to model.pth")

Expected behavior
I expected to see only one training step running concurrently.

Screenshots or logs
Attached screenshots:
[Screenshot 2023-07-21 at 10 59 27 AM]
[Screenshot 2023-07-21 at 10 59 45 AM]

System information

  • SageMaker Python SDK version: sagemaker 2.173.0, boto3 1.28.7
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): PyTorch
  • Framework version: 2.0.0 on SageMaker
  • Python version: 3.11 and 3.10
  • CPU or GPU: CPU
  • Custom Docker image (Y/N): N


@thbrooks22 added the "component: pipelines" label on Jul 24, 2023
@qidewenwhen (Member) commented Aug 11, 2023

Hi @jrevuelta-chwy, thanks for reaching out, and thanks for providing such detailed info; it helped us a lot in reproducing and investigating the issue.

After some investigation, it turns out that parallelism_config works as expected; the culprit is pipeline.upsert, which fails to pass parallelism_config through to pipeline.update. I'm guessing you reused the common name "MyPipeline" (as I did) across multiple runs, so when you run the code snippet above it actually updates an existing pipeline without updating its parallelism_config.
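
Roughly speaking, upsert first tries to create the pipeline and, if it already exists, falls back to update. A paraphrased sketch of that flow (not the actual SDK source) to show where the config gets dropped:

from botocore.exceptions import ClientError

def upsert_sketch(pipeline, role_arn, description=None, tags=None, parallelism_config=None):
    """Paraphrased shape of Pipeline.upsert, for illustration only."""
    try:
        # Create path: parallelism_config is forwarded as expected
        return pipeline.create(role_arn, description, tags, parallelism_config)
    except ClientError as e:
        if "already exist" in e.response["Error"]["Message"]:
            # Update path: parallelism_config was not forwarded here, so an
            # existing pipeline keeps its old (or empty) parallelism settings
            return pipeline.update(role_arn, description)
        raise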

I've opened a PR to fix the issue.

To unblock yourself before the fix is released, try configuring parallelism_config in pipeline.start:

execution = pipeline.start(parallelism_config=dict(MaxParallelExecutionSteps=1))
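
Per the API docs, a parallelism configuration passed at start time overrides the pipeline-level setting for that particular execution, so this should cap the run at one concurrent step even though the stored pipeline definition is missing the config.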

Or simply change the pipeline name so that a new pipeline is created each time, as sketched below.
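
For example, a timestamp suffix is one simple way to get a unique name per run (just an illustration, any unique name works):

import time

pipeline = Pipeline(
    name=f"MyPipeline-{int(time.time())}",  # unique name so upsert takes the create path
    steps=[step_train1, step_train2],
    sagemaker_session=pipeline_session,
)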
