Skip to content

Commit da9a2b5

Browse files
author
Egor Malykh
committed
implemented write_numpy_to_sparse_tensor
1 parent f298f54 commit da9a2b5

File tree

2 files changed

+242
-1
lines changed

2 files changed

+242
-1
lines changed

src/sagemaker/amazon/common.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import sys
1616

1717
import numpy as np
18+
from scipy.sparse import issparse
1819

1920
from sagemaker.amazon.record_pb2 import Record
2021

@@ -64,6 +65,24 @@ def _write_label_tensor(resolved_type, record, scalar):
6465
record.label["values"].float32_tensor.values.extend([scalar])
6566

6667

68+
def _write_keys_tensor(resolved_type, record, vector):
69+
if resolved_type == "Int32":
70+
record.features["values"].int32_tensor.keys.extend(vector)
71+
elif resolved_type == "Float64":
72+
record.features["values"].float64_tensor.keys.extend(vector)
73+
elif resolved_type == "Float32":
74+
record.features["values"].float32_tensor.keys.extend(vector)
75+
76+
77+
def _write_shape(resolved_type, record, scalar):
78+
if resolved_type == "Int32":
79+
record.features["values"].int32_tensor.shape.extend([scalar])
80+
elif resolved_type == "Float64":
81+
record.features["values"].float64_tensor.shape.extend([scalar])
82+
elif resolved_type == "Float32":
83+
record.features["values"].float32_tensor.shape.extend([scalar])
84+
85+
6786
def write_numpy_to_dense_tensor(file, array, labels=None):
6887
"""Writes a numpy array to a dense tensor"""
6988

@@ -89,6 +108,46 @@ def write_numpy_to_dense_tensor(file, array, labels=None):
89108
_write_recordio(file, record.SerializeToString())
90109

91110

111+
def write_numpy_to_sparse_tensor(file, array, labels=None):
112+
"""Writes a numpy array to a dense tensor"""
113+
114+
if not issparse(array):
115+
raise TypeError("Array must be sparse")
116+
117+
# Validate shape of array and labels, resolve array and label types
118+
if not len(array.shape) == 2:
119+
raise ValueError("Array must be a Matrix")
120+
if labels is not None:
121+
if not len(labels.shape) == 1:
122+
raise ValueError("Labels must be a Vector")
123+
if labels.shape[0] not in array.shape:
124+
raise ValueError("Label shape {} not compatible with array shape {}".format(
125+
labels.shape, array.shape))
126+
resolved_label_type = _resolve_type(labels.dtype)
127+
resolved_type = _resolve_type(array.dtype)
128+
129+
csr_array = array.tocsr()
130+
n_rows, n_cols = csr_array.shape
131+
132+
record = Record()
133+
for row_idx in range(n_rows):
134+
record.Clear()
135+
row = csr_array.getrow(row_idx)
136+
# Write values
137+
_write_feature_tensor(resolved_type, record, row.data)
138+
# Write keys
139+
_write_keys_tensor(resolved_type, record, row.indices.astype(np.uint64))
140+
141+
# Write labels
142+
if labels is not None:
143+
_write_label_tensor(resolved_label_type, record, labels[row_idx])
144+
145+
# Write shape
146+
_write_shape(resolved_type, record, n_cols)
147+
148+
_write_recordio(file, record.SerializeToString())
149+
150+
92151
def read_records(file):
93152
"""Eagerly read a collection of amazon Record protobuf objects from file."""
94153
records = []

tests/unit/test_common.py

Lines changed: 183 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import numpy as np
1414
import tempfile
1515
import pytest
16+
import itertools
17+
from scipy.sparse import coo_matrix
1618
from sagemaker.amazon.common import (record_deserializer, write_numpy_to_dense_tensor, _read_recordio,
17-
numpy_to_record_serializer)
19+
numpy_to_record_serializer, write_numpy_to_sparse_tensor)
1820
from sagemaker.amazon.record_pb2 import Record
1921

2022

@@ -131,3 +133,183 @@ def test_invalid_label():
131133
with tempfile.TemporaryFile() as f:
132134
with pytest.raises(ValueError):
133135
write_numpy_to_dense_tensor(f, array, label_data)
136+
137+
138+
def test_dense_float_write_numpy_to_sparse_tensor():
139+
array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
140+
keys_data = [[0, 1, 2], [0, 1, 2]]
141+
array = coo_matrix(np.array(array_data))
142+
with tempfile.TemporaryFile() as f:
143+
write_numpy_to_sparse_tensor(f, array)
144+
f.seek(0)
145+
for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data):
146+
record = Record()
147+
record.ParseFromString(record_data)
148+
assert record.features["values"].float64_tensor.values == expected_data
149+
assert record.features["values"].float64_tensor.keys == expected_keys
150+
assert record.features["values"].float64_tensor.shape == [len(expected_data)]
151+
152+
153+
def test_dense_float32_write_numpy_to_sparse_tensor():
154+
array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
155+
keys_data = [[0, 1, 2], [0, 1, 2]]
156+
array = coo_matrix(np.array(array_data).astype(np.dtype('float32')))
157+
with tempfile.TemporaryFile() as f:
158+
write_numpy_to_sparse_tensor(f, array)
159+
f.seek(0)
160+
for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data):
161+
record = Record()
162+
record.ParseFromString(record_data)
163+
assert record.features["values"].float32_tensor.values == expected_data
164+
assert record.features["values"].float32_tensor.keys == expected_keys
165+
assert record.features["values"].float32_tensor.shape == [len(expected_data)]
166+
167+
168+
def test_dense_int_write_numpy_to_sparse_tensor():
169+
array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
170+
keys_data = [[0, 1, 2], [0, 1, 2]]
171+
array = coo_matrix(np.array(array_data).astype(np.dtype('int')))
172+
with tempfile.TemporaryFile() as f:
173+
write_numpy_to_sparse_tensor(f, array)
174+
f.seek(0)
175+
for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data):
176+
record = Record()
177+
record.ParseFromString(record_data)
178+
assert record.features["values"].int32_tensor.values == expected_data
179+
assert record.features["values"].int32_tensor.keys == expected_keys
180+
assert record.features["values"].int32_tensor.shape == [len(expected_data)]
181+
182+
183+
def test_dense_int_numpy_to_sparse_label():
184+
array_data = [[1, 2, 3], [10, 20, 3]]
185+
keys_data = [[0, 1, 2], [0, 1, 2]]
186+
array = coo_matrix(np.array(array_data))
187+
label_data = np.array([99, 98, 97])
188+
with tempfile.TemporaryFile() as f:
189+
write_numpy_to_sparse_tensor(f, array, label_data)
190+
f.seek(0)
191+
for record_data, expected_data, expected_keys, label in zip(_read_recordio(f), array_data, keys_data, label_data):
192+
record = Record()
193+
record.ParseFromString(record_data)
194+
assert record.features["values"].int32_tensor.values == expected_data
195+
assert record.features["values"].int32_tensor.keys == expected_keys
196+
assert record.label["values"].int32_tensor.values == [label]
197+
assert record.features["values"].int32_tensor.shape == [len(expected_data)]
198+
199+
200+
def test_dense_float32_numpy_to_sparse_label():
201+
array_data = [[1, 2, 3], [10, 20, 3]]
202+
keys_data = [[0, 1, 2], [0, 1, 2]]
203+
array = coo_matrix(np.array(array_data).astype('float32'))
204+
label_data = np.array([99, 98, 97])
205+
with tempfile.TemporaryFile() as f:
206+
write_numpy_to_sparse_tensor(f, array, label_data)
207+
f.seek(0)
208+
for record_data, expected_data, expected_keys, label in zip(_read_recordio(f), array_data, keys_data, label_data):
209+
record = Record()
210+
record.ParseFromString(record_data)
211+
assert record.features["values"].float32_tensor.values == expected_data
212+
assert record.features["values"].float32_tensor.keys == expected_keys
213+
assert record.label["values"].int32_tensor.values == [label]
214+
assert record.features["values"].float32_tensor.shape == [len(expected_data)]
215+
216+
217+
def test_dense_float64_numpy_to_sparse_label():
218+
array_data = [[1, 2, 3], [10, 20, 3]]
219+
keys_data = [[0, 1, 2], [0, 1, 2]]
220+
array = coo_matrix(np.array(array_data).astype('float64'))
221+
label_data = np.array([99, 98, 97])
222+
with tempfile.TemporaryFile() as f:
223+
write_numpy_to_sparse_tensor(f, array, label_data)
224+
f.seek(0)
225+
for record_data, expected_data, expected_keys, label in zip(_read_recordio(f), array_data, keys_data, label_data):
226+
record = Record()
227+
record.ParseFromString(record_data)
228+
assert record.features["values"].float64_tensor.values == expected_data
229+
assert record.features["values"].float64_tensor.keys == expected_keys
230+
assert record.label["values"].int32_tensor.values == [label]
231+
assert record.features["values"].float64_tensor.shape == [len(expected_data)]
232+
233+
234+
def test_invalid_sparse_label():
235+
array_data = [[1, 2, 3], [10, 20, 3]]
236+
array = coo_matrix(np.array(array_data))
237+
label_data = np.array([99, 98, 97, 1000]).astype(np.dtype('float64'))
238+
with tempfile.TemporaryFile() as f:
239+
with pytest.raises(ValueError):
240+
write_numpy_to_sparse_tensor(f, array, label_data)
241+
242+
243+
def test_sparse_float_write_numpy_to_sparse_tensor():
244+
n = 4
245+
array_data = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]]
246+
keys_data = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]]
247+
248+
flatten_data = list(itertools.chain.from_iterable(array_data))
249+
y_indices = list(itertools.chain.from_iterable(keys_data))
250+
x_indices = [[i] * len(keys_data[i]) for i in range(len(keys_data))]
251+
x_indices = list(itertools.chain.from_iterable(x_indices))
252+
253+
array = coo_matrix((flatten_data, (x_indices, y_indices)), dtype='float64')
254+
with tempfile.TemporaryFile() as f:
255+
write_numpy_to_sparse_tensor(f, array)
256+
f.seek(0)
257+
for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data):
258+
record = Record()
259+
record.ParseFromString(record_data)
260+
assert record.features["values"].float64_tensor.values == expected_data
261+
assert record.features["values"].float64_tensor.keys == expected_keys
262+
assert record.features["values"].float64_tensor.shape == [n]
263+
264+
265+
def test_sparse_float32_write_numpy_to_sparse_tensor():
266+
n = 4
267+
array_data = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]]
268+
keys_data = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]]
269+
270+
flatten_data = list(itertools.chain.from_iterable(array_data))
271+
y_indices = list(itertools.chain.from_iterable(keys_data))
272+
x_indices = [[i] * len(keys_data[i]) for i in range(len(keys_data))]
273+
x_indices = list(itertools.chain.from_iterable(x_indices))
274+
275+
array = coo_matrix((flatten_data, (x_indices, y_indices)), dtype='float32')
276+
with tempfile.TemporaryFile() as f:
277+
write_numpy_to_sparse_tensor(f, array)
278+
f.seek(0)
279+
for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data):
280+
record = Record()
281+
record.ParseFromString(record_data)
282+
assert record.features["values"].float32_tensor.values == expected_data
283+
assert record.features["values"].float32_tensor.keys == expected_keys
284+
assert record.features["values"].float32_tensor.shape == [n]
285+
286+
287+
def test_sparse_int_write_numpy_to_sparse_tensor():
288+
n = 4
289+
array_data = [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]]
290+
keys_data = [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]]
291+
292+
flatten_data = list(itertools.chain.from_iterable(array_data))
293+
y_indices = list(itertools.chain.from_iterable(keys_data))
294+
x_indices = [[i] * len(keys_data[i]) for i in range(len(keys_data))]
295+
x_indices = list(itertools.chain.from_iterable(x_indices))
296+
297+
array = coo_matrix((flatten_data, (x_indices, y_indices)), dtype='int')
298+
with tempfile.TemporaryFile() as f:
299+
write_numpy_to_sparse_tensor(f, array)
300+
f.seek(0)
301+
for record_data, expected_data, expected_keys in zip(_read_recordio(f), array_data, keys_data):
302+
record = Record()
303+
record.ParseFromString(record_data)
304+
assert record.features["values"].int32_tensor.values == expected_data
305+
assert record.features["values"].int32_tensor.keys == expected_keys
306+
assert record.features["values"].int32_tensor.shape == [n]
307+
308+
309+
def test_dense_to_sparse_array():
310+
array_data = [[1, 2, 3], [10, 20, 3]]
311+
array = np.array(array_data)
312+
label_data = np.array([99, 98, 97]).astype(np.dtype('float64'))
313+
with tempfile.TemporaryFile() as f:
314+
with pytest.raises(TypeError):
315+
write_numpy_to_sparse_tensor(f, array, label_data)

0 commit comments

Comments
 (0)