diff --git a/.gitignore b/.gitignore index 67ef7eea53..3ee5780429 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ examples/tensorflow/distributed_mnist/data *.iml doc/_build **/.DS_Store +venv/ diff --git a/setup.py b/setup.py index b2985ad8f7..f29cab11f3 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ def read(fname): ], # Declare minimal set for installation - install_requires=['boto3>=1.4.8', 'numpy>=1.9.0', 'protobuf>=3.1'], + install_requires=['boto3>=1.4.8', 'numpy>=1.9.0', 'protobuf>=3.1', 'scipy>=1.0.0'], extras_require={ 'test': ['tox', 'flake8', 'pytest', 'pytest-cov', 'pytest-xdist', diff --git a/src/sagemaker/amazon/common.py b/src/sagemaker/amazon/common.py index fcca0632ce..6b5dc0c68a 100644 --- a/src/sagemaker/amazon/common.py +++ b/src/sagemaker/amazon/common.py @@ -15,6 +15,7 @@ import sys import numpy as np +from scipy.sparse import issparse from sagemaker.amazon.record_pb2 import Record @@ -64,6 +65,24 @@ def _write_label_tensor(resolved_type, record, scalar): record.label["values"].float32_tensor.values.extend([scalar]) +def _write_keys_tensor(resolved_type, record, vector): + if resolved_type == "Int32": + record.features["values"].int32_tensor.keys.extend(vector) + elif resolved_type == "Float64": + record.features["values"].float64_tensor.keys.extend(vector) + elif resolved_type == "Float32": + record.features["values"].float32_tensor.keys.extend(vector) + + +def _write_shape(resolved_type, record, scalar): + if resolved_type == "Int32": + record.features["values"].int32_tensor.shape.extend([scalar]) + elif resolved_type == "Float64": + record.features["values"].float64_tensor.shape.extend([scalar]) + elif resolved_type == "Float32": + record.features["values"].float32_tensor.shape.extend([scalar]) + + def write_numpy_to_dense_tensor(file, array, labels=None): """Writes a numpy array to a dense tensor""" @@ -89,6 +108,46 @@ def write_numpy_to_dense_tensor(file, array, labels=None): _write_recordio(file, record.SerializeToString()) +def write_spmatrix_to_sparse_tensor(file, array, labels=None): + """Writes a scipy sparse matrix to a sparse tensor""" + + if not issparse(array): + raise TypeError("Array must be sparse") + + # Validate shape of array and labels, resolve array and label types + if not len(array.shape) == 2: + raise ValueError("Array must be a Matrix") + if labels is not None: + if not len(labels.shape) == 1: + raise ValueError("Labels must be a Vector") + if labels.shape[0] not in array.shape: + raise ValueError("Label shape {} not compatible with array shape {}".format( + labels.shape, array.shape)) + resolved_label_type = _resolve_type(labels.dtype) + resolved_type = _resolve_type(array.dtype) + + csr_array = array.tocsr() + n_rows, n_cols = csr_array.shape + + record = Record() + for row_idx in range(n_rows): + record.Clear() + row = csr_array.getrow(row_idx) + # Write values + _write_feature_tensor(resolved_type, record, row.data) + # Write keys + _write_keys_tensor(resolved_type, record, row.indices.astype(np.uint64)) + + # Write labels + if labels is not None: + _write_label_tensor(resolved_label_type, record, labels[row_idx]) + + # Write shape + _write_shape(resolved_type, record, n_cols) + + _write_recordio(file, record.SerializeToString()) + + def read_records(file): """Eagerly read a collection of amazon Record protobuf objects from file.""" records = [] diff --git a/tests/unit/test_common.py b/tests/unit/test_common.py index e0275b6d00..d4d2b1b4d5 100644 --- a/tests/unit/test_common.py +++ b/tests/unit/test_common.py @@ -13,8 +13,10 @@ import numpy as np import tempfile import pytest +import itertools +from scipy.sparse import coo_matrix from sagemaker.amazon.common import (record_deserializer, write_numpy_to_dense_tensor, _read_recordio, - numpy_to_record_serializer) + numpy_to_record_serializer, write_spmatrix_to_sparse_tensor) from sagemaker.amazon.record_pb2 import Record @@ -131,3 +133,198 @@ def test_invalid_label(): with tempfile.TemporaryFile() as f: with pytest.raises(ValueError): write_numpy_to_dense_tensor(f, array, label_data) + + +def test_dense_float_write_spmatrix_to_sparse_tensor(): + array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]] + keys_data = [[0, 1, 2], [0, 1, 2]] + array = coo_matrix(np.array(array_data)) + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array) + f.seek(0) + for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].float64_tensor.values == expected_data + assert record.features["values"].float64_tensor.keys == expected_keys + assert record.features["values"].float64_tensor.shape == [len(expected_data)] + + +def test_dense_float32_write_spmatrix_to_sparse_tensor(): + array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]] + keys_data = [[0, 1, 2], [0, 1, 2]] + array = coo_matrix(np.array(array_data).astype(np.dtype('float32'))) + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array) + f.seek(0) + for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].float32_tensor.values == expected_data + assert record.features["values"].float32_tensor.keys == expected_keys + assert record.features["values"].float32_tensor.shape == [len(expected_data)] + + +def test_dense_int_write_spmatrix_to_sparse_tensor(): + array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]] + keys_data = [[0, 1, 2], [0, 1, 2]] + array = coo_matrix(np.array(array_data).astype(np.dtype('int'))) + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array) + f.seek(0) + for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].int32_tensor.values == expected_data + assert record.features["values"].int32_tensor.keys == expected_keys + assert record.features["values"].int32_tensor.shape == [len(expected_data)] + + +def test_dense_int_spmatrix_to_sparse_label(): + array_data = [[1, 2, 3], [10, 20, 3]] + keys_data = [[0, 1, 2], [0, 1, 2]] + array = coo_matrix(np.array(array_data)) + label_data = np.array([99, 98, 97]) + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array, label_data) + f.seek(0) + for record_data, expected_data, expected_keys, label in zip( + _read_recordio(f), + array_data, + keys_data, + label_data + ): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].int32_tensor.values == expected_data + assert record.features["values"].int32_tensor.keys == expected_keys + assert record.label["values"].int32_tensor.values == [label] + assert record.features["values"].int32_tensor.shape == [len(expected_data)] + + +def test_dense_float32_spmatrix_to_sparse_label(): + array_data = [[1, 2, 3], [10, 20, 3]] + keys_data = [[0, 1, 2], [0, 1, 2]] + array = coo_matrix(np.array(array_data).astype('float32')) + label_data = np.array([99, 98, 97]) + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array, label_data) + f.seek(0) + for record_data, expected_data, expected_keys, label in zip( + _read_recordio(f), + array_data, + keys_data, + label_data + ): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].float32_tensor.values == expected_data + assert record.features["values"].float32_tensor.keys == expected_keys + assert record.label["values"].int32_tensor.values == [label] + assert record.features["values"].float32_tensor.shape == [len(expected_data)] + + +def test_dense_float64_spmatrix_to_sparse_label(): + array_data = [[1, 2, 3], [10, 20, 3]] + keys_data = [[0, 1, 2], [0, 1, 2]] + array = coo_matrix(np.array(array_data).astype('float64')) + label_data = np.array([99, 98, 97]) + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array, label_data) + f.seek(0) + for record_data, expected_data, expected_keys, label in zip( + _read_recordio(f), + array_data, + keys_data, + label_data + ): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].float64_tensor.values == expected_data + assert record.features["values"].float64_tensor.keys == expected_keys + assert record.label["values"].int32_tensor.values == [label] + assert record.features["values"].float64_tensor.shape == [len(expected_data)] + + +def test_invalid_sparse_label(): + array_data = [[1, 2, 3], [10, 20, 3]] + array = coo_matrix(np.array(array_data)) + label_data = np.array([99, 98, 97, 1000]).astype(np.dtype('float64')) + with tempfile.TemporaryFile() as f: + with pytest.raises(ValueError): + write_spmatrix_to_sparse_tensor(f, array, label_data) + + +def test_sparse_float_write_spmatrix_to_sparse_tensor(): + n = 4 + array_data = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]] + keys_data = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]] + + flatten_data = list(itertools.chain.from_iterable(array_data)) + y_indices = list(itertools.chain.from_iterable(keys_data)) + x_indices = [[i] * len(keys_data[i]) for i in range(len(keys_data))] + x_indices = list(itertools.chain.from_iterable(x_indices)) + + array = coo_matrix((flatten_data, (x_indices, y_indices)), dtype='float64') + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array) + f.seek(0) + for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].float64_tensor.values == expected_data + assert record.features["values"].float64_tensor.keys == expected_keys + assert record.features["values"].float64_tensor.shape == [n] + + +def test_sparse_float32_write_spmatrix_to_sparse_tensor(): + n = 4 + array_data = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]] + keys_data = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]] + + flatten_data = list(itertools.chain.from_iterable(array_data)) + y_indices = list(itertools.chain.from_iterable(keys_data)) + x_indices = [[i] * len(keys_data[i]) for i in range(len(keys_data))] + x_indices = list(itertools.chain.from_iterable(x_indices)) + + array = coo_matrix((flatten_data, (x_indices, y_indices)), dtype='float32') + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array) + f.seek(0) + for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].float32_tensor.values == expected_data + assert record.features["values"].float32_tensor.keys == expected_keys + assert record.features["values"].float32_tensor.shape == [n] + + +def test_sparse_int_write_spmatrix_to_sparse_tensor(): + n = 4 + array_data = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]] + keys_data = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]] + + flatten_data = list(itertools.chain.from_iterable(array_data)) + y_indices = list(itertools.chain.from_iterable(keys_data)) + x_indices = [[i] * len(keys_data[i]) for i in range(len(keys_data))] + x_indices = list(itertools.chain.from_iterable(x_indices)) + + array = coo_matrix((flatten_data, (x_indices, y_indices)), dtype='int') + with tempfile.TemporaryFile() as f: + write_spmatrix_to_sparse_tensor(f, array) + f.seek(0) + for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data): + record = Record() + record.ParseFromString(record_data) + assert record.features["values"].int32_tensor.values == expected_data + assert record.features["values"].int32_tensor.keys == expected_keys + assert record.features["values"].int32_tensor.shape == [n] + + +def test_dense_to_sparse(): + array_data = [[1, 2, 3], [10, 20, 3]] + array = np.array(array_data) + label_data = np.array([99, 98, 97]).astype(np.dtype('float64')) + with tempfile.TemporaryFile() as f: + with pytest.raises(TypeError): + write_spmatrix_to_sparse_tensor(f, array, label_data) diff --git a/tox.ini b/tox.ini index 2abc130047..a42fe09296 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,7 @@ exclude = *pb2.py .tox tests/data/ + venv/ max-complexity = 10 [testenv]