
How can I efficiently forward numpy arrays to a SageMaker endpoint for inference? #118

Closed
bessszilard opened this issue Feb 14, 2020 · 17 comments

Comments

@bessszilard

I deployed an object detection algorithm on a SageMaker endpoint, with this structure:

[figure: deployment structure (client → SageMaker endpoint → model)]

I tested the model locally: I created a session, loaded the model, and predicted with session.run(). On very similar hardware, the local model was about 10x faster than the deployed one. I know there are a lot of factors that can increase remote prediction time, so to get a clearer picture I added time logging to my inference.py. The results are:

[screenshot: timing log from inference.py, showing np2json as the dominant cost]

The np2json code is:

import json
import timeit

start = timeit.default_timer()
image_json = json.dumps({
    'inputs': image.tolist()
})
stop = timeit.default_timer()
print("np2json: {:.4f} sec".format(stop - start))

The RGB image size is 633 x 1333 x 3, and during preprocessing I convert it to float16.

Based on the measurements, I realized that the np2json function increases the latency significantly, and it's a step that isn't needed for local prediction. Is there a more efficient way to send a numpy array to the deployed model?

Thanks for the help!

@laurenyu
Contributor

just to clarify, which version of TFS are you using?

you could write your own input_handler to handle the content type "application/x-npy" rather than using the default one, which expects JSON (see the pre-/post-processing docs).

For the predictor, you'll want to do something like:

from sagemaker.predictor import npy_serializer
from sagemaker.tensorflow import serving

model = serving.Model(...)  # or estimator = TensorFlow(...)
predictor = model.deploy(...)  # or estimator.deploy

predictor.content_type = 'application/x-npy'
predictor.serializer = npy_serializer

predictor.predict(...)

There is also sagemaker.predictor.numpy_deserializer if you want to implement your own output_handler to use numpy instead of JSON.
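For illustration, a minimal sketch of such a handler (assuming a single input tensor named "inputs"; the JSON step doesn't disappear, because TFS's REST API only accepts JSON, it just moves from the client into the endpoint):

# inference.py - rough sketch of an input_handler for "application/x-npy".
# Assumption: the model has a single input tensor named "inputs".
import io
import json

import numpy as np


def input_handler(data, context):
    if context.request_content_type == 'application/x-npy':
        # deserialize the .npy bytes (as produced by npy_serializer) back into an ndarray
        array = np.load(io.BytesIO(data.read()))
        return json.dumps({'inputs': array.tolist()})
    raise ValueError('Unsupported content type: {}'.format(
        context.request_content_type or 'unknown'))

This avoids JSON on the client side, but the tolist()/json.dumps cost still happens inside the endpoint, since the request TFS ultimately receives must be JSON.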

@bessszilard
Author

bessszilard commented Feb 17, 2020

TFS version

TensorFlow ModelServer: 1.15.0-rc2+dev.sha.ac2bc98
TensorFlow Library: 1.15.0

I already use a custom input handler and output handler, as the figure shows:
[figure: request flow; the green arrow is the client-to-endpoint hop, the red arrow is inside the endpoint]

If I set the content_type to x-npy, as I understand it, that changes the content type between the client and the endpoint (green arrow). My problem is at the red arrow, inside the endpoint.

I won't use the predictor object directly, because I will communicate with the endpoint via POST messages through API Gateway.

@laurenyu
Contributor

if you are set on using the REST API, I unfortunately don't believe there is a way around using JSON:

> The request and response is a JSON object.

https://www.tensorflow.org/tfx/serving/api_rest

@bessszilard
Author

Can I use methods other than the REST API with SageMaker endpoints?

@nadiaya
Contributor

nadiaya commented Apr 1, 2020

We expose the gRPC port, so you should be able to use it instead of the REST API in your inference.py input_handler.
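For illustration, a gRPC call from inside input_handler might look roughly like this (an untested sketch; the port and model name are assumptions that depend on the container's TFS configuration):

# rough sketch: bypass the REST/JSON layer by calling TFS over gRPC.
# Assumptions: TFS serves gRPC on localhost:10500 and the model is named
# "model" - check the container's TFS startup configuration for both.
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:10500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)


def grpc_predict(array):
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'model'
    # tensors travel as binary protobufs, so no tolist()/json.dumps is needed
    request.inputs['inputs'].CopyFrom(tf.make_tensor_proto(array))
    return stub.Predict(request, 30.0)  # 30-second timeout

Since the tensor is sent as a protobuf rather than a JSON list, this sidesteps the np2json cost measured above.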

@nadiaya
Contributor

nadiaya commented Apr 1, 2020

Also, since you are using images, an alternative solution is to change your TF model input to accept image data. There's an example notebook that shows how to do this. It's written for batch transform jobs, but the model preparation and inference script would be the same for an endpoint:

import base64
import json


def input_handler(data, context):
    """Pre-process request input before it is sent to the TensorFlow Serving REST API.

    Args:
        data (obj): the request data stream
        context (Context): an object containing request and configuration details

    Returns:
        (dict): a JSON-serializable dict that contains request body and headers
    """
    if context.request_content_type == 'application/x-image':
        payload = data.read()
        encoded_image = base64.b64encode(payload).decode('utf-8')
        instance = [{"b64": encoded_image}]
        return json.dumps({"instances": instance})
    else:
        # _return_error is a helper defined elsewhere in the example notebook
        _return_error(415, 'Unsupported content type "{}"'.format(
            context.request_content_type or 'Unknown'))
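On the client side, invoking the endpoint with raw image bytes would then look something like this (a sketch; the endpoint name and image file are placeholders):

import boto3

runtime = boto3.client('runtime.sagemaker')
with open('test.jpg', 'rb') as f:  # placeholder image file
    response = runtime.invoke_endpoint(
        EndpointName='my-endpoint',  # placeholder endpoint name
        ContentType='application/x-image',
        Body=f.read(),
    )
print(response['Body'].read())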

@nadiaya
Contributor

nadiaya commented Jun 8, 2020

Closing the issue. Please feel free to reach out if you have any further questions!

@nadiaya nadiaya closed this as completed Jun 8, 2020
@Patrick-devX

Patrick-devX commented Mar 14, 2021

@laurenyu
@nadiaya

I have a TensorFlow model deployed to an exposed AWS SageMaker endpoint. The model accepts four inputs, as follows:

{'input1': numpy array, 'input2': numpy array, 'input3': numpy array, 'input4': numpy array}

When I try to predict, I get this error:

TypeError: Object of type 'ndarray' is not JSON serializable

Outside SageMaker everything works fine.

Please, I need your help!

@laurenyu
Contributor

@Patrick-devX can you post the full stacktrace?

@Patrick-devX

Patrick-devX commented Mar 15, 2021

@laurenyu

TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
----> 1 pred_state_ = predictor.predict([num, cat_1, cat_2, cat_3])
      2 pred_state = pred_state_.argmax(axis=-1)
      3
      4 next_steps = le_actionen.classes_[pred_state]

~/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/sagemaker/tensorflow/model.py in predict(self, data, initial_args)
    104             args["CustomAttributes"] = self._model_attributes
    105
--> 106         return super(TensorFlowPredictor, self).predict(data, args)
    107
    108

~/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/sagemaker/predictor.py in predict(self, data, initial_args, target_model, target_variant, inference_id)
    127
    128         request_args = self._create_request_args(
--> 129             data, initial_args, target_model, target_variant, inference_id
    130         )
    131         response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)

~/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/sagemaker/predictor.py in _create_request_args(self, data, initial_args, target_model, target_variant, inference_id)
    162             args["InferenceId"] = inference_id
    163
--> 164         data = self.serializer.serialize(data)
    165
    166         args["Body"] = data

~/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/sagemaker/serializers.py in serialize(self, data)
    227             return json.dumps(data.tolist())
    228
--> 229         return json.dumps(data)
    230
    231

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    229         cls is None and indent is None and separators is None and
    230         default is None and not sort_keys and not kw):
--> 231         return _default_encoder.encode(obj)
    232     if cls is None:
    233         cls = JSONEncoder

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/encoder.py in encode(self, o)
    197         # exceptions aren't as detailed.  The list call should be roughly
    198         # equivalent to the PySequence_Fast that ''.join() would do.
--> 199         chunks = self.iterencode(o, _one_shot=True)
    200         if not isinstance(chunks, (list, tuple)):
    201             chunks = list(chunks)

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/encoder.py in iterencode(self, o, _one_shot)
    255                 self.key_separator, self.item_separator, self.sort_keys,
    256                 self.skipkeys, _one_shot)
--> 257         return _iterencode(o, 0)
    258
    259     def _make_iterencode(markers, _default, _encoder, _indent, _floatstr,

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/encoder.py in default(self, o)
    178         """
    179         raise TypeError("Object of type '%s' is not JSON serializable" %
--> 180                         o.__class__.__name__)
    181
    182     def encode(self, o):

TypeError: Object of type 'ndarray' is not JSON serializable

Thank you!

https://stackoverflow.com/questions/66635745/how-to-send-many-numpy-arrays-to-sagemaker-endpoint

https://gitlab.com/patricksardin08/data-science/-/blob/master/user_traking/deploy_model.ipynb

@laurenyu
Contributor

@Patrick-devX the issue is occurring client-side, when your predictor object tries to serialize the data as JSON. Based on your code, it looks like num is a numpy array, which would explain why your error message says Object of type 'ndarray' is not JSON serializable. Can you use something like ndarray.tolist() to make your input for predict() JSON-serializable?

If not, you'll probably want to look into writing your own serializer or input handler (see the earlier comments in this thread).

(btw, in the future, instead of commenting on an unrelated issue with just an error message, open a new issue and provide as much info/logs/code as you can. For client-side errors like this one, you'll have better luck opening a new issue in this repo.)

@Patrick-devX

Patrick-devX commented Mar 15, 2021

@laurenyu
Thanks for your reply.
I used tolist(), but it did not help any further. The input variables are all of type ndarray. Apparently they should be, because when I tried to pass the input data as a list of lists, an error came out: "The expected inputs data should be tensors".

pred_state_ = predictor.predict([12, 3, 3, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6]])

raises this error:

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received client error (400) from model with message "{ "error": "instances is a plain list, but expecting list of objects as multiple input tensors required as per tensorinfo_map" }"

Then I tried the NumpySerializer, like this:

predictor.serializer = sagemaker.serializers.NumpySerializer(dtype=None, content_type='application/x-npy')

and the following error occurs:

ValueError: could not broadcast input array from shape (55) into shape (1)

The IdentitySerializer:

predictor.serializer = sagemaker.serializers.IdentitySerializer(content_type='application/octet-stream')

raises this error:

ParamValidationError: Parameter validation failed: Invalid type for parameter Body, value: [array([[12.]]), array([[3]], dtype=int32), array([[3]], dtype=int32), array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6]], dtype=int32)], type: <class 'list'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

(I tried those serializers both with and without the content_type parameter.)

Yes, regarding future issues, I understand. Thank you.


I found this similar issue, from @yolanda0202 in aws/sagemaker-python-sdk#166:

> @laurenyu Thank you so much for your explanation! Now I got another bug.
>
> When I'm in the notebook where the model was trained, it works for me to predict with the pred_set dataset, whose format is exactly the same as the training dataset:
>
> kmeans_predictor = kmeans.deploy(initial_instance_count=1,
>                                  instance_type='ml.m4.xlarge')
>
> result = kmeans_predictor.predict(pred_set)
>
> But when I called the endpoint in another notebook and tried to predict with the same pred_set dataset:
>
> from sagemaker.predictor import RealTimePredictor
> predictor = RealTimePredictor('kmeans-2018-05-02-16-48-42-652')
> predictor.predict(pred_set)
>
> I got:
>
> ParamValidationError: Parameter validation failed:
> Invalid type for parameter Body, value: [[ 6. 0. 1. 1. 20.]
> [ 12. 0. 7. 1. 13.]
> [ 1. 0. 1. 1. 0.]
> ...
> [ 9. 0. 1. 1. 26.]
> [ 10. 0. 2. 1. 19.]
> [283. 0. 112. 93. 0.]], type: <class 'numpy.ndarray'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

@laurenyu
Contributor

@Patrick-devX aws/sagemaker-python-sdk#166 is a different issue, because using KMeans is different from using TF.

Here is the sequence of events:

  1. You call predictor.predict(data).
  2. data is serialized into an HTTP request - this is where your original error came from. By default this is done via JSON; based on your errors, it might be easier to stick with the default.
  3. The HTTP request gets sent to your endpoint.
  4. The endpoint tries to deserialize the HTTP request - by default this expects JSON and returns a dict/list accordingly. You can write your own handler so that it returns a numpy array.
  5. The deserialized data is then used as input for the model - this is where your ModelError came from.

Since your model requires numpy arrays, you're going to need to write your own input handler to deal with step 4 (see https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/deploying_tensorflow_serving.html#providing-python-scripts-for-pre-pos-processing); a sketch follows below.
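A rough sketch of what that input_handler could look like for this kind of multi-input model (the input names input1..input4 are assumptions; the "instances" wrapping matches the "list of objects" format your earlier ModelError asked for):

# inference.py - sketch of an input_handler that reshapes a flat JSON payload
# into the "list of objects" format TFS expects for multi-input models.
# Assumption: the client sends {"input1": [...], ..., "input4": [...]}.
import json


def input_handler(data, context):
    if context.request_content_type == 'application/json':
        body = json.loads(data.read().decode('utf-8'))
        instance = {name: body[name]
                    for name in ('input1', 'input2', 'input3', 'input4')}
        return json.dumps({'instances': [instance]})
    raise ValueError('Unsupported content type: {}'.format(
        context.request_content_type or 'unknown'))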

@Patrick-devX

Hi @laurenyu
The problem is somewhere else. If I catch an HTTP request, deserialize it with the handler function, and build numpy arrays from it with the same structure as when running locally, these numpy arrays are not accepted by the endpoint, even though they are the same arrays that work successfully outside SageMaker on the same model.

@laurenyu
Contributor

@Patrick-devX what is your error message and stacktrace after implementing a handler function? What does your handler function look like?

@Patrick-devX

Patrick-devX commented Mar 23, 2021

@laurenyu Here is the handler:

def lambda_handler(event, context):
    try:
        print("Received event:" + json.dumps(event))
        # get the event
        request = json.loads(json.dumps(event))

        time_pro_event = request['time_pro_event']
        os_name = str(request['os_name'])
        browser_name = str(request['browser_name'])
        user_urls = request['user_urls']

        # load label encoder objects
        le_betriebsystem = pickle.load(open('Labelencoder/le_betriebsystem.pkl', 'rb'))
        le_browser = pickle.load(open('Labelencoder/le_browser.pkl', 'rb'))
        le_action = pickle.load(open('Labelencoder/le_actionen.pkl', 'rb'))
        print(type(le_betriebsystem))

        max_seq_len = 55
        cat_1, cat_2, cat_3, num = test_data_builder(time_pro_event,
                                                     os_name,
                                                     browser_name,
                                                     user_urls,
                                                     le_betriebsystem,
                                                     le_browser,
                                                     le_action,
                                                     max_seq_len,
                                                     URL)
        pred_state_ = boto3_client.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                                   Body=(cat_1, cat_2, cat_3, num))
        pred_state = pred_state_.argmax(axis=-1)
        next_steps = le_action.classes_[pred_state]
        response = {
            "statusCode": 200,
            "the next 2 user steps are": json.dumps([next_steps[0][0], next_steps[0][1]])
        }
    except Exception as err:
        return {
            'statusCode': 400,
            'body': 'call failed {0}'.format(err)
        }

    return response

Debugging:

event = {
          "time_pro_event": [12, 6],
          "os_name": "Windows",
          "browser_name": "Firefox",
          "user_urls": ["www.josera.de/customer/account/","www.josera.de/katzenfutter.html"]
        }

time_pro_event = request['time_pro_event']
os_name = str(request['os_name'])
browser_name = str(request['browser_name'])
user_urls = request['user_urls']

([12, 6],
 'Windows',
 'Firefox',
 ['www.josera.de/customer/account/', 'www.josera.de/katzenfutter.html'])

max_seq_len = 55

**Input**

cat_1, cat_2, cat_3, num = test_data_builder(name_betriebsystem,
                                             name_browser,
                                             user_urls,
                                             le_betriebsystem,
                                             le_browser,
                                             le_actionen,
                                             time_pro_event,
                                             max_seq_len,
                                             URL)

**Output**

(array([[9.]]),
 array([[3]], dtype=int32),
 array([[3]], dtype=int32),
 array([[  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
           0,   6, 140]], dtype=int32))

When I try to invoke the model like this:

response = client.invoke_endpoint(EndpointName=endpoint_name, Body=json.dumps((cat_1, cat_2, cat_3, num)))

I get:

TypeError                                 Traceback (most recent call last)
<ipython-input-57-8e6694c9cf2f> in <module>
      6 client = boto3.client('runtime.sagemaker')
      7 # The sample model expects an input of shape [1,50]
----> 8 response = client.invoke_endpoint(EndpointName=endpoint_name, Body=json.dumps((cat_1, cat_2, cat_3, num)))
      9 response_body = response['Body']
     10 print(response_body.read())

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    229         cls is None and indent is None and separators is None and
    230         default is None and not sort_keys and not kw):
--> 231         return _default_encoder.encode(obj)
    232     if cls is None:
    233         cls = JSONEncoder

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/encoder.py in encode(self, o)
    197         # exceptions aren't as detailed.  The list call should be roughly
    198         # equivalent to the PySequence_Fast that ''.join() would do.
--> 199         chunks = self.iterencode(o, _one_shot=True)
    200         if not isinstance(chunks, (list, tuple)):
    201             chunks = list(chunks)

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/encoder.py in iterencode(self, o, _one_shot)
    255                 self.key_separator, self.item_separator, self.sort_keys,
    256                 self.skipkeys, _one_shot)
--> 257         return _iterencode(o, 0)
    258
    259     def _make_iterencode(markers, _default, _encoder, _indent, _floatstr,

~/anaconda3/envs/tensorflow_p36/lib/python3.6/json/encoder.py in default(self, o)
    178         """
    179         raise TypeError("Object of type '%s' is not JSON serializable" %
--> 180                         o.__class__.__name__)
    181
    182     def encode(self, o):

TypeError: Object of type 'ndarray' is not JSON serializable


But when I try the same prediction on the same model locally:

model = tensorflow.keras.models.load_model(os.path.join(save_path, "user_tracking_model.h5"))

it works fine and returns:

array([['josidogsolido', 'hundefutter']], dtype=object)

I really don't know what SageMaker wants.

The dimensions of the training input data were:

os_input dimension: (66,)
browser_input dimension: (66,)
acktion_input dimension: (66, 55)
time_input dimension: (66,)  # mean
target_input dimension: (66, 2, 209)

I used the conda_tensorflow_36 kernel.

Thank you. I really need some help!

@laurenyu
Contributor

@Patrick-devX thanks for providing more context. two things -

First, by input handler, I mean the function input_handler that needs to be provided to the model. Please look at the docs here: https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/deploying_tensorflow_serving.html#providing-python-scripts-for-pre-pos-processing

You will need to implement your own input_handler so that it takes a JSON object and reconstructs the arrays as numpy arrays. Do this in a file named inference.py, and then redeploy the model with this file (instructions are in the docs linked above).

Second, the error you just posted here is coming from invoking json.dumps on numpy arrays. If you look at the stacktrace, this happens before the HTTP request is ever sent; you would get the same error by calling json.dumps((cat_1, cat_2, cat_3, num)) without invoke_endpoint. Here are some ideas for serializing numpy arrays as JSON: https://stackoverflow.com/questions/26646362/numpy-array-is-not-json-serializable
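Putting the two suggestions together, the client-side call could look roughly like this (a sketch; the input names are assumptions to match your model's signature, and the "instances" wrapper is the multi-input format from your earlier ModelError):

import json

# convert each ndarray to plain Python lists so json.dumps can handle them
payload = json.dumps({
    'instances': [{
        'input1': num.tolist(),
        'input2': cat_1.tolist(),
        'input3': cat_2.tolist(),
        'input4': cat_3.tolist(),
    }]
})
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='application/json',
    Body=payload,
)
result = json.loads(response['Body'].read())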
