S3 Estimator and Image Classification #71


Closed
wants to merge 31 commits into from
Commits
31 commits
88bd056  Merge pull request #1 from aws/master  (Jan 17, 2018)
ac7b854  image classification algorithm api  (Jan 17, 2018)
8b96f69  image classification api  (Jan 17, 2018)
aea77a1  sync  (Jan 17, 2018)
7167aec  sync  (Jan 17, 2018)
3d985e7  estimator is done. waiting on tests.  (Jan 17, 2018)
24353e2  formatting for flake  (Jan 24, 2018)
3d91eb7  merge  (Jan 31, 2018)
2de775a  conflicts  (Jan 31, 2018)
a919bce  ic-sdk push  (Feb 6, 2018)
5b9eec0  removed duplicate doc  (Feb 6, 2018)
7f1389a  Merge branch 'master' into master  (Feb 6, 2018)
ddd0e68  moving forward with the recent updates  (Feb 15, 2018)
39c6ba4  Merge branch 'master' of https://github.com/ragavvenkatesan/sagemaker…  (Feb 15, 2018)
c61c7ef  updating for sync  (Feb 15, 2018)
2825073  Update __init__.py  (Feb 15, 2018)
85564ef  style changes to code  (Feb 15, 2018)
b43e652  Merge branch 'master' of https://github.com/ragavvenkatesan/sagemaker…  (Feb 15, 2018)
c5ead9a  merge conflicts  (Feb 15, 2018)
13cf73b  Merge branch 'master' into master  (Feb 15, 2018)
8e305fa  unit tests fixed  (Feb 15, 2018)
db548c2  sync conflicts  (Feb 15, 2018)
9c9469f  flake errors fixed  (Feb 15, 2018)
8a4f3ea  integ tests environment fix  (Feb 16, 2018)
8557394  flake fails fixed  (Feb 16, 2018)
80e0283  Merge branch 'master' into master  (iquintero, Feb 21, 2018)
066e8b7  Merge branch 'master' into master  (Mar 1, 2018)
5754cba  answered all the review suggestions  (Mar 1, 2018)
7c80d16  Merge branch 'master' of https://github.com/ragavvenkatesan/sagemaker…  (Mar 1, 2018)
e068276  Merge branch 'master' into master  (iquintero, Mar 2, 2018)
baf9f6e  Merge branch 'master' into master  (iquintero, Mar 20, 2018)
2 changes: 1 addition & 1 deletion .gitignore
@@ -20,4 +20,4 @@ examples/tensorflow/distributed_mnist/data
doc/_build
**/.DS_Store
venv/
*~
*.rec
3 changes: 3 additions & 0 deletions src/sagemaker/__init__.py
@@ -17,6 +17,8 @@
from sagemaker.amazon.pca import PCA, PCAModel, PCAPredictor
from sagemaker.amazon.lda import LDA, LDAModel, LDAPredictor
from sagemaker.amazon.linear_learner import LinearLearner, LinearLearnerModel, LinearLearnerPredictor
from sagemaker.amazon.image_classification import ImageClassification, ImageClassificationModel
from sagemaker.amazon.image_classification import ImageClassificationPredictor
from sagemaker.amazon.factorization_machines import FactorizationMachines, FactorizationMachinesModel
from sagemaker.amazon.factorization_machines import FactorizationMachinesPredictor
from sagemaker.amazon.ntm import NTM, NTMModel, NTMPredictor
@@ -34,5 +36,6 @@
LinearLearnerModel, LinearLearnerPredictor,
LDA, LDAModel, LDAPredictor,
FactorizationMachines, FactorizationMachinesModel, FactorizationMachinesPredictor,
ImageClassification, ImageClassificationModel, ImageClassificationPredictor,
Model, NTM, NTMModel, NTMPredictor, RealTimePredictor, Session,
container_def, s3_input, production_variant, get_execution_role]
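
With these exports in place, the new classes can be pulled straight from the package root. A minimal import sketch (nothing beyond what the diff above adds):

    from sagemaker import (ImageClassification,
                           ImageClassificationModel,
                           ImageClassificationPredictor)
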
101 changes: 99 additions & 2 deletions src/sagemaker/amazon/amazon_estimator.py
@@ -110,6 +110,7 @@ def fit(self, records, mini_batch_size=None, **kwargs):
records (:class:`~RecordSet`): The records to train this ``Estimator`` on
mini_batch_size (int or None): The size of each mini-batch to use when training. If None, a
default value will be used.
distribution (s3 distribution type): S3 Distribution.
"""
self.feature_dim = records.feature_dim
self.mini_batch_size = mini_batch_size
@@ -152,9 +153,98 @@ def record_set(self, train, labels=None, channel="train"):
return RecordSet(manifest_s3_file, num_records=train.shape[0], feature_dim=train.shape[1], channel=channel)


class RecordSet(object):
class AmazonS3AlgorithmEstimatorBase(EstimatorBase):
"""Base class for Amazon first-party Estimator implementations. This class isn't
intended to be instantiated directly. This is difference from the base class
because this class handles S3 data"""

mini_batch_size = hp('mini_batch_size', (validation, validation.gt(0)))

def __init__(self, role, train_instance_count, train_instance_type, algorithm, **kwargs):
"""Initialize an AmazonAlgorithmEstimatorBase.

Args:
algortihm (str): Use one of the supported algorithms
Contributor: Typo.

Author: Where is the typo? I don't see it.

Contributor: algortihm

"""
super(AmazonS3AlgorithmEstimatorBase, self).__init__(role, train_instance_count, train_instance_type,
**kwargs)
self.algorithm = algorithm

def train_image(self):
return registry(self.sagemaker_session.boto_region_name, algorithm=self.algorithm) + "/" + type(self).repo

def hyperparameters(self):
return hp.serialize_all(self)

def fit(self, s3set, mini_batch_size=None, distribution='ShardedByS3Key', **kwargs):
"""Fit this Estimator on serialized Record objects, stored in S3.

def __init__(self, s3_data, num_records, feature_dim, s3_data_type='ManifestFile', channel='train'):
``records`` should be a list of instances of :class:`~RecordSet`. This defines a collection of
s3 data files to train this ``Estimator`` on.

More information on the Amazon Record format is available at:
https://docs.aws.amazon.com/sagemaker/latest/dg/cdf-training.html

See :meth:`~AmazonS3AlgorithmEstimatorBase.s3_record_set` to construct a ``RecordSet`` object
from :class:`~numpy.ndarray` arrays.

Args:
s3set (list): A list of :class:`~S3Set` items. The records used to train
this ``Estimator`` depend on the algorithm and the type of input data.
distribution (str): The s3 distribution of data.
mini_batch_size (int or None): The size of each mini-batch to use when training. If None, a
default value will be used.
"""
default_mini_batch_size = 32
Contributor: Why don't you make 32 the default value for mini_batch_size in the method signature?

    def fit(self, s3set, mini_batch_size=32, distribution='ShardedByS3Key', **kwargs):

Then you don't even have to do this whole thing, and you can just set it as:

    self.mini_batch_size = mini_batch_size

Author: Two reasons why: 1. It's the protocol used in the other algorithms. 2. We want to make this a parameter the user must supply. If I assumed a default and training failed with a memory error, it would become a customer error, which is wrong.

self.mini_batch_size = mini_batch_size or default_mini_batch_size
data = {}
for item in s3set:
data[item.channel] = s3_input(item.s3_location, distribution=item.distribution,
content_type=item.content_type,
s3_data_type=item.s3_data_type)
super(AmazonS3AlgorithmEstimatorBase, self).fit(data, **kwargs)

def s3_record_set(self, s3_loc, content_type, channel="train"):
"""Build a :class:`~RecordSet` from a S3 location with data in it.

Args:
s3_loc (str): A s3 bucket where data is located
channel (str): The SageMaker TrainingJob channel this RecordSet should be assigned to.
content_type (str): Content type of the data.
Returns:
RecordSet: A RecordSet referencing the encoded, uploading training and label data.
"""
return S3Set(s3_loc, content_type=content_type, channel=channel)


class S3Set(object):
def __init__(self, s3_location, content_type=None, s3_data_type='S3Prefix', distribution='FullyReplicated',
channel='train'):
"""A collection of Amazon :class:~`Record` objects serialized and stored in S3.

Args:
s3_location (str): The S3 location of the training data
distribution (str): The s3 distribution of data.
content_type (str): Mandatory content type of the data.
s3_data_type (str): Valid values: 'S3Prefix', 'ManifestFile'. If 'S3Prefix', ``s3_data`` defines
a prefix of s3 objects to train on. All objects with s3 keys beginning with ``s3_data`` will
be used to train. If 'ManifestFile', then ``s3_data`` defines a single s3 manifest file, listing
each s3 object to train on.
channel (str): The SageMaker Training Job channel this RecordSet should be bound to
"""
self.s3_location = s3_location
self.distribution = distribution
self.s3_data_type = s3_data_type
self.channel = channel
self.content_type = content_type

def __repr__(self):
"""Return an unambiguous representation of this S3Set"""
return str((S3Set, self.__dict__))
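
To show how S3Set, s3_record_set() and the S3-based fit() are meant to work together, here is a hedged usage sketch. Only role, train_instance_count, train_instance_type and sagemaker_session come from the base-class signature; the remaining ImageClassification constructor arguments, bucket names and content type are illustrative assumptions, not part of this diff:

    import sagemaker
    from sagemaker import ImageClassification

    session = sagemaker.Session()

    # Hypothetical estimator; algorithm-specific hyperparameters omitted for brevity.
    ic = ImageClassification(role='SageMakerRole',
                             train_instance_count=1,
                             train_instance_type='ml.p2.xlarge',
                             sagemaker_session=session)

    # One S3Set per training channel (RecordIO content type is an assumption).
    train_set = ic.s3_record_set('s3://my-bucket/train/',
                                 content_type='application/x-recordio',
                                 channel='train')
    val_set = ic.s3_record_set('s3://my-bucket/validation/',
                               content_type='application/x-recordio',
                               channel='validation')

    # mini_batch_size is intentionally left without a default (see the review
    # discussion above); each channel is mapped to an s3_input internally.
    ic.fit([train_set, val_set], mini_batch_size=32)
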


class RecordSet(object):
def __init__(self, s3_data, num_records=None, feature_dim=None, s3_data_type='ManifestFile', channel='train'):
"""A collection of Amazon :class:~`Record` objects serialized and stored in S3.

Args:
@@ -235,6 +325,13 @@ def registry(region_name, algorithm=None):
"us-west-2": "174872318107",
"eu-west-1": "438346466558"
}[region_name]
elif algorithm in ['image_classification']:
account_id = {
"us-east-1": "811284229777",
"us-east-2": "825641698319",
"us-west-2": "433757028032",
"eu-west-1": "685385470294"
}[region_name]
elif algorithm in ["lda"]:
account_id = {
"us-east-1": "766337827248",
29 changes: 23 additions & 6 deletions src/sagemaker/amazon/common.py
@@ -16,12 +16,11 @@

import numpy as np
from scipy.sparse import issparse

import json
Contributor: Please maintain the import order:

1. Python built-in libraries
2. Third-party imports
3. Local library imports (from sagemaker...)

Contributor: This still needs to be fixed. import json should go before the numpy import. Also, please maintain the alphabetical order of the imports when you change it:

    import io
    import json
    import struct
    ....

from sagemaker.amazon.record_pb2 import Record


class numpy_to_record_serializer(object):

def __init__(self, content_type='application/x-recordio-protobuf'):
self.content_type = content_type

@@ -35,8 +34,18 @@ def __call__(self, array):
return buf


class record_deserializer(object):
class file_to_image_serializer(object):
Contributor: Keep to one naming convention: FileToImageSerializer.

Author: I am using this name because the other serializers follow the same convention; refer to numpy_to_record_serializer.


def __init__(self, content_type='application/x-image'):
self.content_type = content_type

def __call__(self, file):
with open(file, 'rb') as f:
payload = f.read()
payload = bytearray(payload)
return payload
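
A quick illustration of the serializer above. The file name is hypothetical, and attaching the result to an endpoint request is an assumption based on how the other serializers in this module are used:

    from sagemaker.amazon.common import file_to_image_serializer

    serializer = file_to_image_serializer()   # content_type defaults to 'application/x-image'
    payload = serializer('cat.jpg')           # hypothetical local image file
    # payload is a bytearray of raw image bytes, ready to be sent as the body
    # of an invoke-endpoint request that expects 'application/x-image'.
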


class record_deserializer(object):
Contributor: Same here: RecordDeserializer.

Author: Again, I am keeping this for consistency with the other methods; refer to response_deserializer.

def __init__(self, accept='application/x-recordio-protobuf'):
self.accept = accept

@@ -47,6 +56,14 @@ def __call__(self, stream, content_type):
stream.close()


class response_deserializer(object):
Contributor: ResponseDeserializer

def __init__(self, accept='application/json'):
self.accept = accept

def __call__(self, stream, content_type=None):
return json.loads(stream)
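
And a matching sketch for the deserializer. The payload below is made up; as written, __call__ hands whatever it receives to json.loads, so it expects the response body as a str or bytes rather than a file-like stream:

    from sagemaker.amazon.common import response_deserializer

    deserializer = response_deserializer()    # accept defaults to 'application/json'
    result = deserializer(b'{"prediction": [0.1, 0.9]}')
    print(result['prediction'])               # -> [0.1, 0.9]
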


def _write_feature_tensor(resolved_type, record, vector):
if resolved_type == "Int32":
record.features["values"].int32_tensor.values.extend(vector)
@@ -94,7 +111,7 @@ def write_numpy_to_dense_tensor(file, array, labels=None):
raise ValueError("Labels must be a Vector")
if labels.shape[0] not in array.shape:
raise ValueError("Label shape {} not compatible with array shape {}".format(
labels.shape, array.shape))
labels.shape, array.shape))
resolved_label_type = _resolve_type(labels.dtype)
resolved_type = _resolve_type(array.dtype)

@@ -122,7 +139,7 @@ def write_spmatrix_to_sparse_tensor(file, array, labels=None):
raise ValueError("Labels must be a Vector")
if labels.shape[0] not in array.shape:
raise ValueError("Label shape {} not compatible with array shape {}".format(
labels.shape, array.shape))
labels.shape, array.shape))
Contributor: Align this as it was before. Also applies to the one above.

resolved_label_type = _resolve_type(labels.dtype)
resolved_type = _resolve_type(array.dtype)

@@ -182,7 +199,7 @@ def _write_recordio(f, data):


def _read_recordio(f):
while(True):
while (True):
Contributor: Remove this space (it was fine before).

try:
read_kmagic, = struct.unpack('I', f.read(4))
except struct.error: