
Add backward compatibility for RecordSerializer and RecordDeserializer #5052


Merged 3 commits on Feb 21, 2025
217 changes: 10 additions & 207 deletions src/sagemaker/amazon/common.py
@@ -13,210 +13,13 @@
"""Placeholder docstring"""
from __future__ import absolute_import

import logging
import struct
import sys

import numpy as np

from sagemaker.amazon.record_pb2 import Record
from sagemaker.utils import DeferredError


def _write_feature_tensor(resolved_type, record, vector):
    """Append a feature vector to the record's tensor of the resolved type."""
    if resolved_type == "Int32":
        record.features["values"].int32_tensor.values.extend(vector)
    elif resolved_type == "Float64":
        record.features["values"].float64_tensor.values.extend(vector)
    elif resolved_type == "Float32":
        record.features["values"].float32_tensor.values.extend(vector)


def _write_label_tensor(resolved_type, record, scalar):
    """Append a scalar label to the record's label tensor of the resolved type."""
    if resolved_type == "Int32":
        record.label["values"].int32_tensor.values.extend([scalar])
    elif resolved_type == "Float64":
        record.label["values"].float64_tensor.values.extend([scalar])
    elif resolved_type == "Float32":
        record.label["values"].float32_tensor.values.extend([scalar])


def _write_keys_tensor(resolved_type, record, vector):
    """Append sparse-tensor keys (column indices) to the record's feature tensor."""
    if resolved_type == "Int32":
        record.features["values"].int32_tensor.keys.extend(vector)
    elif resolved_type == "Float64":
        record.features["values"].float64_tensor.keys.extend(vector)
    elif resolved_type == "Float32":
        record.features["values"].float32_tensor.keys.extend(vector)


def _write_shape(resolved_type, record, scalar):
    """Record the sparse tensor's shape (number of columns) on the feature tensor."""
    if resolved_type == "Int32":
        record.features["values"].int32_tensor.shape.extend([scalar])
    elif resolved_type == "Float64":
        record.features["values"].float64_tensor.shape.extend([scalar])
    elif resolved_type == "Float32":
        record.features["values"].float32_tensor.shape.extend([scalar])


def write_numpy_to_dense_tensor(file, array, labels=None):
    """Writes a numpy array to a dense tensor

    Args:
        file: a file-like object open for binary writing
        array: a 2-D numpy array; each row becomes one Record
        labels: an optional 1-D numpy array of per-row labels
    """

    # Validate shape of array and labels, resolve array and label types
    if not len(array.shape) == 2:
        raise ValueError("Array must be a Matrix")
    if labels is not None:
        if not len(labels.shape) == 1:
            raise ValueError("Labels must be a Vector")
        if labels.shape[0] not in array.shape:
            raise ValueError(
                "Label shape {} not compatible with array shape {}".format(
                    labels.shape, array.shape
                )
            )
        resolved_label_type = _resolve_type(labels.dtype)
    resolved_type = _resolve_type(array.dtype)

    # Write each vector in array into a Record in the file object
    record = Record()
    for index, vector in enumerate(array):
        record.Clear()
        _write_feature_tensor(resolved_type, record, vector)
        if labels is not None:
            _write_label_tensor(resolved_label_type, record, labels[index])
        _write_recordio(file, record.SerializeToString())
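
# Example (illustrative, not part of the module): a 2x3 float64 matrix with
# labels produces one RecordIO record per row.
#
#     import io
#     buf = io.BytesIO()
#     write_numpy_to_dense_tensor(
#         buf,
#         np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]),
#         np.array([0.0, 1.0]),
#     )
#     buf.seek(0)
#     assert len(read_records(buf)) == 2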


def write_spmatrix_to_sparse_tensor(file, array, labels=None):
    """Writes a scipy sparse matrix to a sparse tensor

    Args:
        file: a file-like object open for binary writing
        array: a 2-D scipy sparse matrix; each row becomes one Record
        labels: an optional 1-D numpy array of per-row labels
    """
    try:
        import scipy
    except ImportError as e:
        logging.warning(
            "scipy failed to import. Sparse matrix functions will be impaired or broken."
        )
        # Any subsequent attempt to use scipy will raise the ImportError
        scipy = DeferredError(e)

    if not scipy.sparse.issparse(array):
        raise TypeError("Array must be sparse")

    # Validate shape of array and labels, resolve array and label types
    if not len(array.shape) == 2:
        raise ValueError("Array must be a Matrix")
    if labels is not None:
        if not len(labels.shape) == 1:
            raise ValueError("Labels must be a Vector")
        if labels.shape[0] not in array.shape:
            raise ValueError(
                "Label shape {} not compatible with array shape {}".format(
                    labels.shape, array.shape
                )
            )
        resolved_label_type = _resolve_type(labels.dtype)
    resolved_type = _resolve_type(array.dtype)

    csr_array = array.tocsr()
    n_rows, n_cols = csr_array.shape

    record = Record()
    for row_idx in range(n_rows):
        record.Clear()
        row = csr_array.getrow(row_idx)
        # Write values
        _write_feature_tensor(resolved_type, record, row.data)
        # Write keys
        _write_keys_tensor(resolved_type, record, row.indices.astype(np.uint64))

        # Write labels
        if labels is not None:
            _write_label_tensor(resolved_label_type, record, labels[row_idx])

        # Write shape
        _write_shape(resolved_type, record, n_cols)

        _write_recordio(file, record.SerializeToString())
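
# Example (illustrative, not part of the module; requires scipy):
#
#     import io
#     from scipy import sparse
#     buf = io.BytesIO()
#     write_spmatrix_to_sparse_tensor(buf, sparse.csr_matrix(np.eye(3, dtype="float32")))
#     buf.seek(0)
#     assert len(read_records(buf)) == 3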


def read_records(file):
    """Eagerly read a collection of amazon Record protobuf objects from file.

    Args:
        file: a file-like object open for binary reading
    """
    records = []
    for record_data in read_recordio(file):
        record = Record()
        record.ParseFromString(record_data)
        records.append(record)
    return records


# MXNet requires recordio records have length in bytes that's a multiple of 4
# This sets up padding bytes to append to the end of the record, for different
# amounts of padding required.
padding = {}
for amount in range(4):
    if sys.version_info >= (3,):
        padding[amount] = bytes([0x00 for _ in range(amount)])
    else:
        padding[amount] = bytearray([0x00 for _ in range(amount)])

_kmagic = 0xCED7230A


def _write_recordio(f, data):
    """Writes a single data point as a RecordIO record to the given file.

    Args:
        f: a file-like object open for binary writing
        data: the serialized record payload as bytes
    """
    length = len(data)
    f.write(struct.pack("I", _kmagic))
    f.write(struct.pack("I", length))
    # Round length up to the next multiple of 4; the difference is the padding.
    pad = (((length + 3) >> 2) << 2) - length
    f.write(data)
    f.write(padding[pad])
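
# RecordIO framing (illustrative): each record on disk is
#   [magic: 4 bytes][length: 4 bytes][payload: length bytes][zero pad to a 4-byte boundary]
# e.g. _write_recordio(f, b"abcde") writes 4 + 4 + 5 + 3 = 16 bytes.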


def read_recordio(f):
    """Generator yielding the payload of each RecordIO record in the file."""
    while True:
        try:
            (read_kmagic,) = struct.unpack("I", f.read(4))
        except struct.error:
            return
        assert read_kmagic == _kmagic
        (len_record,) = struct.unpack("I", f.read(4))
        pad = (((len_record + 3) >> 2) << 2) - len_record
        yield f.read(len_record)
        if pad:
            f.read(pad)


def _resolve_type(dtype):
    """Map a numpy dtype to the corresponding Record tensor type name."""
    if dtype == np.dtype(int):
        return "Int32"
    if dtype == np.dtype(float):
        return "Float64"
    if dtype == np.dtype("float32"):
        return "Float32"
    raise ValueError("Unsupported dtype {} on array".format(dtype))
# these imports ensure backward compatibility.
from sagemaker.deserializers import RecordDeserializer  # noqa: F401 # pylint: disable=W0611
from sagemaker.serializers import RecordSerializer  # noqa: F401 # pylint: disable=W0611
from sagemaker.serializer_utils import (  # noqa: F401 # pylint: disable=W0611
    read_recordio,
    read_records,
    write_numpy_to_dense_tensor,
    write_spmatrix_to_sparse_tensor,
    _write_recordio,
)
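
The shim above keeps the old import surface alive. A minimal sketch of what that buys (assuming the layout this PR introduces, with the implementations now living in sagemaker.serializer_utils and the classes in sagemaker.serializers / sagemaker.deserializers):

    import sagemaker.amazon.common as common
    from sagemaker.serializer_utils import write_numpy_to_dense_tensor
    from sagemaker.serializers import RecordSerializer
    from sagemaker.deserializers import RecordDeserializer

    # Legacy attribute access resolves to the same objects as the new paths.
    assert common.write_numpy_to_dense_tensor is write_numpy_to_dense_tensor
    assert common.RecordSerializer is RecordSerializer
    assert common.RecordDeserializer is RecordDeserializer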
2 changes: 1 addition & 1 deletion src/sagemaker/base_deserializers.py
@@ -23,7 +23,7 @@
import numpy as np
from six import with_metaclass

-from sagemaker.amazon.common import read_records
+from sagemaker.serializer_utils import read_records
from sagemaker.utils import DeferredError

try:
2 changes: 1 addition & 1 deletion src/sagemaker/base_serializers.py
@@ -22,7 +22,7 @@
from pandas import DataFrame
from six import with_metaclass

-from sagemaker.amazon.common import write_numpy_to_dense_tensor
+from sagemaker.serializer_utils import write_numpy_to_dense_tensor
from sagemaker.utils import DeferredError

try: