Skip to content

Implemented write_spmatrix_to_sparse_tensor #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ examples/tensorflow/distributed_mnist/data
*.iml
doc/_build
**/.DS_Store
venv/
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def read(fname):
],

# Declare minimal set for installation
install_requires=['boto3>=1.4.8', 'numpy>=1.9.0', 'protobuf>=3.1'],
install_requires=['boto3>=1.4.8', 'numpy>=1.9.0', 'protobuf>=3.1', 'scipy>=1.0.0'],

extras_require={
'test': ['tox', 'flake8', 'pytest', 'pytest-cov', 'pytest-xdist',
Expand Down
59 changes: 59 additions & 0 deletions src/sagemaker/amazon/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import sys

import numpy as np
from scipy.sparse import issparse

from sagemaker.amazon.record_pb2 import Record

Expand Down Expand Up @@ -64,6 +65,24 @@ def _write_label_tensor(resolved_type, record, scalar):
record.label["values"].float32_tensor.values.extend([scalar])


def _write_keys_tensor(resolved_type, record, vector):
if resolved_type == "Int32":
record.features["values"].int32_tensor.keys.extend(vector)
elif resolved_type == "Float64":
record.features["values"].float64_tensor.keys.extend(vector)
elif resolved_type == "Float32":
record.features["values"].float32_tensor.keys.extend(vector)


def _write_shape(resolved_type, record, scalar):
if resolved_type == "Int32":
record.features["values"].int32_tensor.shape.extend([scalar])
elif resolved_type == "Float64":
record.features["values"].float64_tensor.shape.extend([scalar])
elif resolved_type == "Float32":
record.features["values"].float32_tensor.shape.extend([scalar])


def write_numpy_to_dense_tensor(file, array, labels=None):
"""Writes a numpy array to a dense tensor"""

Expand All @@ -89,6 +108,46 @@ def write_numpy_to_dense_tensor(file, array, labels=None):
_write_recordio(file, record.SerializeToString())


def write_spmatrix_to_sparse_tensor(file, array, labels=None):
    """Write a scipy sparse matrix to *file* as RecordIO-wrapped sparse tensors.

    Each row of the matrix becomes one protobuf ``Record`` carrying the row's
    non-zero values, their column indices (keys), and the row width (shape).

    Args:
        file: Binary file-like object opened for writing.
        array: 2-D scipy sparse matrix (any format; converted to CSR internally).
        labels: Optional 1-D numpy array holding one label per row.

    Raises:
        TypeError: If ``array`` is not a scipy sparse matrix.
        ValueError: If ``array`` is not 2-D, or ``labels`` is not a vector whose
            length matches a dimension of ``array``.
    """
    if not issparse(array):
        raise TypeError("Array must be sparse")

    # Validate shape of array and labels, resolve array and label types
    if not len(array.shape) == 2:
        raise ValueError("Array must be a Matrix")
    if labels is not None:
        if not len(labels.shape) == 1:
            raise ValueError("Labels must be a Vector")
        if labels.shape[0] not in array.shape:
            raise ValueError("Label shape {} not compatible with array shape {}".format(
                labels.shape, array.shape))
        resolved_label_type = _resolve_type(labels.dtype)
    resolved_type = _resolve_type(array.dtype)

    # CSR gives cheap row slicing, which the per-row loop below relies on.
    csr_array = array.tocsr()
    n_rows, n_cols = csr_array.shape

    record = Record()
    for row_idx in range(n_rows):
        # Reuse a single Record object; clear state left over from the
        # previous row before filling it again.
        record.Clear()
        row = csr_array.getrow(row_idx)
        # Write values
        _write_feature_tensor(resolved_type, record, row.data)
        # Write keys (column indices of the non-zero entries)
        _write_keys_tensor(resolved_type, record, row.indices.astype(np.uint64))

        # Write labels
        if labels is not None:
            _write_label_tensor(resolved_label_type, record, labels[row_idx])

        # Write shape (row width) so consumers can reconstruct dense rows
        _write_shape(resolved_type, record, n_cols)

        _write_recordio(file, record.SerializeToString())


def read_records(file):
"""Eagerly read a collection of amazon Record protobuf objects from file."""
records = []
Expand Down
199 changes: 198 additions & 1 deletion tests/unit/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import numpy as np
import tempfile
import pytest
import itertools
from scipy.sparse import coo_matrix
from sagemaker.amazon.common import (record_deserializer, write_numpy_to_dense_tensor, _read_recordio,
numpy_to_record_serializer)
numpy_to_record_serializer, write_spmatrix_to_sparse_tensor)
from sagemaker.amazon.record_pb2 import Record


Expand Down Expand Up @@ -131,3 +133,198 @@ def test_invalid_label():
with tempfile.TemporaryFile() as f:
with pytest.raises(ValueError):
write_numpy_to_dense_tensor(f, array, label_data)


def test_dense_float_write_spmatrix_to_sparse_tensor():
    """A dense float64 matrix round-trips through the sparse-tensor writer."""
    expected_rows = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
    expected_keys = [[0, 1, 2], [0, 1, 2]]
    matrix = coo_matrix(np.array(expected_rows))
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix)
        f.seek(0)
        for payload, row_values, row_keys in zip(_read_recordio(f), expected_rows, expected_keys):
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].float64_tensor
            assert tensor.values == row_values
            assert tensor.keys == row_keys
            assert tensor.shape == [len(row_values)]


def test_dense_float32_write_spmatrix_to_sparse_tensor():
    """A dense float32 matrix lands in the float32 tensor fields."""
    expected_rows = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
    expected_keys = [[0, 1, 2], [0, 1, 2]]
    matrix = coo_matrix(np.array(expected_rows).astype(np.dtype('float32')))
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix)
        f.seek(0)
        for payload, row_values, row_keys in zip(_read_recordio(f), expected_rows, expected_keys):
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].float32_tensor
            assert tensor.values == row_values
            assert tensor.keys == row_keys
            assert tensor.shape == [len(row_values)]


def test_dense_int_write_spmatrix_to_sparse_tensor():
    """A dense int matrix lands in the int32 tensor fields."""
    expected_rows = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
    expected_keys = [[0, 1, 2], [0, 1, 2]]
    matrix = coo_matrix(np.array(expected_rows).astype(np.dtype('int')))
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix)
        f.seek(0)
        for payload, row_values, row_keys in zip(_read_recordio(f), expected_rows, expected_keys):
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].int32_tensor
            assert tensor.values == row_values
            assert tensor.keys == row_keys
            assert tensor.shape == [len(row_values)]


def test_dense_int_spmatrix_to_sparse_label():
    """Int matrix with labels: values, keys, labels and shape all survive."""
    expected_rows = [[1, 2, 3], [10, 20, 3]]
    expected_keys = [[0, 1, 2], [0, 1, 2]]
    expected_labels = np.array([99, 98, 97])
    matrix = coo_matrix(np.array(expected_rows))
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix, expected_labels)
        f.seek(0)
        stream = zip(_read_recordio(f), expected_rows, expected_keys, expected_labels)
        for payload, row_values, row_keys, row_label in stream:
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].int32_tensor
            assert tensor.values == row_values
            assert tensor.keys == row_keys
            assert decoded.label["values"].int32_tensor.values == [row_label]
            assert tensor.shape == [len(row_values)]


def test_dense_float32_spmatrix_to_sparse_label():
    """float32 matrix with int labels: features go to float32, labels to int32."""
    expected_rows = [[1, 2, 3], [10, 20, 3]]
    expected_keys = [[0, 1, 2], [0, 1, 2]]
    expected_labels = np.array([99, 98, 97])
    matrix = coo_matrix(np.array(expected_rows).astype('float32'))
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix, expected_labels)
        f.seek(0)
        stream = zip(_read_recordio(f), expected_rows, expected_keys, expected_labels)
        for payload, row_values, row_keys, row_label in stream:
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].float32_tensor
            assert tensor.values == row_values
            assert tensor.keys == row_keys
            assert decoded.label["values"].int32_tensor.values == [row_label]
            assert tensor.shape == [len(row_values)]


def test_dense_float64_spmatrix_to_sparse_label():
    """float64 matrix with int labels: features go to float64, labels to int32."""
    expected_rows = [[1, 2, 3], [10, 20, 3]]
    expected_keys = [[0, 1, 2], [0, 1, 2]]
    expected_labels = np.array([99, 98, 97])
    matrix = coo_matrix(np.array(expected_rows).astype('float64'))
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix, expected_labels)
        f.seek(0)
        stream = zip(_read_recordio(f), expected_rows, expected_keys, expected_labels)
        for payload, row_values, row_keys, row_label in stream:
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].float64_tensor
            assert tensor.values == row_values
            assert tensor.keys == row_keys
            assert decoded.label["values"].int32_tensor.values == [row_label]
            assert tensor.shape == [len(row_values)]


def test_invalid_sparse_label():
    """A label vector whose length matches neither matrix dimension is rejected."""
    matrix = coo_matrix(np.array([[1, 2, 3], [10, 20, 3]]))
    bad_labels = np.array([99, 98, 97, 1000]).astype(np.dtype('float64'))
    with tempfile.TemporaryFile() as f:
        with pytest.raises(ValueError):
            write_spmatrix_to_sparse_tensor(f, matrix, bad_labels)


def test_sparse_float_write_spmatrix_to_sparse_tensor():
    """Ragged float64 COO matrix keeps per-row values, keys and row width."""
    width = 4
    row_values = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]]
    row_keys = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]]

    # Flatten rows into COO (data, (row, col)) triplets.
    data, rows, cols = [], [], []
    for row_idx, (values, keys) in enumerate(zip(row_values, row_keys)):
        data.extend(values)
        cols.extend(keys)
        rows.extend([row_idx] * len(keys))

    matrix = coo_matrix((data, (rows, cols)), dtype='float64')
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix)
        f.seek(0)
        for payload, values, keys in zip(_read_recordio(f), row_values, row_keys):
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].float64_tensor
            assert tensor.values == values
            assert tensor.keys == keys
            assert tensor.shape == [width]


def test_sparse_float32_write_spmatrix_to_sparse_tensor():
    """Ragged float32 COO matrix keeps per-row values, keys and row width."""
    width = 4
    row_values = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]]
    row_keys = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]]

    # Flatten rows into COO (data, (row, col)) triplets.
    data, rows, cols = [], [], []
    for row_idx, (values, keys) in enumerate(zip(row_values, row_keys)):
        data.extend(values)
        cols.extend(keys)
        rows.extend([row_idx] * len(keys))

    matrix = coo_matrix((data, (rows, cols)), dtype='float32')
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix)
        f.seek(0)
        for payload, values, keys in zip(_read_recordio(f), row_values, row_keys):
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].float32_tensor
            assert tensor.values == values
            assert tensor.keys == keys
            assert tensor.shape == [width]


def test_sparse_int_write_spmatrix_to_sparse_tensor():
    """Ragged int COO matrix keeps per-row values, keys and row width."""
    width = 4
    row_values = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]]
    row_keys = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]]

    # Flatten rows into COO (data, (row, col)) triplets.
    data, rows, cols = [], [], []
    for row_idx, (values, keys) in enumerate(zip(row_values, row_keys)):
        data.extend(values)
        cols.extend(keys)
        rows.extend([row_idx] * len(keys))

    matrix = coo_matrix((data, (rows, cols)), dtype='int')
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, matrix)
        f.seek(0)
        for payload, values, keys in zip(_read_recordio(f), row_values, row_keys):
            decoded = Record()
            decoded.ParseFromString(payload)
            tensor = decoded.features["values"].int32_tensor
            assert tensor.values == values
            assert tensor.keys == keys
            assert tensor.shape == [width]


def test_dense_to_sparse():
    """Passing a dense ndarray to the sparse writer raises TypeError."""
    dense = np.array([[1, 2, 3], [10, 20, 3]])
    labels = np.array([99, 98, 97]).astype(np.dtype('float64'))
    with tempfile.TemporaryFile() as f:
        with pytest.raises(TypeError):
            write_spmatrix_to_sparse_tensor(f, dense, labels)
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ exclude =
*pb2.py
.tox
tests/data/
venv/
max-complexity = 10

[testenv]
Expand Down