@@ -58,7 +58,6 @@ def default_handler(data, context):
 
 
 class PythonServiceResource:
-
     def __init__(self):
         if SAGEMAKER_MULTI_MODEL_ENABLED:
             self._model_tfs_rest_port = {}
@@ -83,9 +82,9 @@ def __init__(self):
             log.info("Inference script exists, importing handlers.")
             # Single-Model Mode & Multi-Model Mode both use one inference.py
             self._handler, self._input_handler, self._output_handler = self._import_handlers()
-            self._handlers = self._make_handler(self._handler,
-                                                self._input_handler,
-                                                self._output_handler)
+            self._handlers = self._make_handler(
+                self._handler, self._input_handler, self._output_handler
+            )
         else:
             log.info("Inference script does not exist, using default handlers.")
             self._handlers = default_handler
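For readers new to this container: the `inference.py` imported above may provide either a single `handler(data, context)` or an `input_handler`/`output_handler` pair. A minimal sketch of the latter; the JSON transforms in the bodies are assumptions, not code from this PR:

```python
# inference.py -- minimal sketch; the concrete transforms are placeholders.
import json


def input_handler(data, context):
    """Pre-process the request body before it is forwarded to TFS."""
    payload = json.loads(data.read().decode("utf-8"))
    return json.dumps({"instances": payload})


def output_handler(response, context):
    """Post-process the TFS response before it is returned to the caller."""
    return response.content, context.accept_header
```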
@@ -108,7 +107,7 @@ def _pick_port(self, ports):
         return random.choice(ports)
 
     def _parse_sagemaker_port_range_mme(self, port_range):
-        lower, upper = port_range.split('-')
+        lower, upper = port_range.split("-")
         lower = int(lower)
         upper = lower + int((int(upper) - lower) * 0.9)  # only utilizing 90% of the ports
         rest_port = lower
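To make the "90% of the ports" comment concrete, here is the arithmetic on a hypothetical port-range value (the string follows the `lower-upper` format parsed above; the concrete numbers are invented):

```python
# Worked example of the split above; "9000-9999" is a made-up range, not a value from this PR.
port_range = "9000-9999"
lower, upper = port_range.split("-")
lower = int(lower)                               # 9000
upper = lower + int((int(upper) - lower) * 0.9)  # 9000 + int(999 * 0.9) = 9899
rest_port = lower                                # first usable port
# i.e. only ports 9000-9899 are handed to TFS; the top 10% of the range stays unused
```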
@@ -132,16 +131,14 @@ def _handle_load_model_post(self, res, data):  # noqa: C901
         # model is already loaded
         if model_name in self._model_tfs_pid:
             res.status = falcon.HTTP_409
-            res.body = json.dumps({
-                "error": "Model {} is already loaded.".format(model_name)
-            })
+            res.body = json.dumps({"error": "Model {} is already loaded.".format(model_name)})
 
         # check if there are available ports
         if not self._ports_available():
             res.status = falcon.HTTP_507
-            res.body = json.dumps({
-                "error": "Memory exhausted: no available ports to load the model."
-            })
+            res.body = json.dumps(
+                {"error": "Memory exhausted: no available ports to load the model."}
+            )
         with lock():
             self._model_tfs_rest_port[model_name] = self._tfs_ports["rest_port"].pop()
             self._model_tfs_grpc_port[model_name] = self._tfs_ports["grpc_port"].pop()
@@ -157,7 +154,8 @@ def _handle_load_model_post(self, res, data):  # noqa: C901
                 f.write(tfs_config)
 
             batching_config_file = "/sagemaker/batching/{}/batching-config.cfg".format(
-                model_name)
+                model_name
+            )
             if self._tfs_enable_batching:
                 tfs_utils.create_batching_config(batching_config_file)
 
@@ -170,22 +168,26 @@ def _handle_load_model_post(self, res, data):  # noqa: C901
             )
             p = subprocess.Popen(cmd.split())
 
-            tfs_utils.wait_for_model(self._model_tfs_rest_port[model_name], model_name,
-                                     self._tfs_wait_time_seconds)
+            tfs_utils.wait_for_model(
+                self._model_tfs_rest_port[model_name], model_name, self._tfs_wait_time_seconds
+            )
 
             log.info("started tensorflow serving (pid: %d)", p.pid)
             # update model name <-> tfs pid map
             self._model_tfs_pid[model_name] = p
 
             res.status = falcon.HTTP_200
-            res.body = json.dumps({
-                "success":
-                    "Successfully loaded model {}, "
+            res.body = json.dumps(
+                {
+                    "success": "Successfully loaded model {}, "
                     "listening on rest port {} "
-                    "and grpc port {}.".format(model_name,
-                                               self._model_tfs_rest_port,
-                                               self._model_tfs_grpc_port,)
-            })
+                    "and grpc port {}.".format(
+                        model_name,
+                        self._model_tfs_rest_port,
+                        self._model_tfs_grpc_port,
+                    )
+                }
+            )
         except MultiModelException as multi_model_exception:
             self._cleanup_config_file(tfs_config_file)
             self._cleanup_config_file(batching_config_file)
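The implementation of `tfs_utils.wait_for_model` is not shown in this diff; its call signature above is all we see. A plausible sketch of such a readiness poll, assuming it targets TensorFlow Serving's standard REST model-status endpoint (`GET /v1/models/{name}`) — the body here is an assumption, not the helper's actual code:

```python
import time

import requests


def wait_for_model(rest_port, model_name, wait_time_seconds):
    """Hypothetical readiness poll; the real tfs_utils helper may differ."""
    uri = "http://localhost:{}/v1/models/{}".format(rest_port, model_name)
    deadline = time.time() + wait_time_seconds
    while time.time() < deadline:
        try:
            # TFS answers 200 on this endpoint once the model version is loaded.
            if requests.get(uri).status_code == 200:
                return
        except requests.exceptions.ConnectionError:
            pass  # the TFS process may still be starting up
        time.sleep(1)
    raise TimeoutError("model {} did not become ready".format(model_name))
```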
@@ -199,25 +201,28 @@ def _handle_load_model_post(self, res, data):  # noqa: C901
             raise MultiModelException(falcon.HTTP_500, multi_model_exception.msg)
         except FileExistsError as e:
             res.status = falcon.HTTP_409
-            res.body = json.dumps({
-                "error": "Model {} is already loaded. {}".format(model_name, str(e))
-            })
+            res.body = json.dumps(
+                {"error": "Model {} is already loaded. {}".format(model_name, str(e))}
+            )
         except OSError as os_error:
             self._cleanup_config_file(tfs_config_file)
             self._cleanup_config_file(batching_config_file)
             if os_error.errno == 12:
-                raise MultiModelException(falcon.HTTP_507,
-                                          "Memory exhausted: "
-                                          "not enough memory to start TFS instance")
+                raise MultiModelException(
+                    falcon.HTTP_507,
+                    "Memory exhausted: " "not enough memory to start TFS instance",
+                )
             else:
                 raise MultiModelException(falcon.HTTP_500, os_error.strerror)
         else:
             res.status = falcon.HTTP_404
-            res.body = json.dumps({
-                "error":
-                    "Could not find valid base path {} for servable {}".format(base_path,
-                                                                               model_name)
-            })
+            res.body = json.dumps(
+                {
+                    "error": "Could not find valid base path {} for servable {}".format(
+                        base_path, model_name
+                    )
+                }
+            )
 
     def _cleanup_config_file(self, config_file):
         if os.path.exists(config_file):
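An aside on the `os_error.errno == 12` check kept by the hunk above: 12 is `ENOMEM` on Linux. A small helper using the named constant would make the intent self-documenting (an editorial suggestion, not part of this PR; `is_memory_exhausted` is a hypothetical name):

```python
import errno


def is_memory_exhausted(os_error):
    # errno 12 is ENOMEM; the named constant documents the intent of the magic number.
    return os_error.errno == errno.ENOMEM
```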
@@ -228,31 +233,37 @@ def _handle_invocation_post(self, req, res, model_name=None):
         if model_name:
             if model_name not in self._model_tfs_rest_port:
                 res.status = falcon.HTTP_404
-                res.body = json.dumps({
-                    "error": "Model {} is not loaded yet.".format(model_name)
-                })
+                res.body = json.dumps(
+                    {"error": "Model {} is not loaded yet.".format(model_name)}
+                )
                 return
             else:
                 log.info("model name: {}".format(model_name))
                 rest_port = self._model_tfs_rest_port[model_name]
                 log.info("rest port: {}".format(str(self._model_tfs_rest_port[model_name])))
                 grpc_port = self._model_tfs_grpc_port[model_name]
                 log.info("grpc port: {}".format(str(self._model_tfs_grpc_port[model_name])))
-                data, context = tfs_utils.parse_request(req, rest_port, grpc_port,
-                                                        self._tfs_default_model_name,
-                                                        model_name=model_name)
+                data, context = tfs_utils.parse_request(
+                    req,
+                    rest_port,
+                    grpc_port,
+                    self._tfs_default_model_name,
+                    model_name=model_name,
+                )
         else:
             res.status = falcon.HTTP_400
-            res.body = json.dumps({
-                "error": "Invocation request does not contain model name."
-            })
+            res.body = json.dumps({"error": "Invocation request does not contain model name."})
     else:
         # Randomly pick port used for routing incoming request.
         grpc_port = self._pick_port(self._tfs_grpc_ports)
         rest_port = self._pick_port(self._tfs_rest_ports)
-        data, context = tfs_utils.parse_request(req, rest_port, grpc_port,
-                                                self._tfs_default_model_name,
-                                                channel=self._channels[grpc_port])
+        data, context = tfs_utils.parse_request(
+            req,
+            rest_port,
+            grpc_port,
+            self._tfs_default_model_name,
+            channel=self._channels[grpc_port],
+        )
 
     try:
         res.status = falcon.HTTP_200
@@ -261,9 +272,7 @@ def _handle_invocation_post(self, req, res, model_name=None):
         except Exception as e:  # pylint: disable=broad-except
             log.exception("exception handling request: {}".format(e))
             res.status = falcon.HTTP_500
-            res.body = json.dumps({
-                "error": str(e)
-            }).encode("utf-8")  # pylint: disable=E1101
+            res.body = json.dumps({"error": str(e)}).encode("utf-8")  # pylint: disable=E1101
 
     def _setup_channel(self, grpc_port):
         if grpc_port not in self._channels:
@@ -309,39 +318,31 @@ def on_get(self, req, res, model_name=None):  # pylint: disable=W0613
             except ValueError as e:
                 log.exception("exception handling request: {}".format(e))
                 res.status = falcon.HTTP_500
-                res.body = json.dumps({
-                    "error": str(e)
-                }).encode("utf-8")
+                res.body = json.dumps({"error": str(e)}).encode("utf-8")
             res.status = falcon.HTTP_200
             res.body = json.dumps(models_info)
         else:
             if model_name not in self._model_tfs_rest_port:
                 res.status = falcon.HTTP_404
-                res.body = json.dumps({
-                    "error": "Model {} is loaded yet.".format(model_name)
-                }).encode("utf-8")
+                res.body = json.dumps(
+                    {"error": "Model {} is not loaded yet.".format(model_name)}
+                ).encode("utf-8")
             else:
                 port = self._model_tfs_rest_port[model_name]
                 uri = "http://localhost:{}/v1/models/{}".format(port, model_name)
                 try:
                     info = requests.get(uri)
                     res.status = falcon.HTTP_200
-                    res.body = json.dumps({
-                        "model": info
-                    }).encode("utf-8")
+                    res.body = json.dumps({"model": info}).encode("utf-8")
                 except ValueError as e:
                     log.exception("exception handling GET models request.")
                     res.status = falcon.HTTP_500
-                    res.body = json.dumps({
-                        "error": str(e)
-                    }).encode("utf-8")
+                    res.body = json.dumps({"error": str(e)}).encode("utf-8")
 
     def on_delete(self, req, res, model_name):  # pylint: disable=W0613
         if model_name not in self._model_tfs_pid:
             res.status = falcon.HTTP_404
-            res.body = json.dumps({
-                "error": "Model {} is not loaded yet".format(model_name)
-            })
+            res.body = json.dumps({"error": "Model {} is not loaded yet.".format(model_name)})
         else:
             try:
                 self._model_tfs_pid[model_name].kill()
@@ -356,14 +357,12 @@ def on_delete(self, req, res, model_name):  # pylint: disable=W0613
                 del self._model_tfs_grpc_port[model_name]
                 del self._model_tfs_pid[model_name]
                 res.status = falcon.HTTP_200
-                res.body = json.dumps({
-                    "success": "Successfully unloaded model {}.".format(model_name)
-                })
+                res.body = json.dumps(
+                    {"success": "Successfully unloaded model {}.".format(model_name)}
+                )
             except OSError as error:
                 res.status = falcon.HTTP_500
-                res.body = json.dumps({
-                    "error": str(error)
-                }).encode("utf-8")
+                res.body = json.dumps({"error": str(error)}).encode("utf-8")
 
     def validate_model_dir(self, model_path):
         # model base path doesn't exist
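Stepping back from the formatting changes: the handlers touched above make up the container's multi-model management API. A hedged end-to-end sketch of how a client might exercise it; the base URL, model name, and model path are placeholders, and the route shapes are inferred from the handler names rather than stated in this diff:

```python
import json

import requests

BASE = "http://localhost:8080"  # placeholder; the actual port depends on container config
MODEL = "half_plus_two"         # hypothetical model name

# Load a model (routed to _handle_load_model_post).
requests.post(
    BASE + "/models",
    data=json.dumps({"model_name": MODEL, "url": "/opt/ml/models/half_plus_two"}),
)

# Run inference against it (routed to _handle_invocation_post).
requests.post(BASE + "/models/{}/invoke".format(MODEL), data=json.dumps({"instances": [1.0]}))

# Inspect the loaded model (on_get) and unload it (on_delete).
print(requests.get(BASE + "/models/{}".format(MODEL)).content)
requests.delete(BASE + "/models/{}".format(MODEL))
```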