@@ -58,7 +58,6 @@ def default_handler(data, context):
58
58
59
59
60
60
class PythonServiceResource :
61
-
62
61
def __init__ (self ):
63
62
if SAGEMAKER_MULTI_MODEL_ENABLED :
64
63
self ._model_tfs_rest_port = {}
@@ -81,9 +80,9 @@ def __init__(self):
81
80
if os .path .exists (INFERENCE_SCRIPT_PATH ):
82
81
# Single-Model Mode & Multi-Model Mode both use one inference.py
83
82
self ._handler , self ._input_handler , self ._output_handler = self ._import_handlers ()
84
- self ._handlers = self ._make_handler (self . _handler ,
85
- self ._input_handler ,
86
- self . _output_handler )
83
+ self ._handlers = self ._make_handler (
84
+ self . _handler , self ._input_handler , self . _output_handler
85
+ )
87
86
else :
88
87
self ._handlers = default_handler
89
88
@@ -105,7 +104,7 @@ def _pick_port(self, ports):
105
104
return random .choice (ports )
106
105
107
106
def _parse_sagemaker_port_range_mme (self , port_range ):
108
- lower , upper = port_range .split ('-' )
107
+ lower , upper = port_range .split ("-" )
109
108
lower = int (lower )
110
109
upper = lower + int ((int (upper ) - lower ) * 0.9 ) # only utilizing 90% of the ports
111
110
rest_port = lower
@@ -129,16 +128,14 @@ def _handle_load_model_post(self, res, data): # noqa: C901
129
128
# model is already loaded
130
129
if model_name in self ._model_tfs_pid :
131
130
res .status = falcon .HTTP_409
132
- res .body = json .dumps ({
133
- "error" : "Model {} is already loaded." .format (model_name )
134
- })
131
+ res .body = json .dumps ({"error" : "Model {} is already loaded." .format (model_name )})
135
132
136
133
# check if there are available ports
137
134
if not self ._ports_available ():
138
135
res .status = falcon .HTTP_507
139
- res .body = json .dumps ({
140
- "error" : "Memory exhausted: no available ports to load the model."
141
- } )
136
+ res .body = json .dumps (
137
+ { "error" : "Memory exhausted: no available ports to load the model." }
138
+ )
142
139
with lock ():
143
140
self ._model_tfs_rest_port [model_name ] = self ._tfs_ports ["rest_port" ].pop ()
144
141
self ._model_tfs_grpc_port [model_name ] = self ._tfs_ports ["grpc_port" ].pop ()
@@ -154,7 +151,8 @@ def _handle_load_model_post(self, res, data): # noqa: C901
154
151
f .write (tfs_config )
155
152
156
153
batching_config_file = "/sagemaker/batching/{}/batching-config.cfg" .format (
157
- model_name )
154
+ model_name
155
+ )
158
156
if self ._tfs_enable_batching :
159
157
tfs_utils .create_batching_config (batching_config_file )
160
158
@@ -167,22 +165,26 @@ def _handle_load_model_post(self, res, data): # noqa: C901
167
165
)
168
166
p = subprocess .Popen (cmd .split ())
169
167
170
- tfs_utils .wait_for_model (self ._model_tfs_rest_port [model_name ], model_name ,
171
- self ._tfs_wait_time_seconds )
168
+ tfs_utils .wait_for_model (
169
+ self ._model_tfs_rest_port [model_name ], model_name , self ._tfs_wait_time_seconds
170
+ )
172
171
173
172
log .info ("started tensorflow serving (pid: %d)" , p .pid )
174
173
# update model name <-> tfs pid map
175
174
self ._model_tfs_pid [model_name ] = p
176
175
177
176
res .status = falcon .HTTP_200
178
- res .body = json .dumps ({
179
- "success" :
180
- "Successfully loaded model {}, "
177
+ res .body = json .dumps (
178
+ {
179
+ "success" : " Successfully loaded model {}, "
181
180
"listening on rest port {} "
182
- "and grpc port {}." .format (model_name ,
183
- self ._model_tfs_rest_port ,
184
- self ._model_tfs_grpc_port ,)
185
- })
181
+ "and grpc port {}." .format (
182
+ model_name ,
183
+ self ._model_tfs_rest_port ,
184
+ self ._model_tfs_grpc_port ,
185
+ )
186
+ }
187
+ )
186
188
except MultiModelException as multi_model_exception :
187
189
self ._cleanup_config_file (tfs_config_file )
188
190
self ._cleanup_config_file (batching_config_file )
@@ -196,25 +198,28 @@ def _handle_load_model_post(self, res, data): # noqa: C901
196
198
raise MultiModelException (falcon .HTTP_500 , multi_model_exception .msg )
197
199
except FileExistsError as e :
198
200
res .status = falcon .HTTP_409
199
- res .body = json .dumps ({
200
- "error" : "Model {} is already loaded. {}" .format (model_name , str (e ))
201
- } )
201
+ res .body = json .dumps (
202
+ { "error" : "Model {} is already loaded. {}" .format (model_name , str (e ))}
203
+ )
202
204
except OSError as os_error :
203
205
self ._cleanup_config_file (tfs_config_file )
204
206
self ._cleanup_config_file (batching_config_file )
205
207
if os_error .errno == 12 :
206
- raise MultiModelException (falcon .HTTP_507 ,
207
- "Memory exhausted: "
208
- "not enough memory to start TFS instance" )
208
+ raise MultiModelException (
209
+ falcon .HTTP_507 ,
210
+ "Memory exhausted: " "not enough memory to start TFS instance" ,
211
+ )
209
212
else :
210
213
raise MultiModelException (falcon .HTTP_500 , os_error .strerror )
211
214
else :
212
215
res .status = falcon .HTTP_404
213
- res .body = json .dumps ({
214
- "error" :
215
- "Could not find valid base path {} for servable {}" .format (base_path ,
216
- model_name )
217
- })
216
+ res .body = json .dumps (
217
+ {
218
+ "error" : "Could not find valid base path {} for servable {}" .format (
219
+ base_path , model_name
220
+ )
221
+ }
222
+ )
218
223
219
224
def _cleanup_config_file (self , config_file ):
220
225
if os .path .exists (config_file ):
@@ -225,31 +230,37 @@ def _handle_invocation_post(self, req, res, model_name=None):
225
230
if model_name :
226
231
if model_name not in self ._model_tfs_rest_port :
227
232
res .status = falcon .HTTP_404
228
- res .body = json .dumps ({
229
- "error" : "Model {} is not loaded yet." .format (model_name )
230
- } )
233
+ res .body = json .dumps (
234
+ { "error" : "Model {} is not loaded yet." .format (model_name )}
235
+ )
231
236
return
232
237
else :
233
238
log .info ("model name: {}" .format (model_name ))
234
239
rest_port = self ._model_tfs_rest_port [model_name ]
235
240
log .info ("rest port: {}" .format (str (self ._model_tfs_rest_port [model_name ])))
236
241
grpc_port = self ._model_tfs_grpc_port [model_name ]
237
242
log .info ("grpc port: {}" .format (str (self ._model_tfs_grpc_port [model_name ])))
238
- data , context = tfs_utils .parse_request (req , rest_port , grpc_port ,
239
- self ._tfs_default_model_name ,
240
- model_name = model_name )
243
+ data , context = tfs_utils .parse_request (
244
+ req ,
245
+ rest_port ,
246
+ grpc_port ,
247
+ self ._tfs_default_model_name ,
248
+ model_name = model_name ,
249
+ )
241
250
else :
242
251
res .status = falcon .HTTP_400
243
- res .body = json .dumps ({
244
- "error" : "Invocation request does not contain model name."
245
- })
252
+ res .body = json .dumps ({"error" : "Invocation request does not contain model name." })
246
253
else :
247
254
# Randomly pick port used for routing incoming request.
248
255
grpc_port = self ._pick_port (self ._tfs_grpc_ports )
249
256
rest_port = self ._pick_port (self ._tfs_rest_ports )
250
- data , context = tfs_utils .parse_request (req , rest_port , grpc_port ,
251
- self ._tfs_default_model_name ,
252
- channel = self ._channels [grpc_port ])
257
+ data , context = tfs_utils .parse_request (
258
+ req ,
259
+ rest_port ,
260
+ grpc_port ,
261
+ self ._tfs_default_model_name ,
262
+ channel = self ._channels [grpc_port ],
263
+ )
253
264
254
265
try :
255
266
res .status = falcon .HTTP_200
@@ -258,9 +269,7 @@ def _handle_invocation_post(self, req, res, model_name=None):
258
269
except Exception as e : # pylint: disable=broad-except
259
270
log .exception ("exception handling request: {}" .format (e ))
260
271
res .status = falcon .HTTP_500
261
- res .body = json .dumps ({
262
- "error" : str (e )
263
- }).encode ("utf-8" ) # pylint: disable=E1101
272
+ res .body = json .dumps ({"error" : str (e )}).encode ("utf-8" ) # pylint: disable=E1101
264
273
265
274
def _setup_channel (self , grpc_port ):
266
275
if grpc_port not in self ._channels :
@@ -306,39 +315,31 @@ def on_get(self, req, res, model_name=None): # pylint: disable=W0613
306
315
except ValueError as e :
307
316
log .exception ("exception handling request: {}" .format (e ))
308
317
res .status = falcon .HTTP_500
309
- res .body = json .dumps ({
310
- "error" : str (e )
311
- }).encode ("utf-8" )
318
+ res .body = json .dumps ({"error" : str (e )}).encode ("utf-8" )
312
319
res .status = falcon .HTTP_200
313
320
res .body = json .dumps (models_info )
314
321
else :
315
322
if model_name not in self ._model_tfs_rest_port :
316
323
res .status = falcon .HTTP_404
317
- res .body = json .dumps ({
318
- "error" : "Model {} is loaded yet." .format (model_name )
319
- } ).encode ("utf-8" )
324
+ res .body = json .dumps (
325
+ { "error" : "Model {} is loaded yet." .format (model_name )}
326
+ ).encode ("utf-8" )
320
327
else :
321
328
port = self ._model_tfs_rest_port [model_name ]
322
329
uri = "http://localhost:{}/v1/models/{}" .format (port , model_name )
323
330
try :
324
331
info = requests .get (uri )
325
332
res .status = falcon .HTTP_200
326
- res .body = json .dumps ({
327
- "model" : info
328
- }).encode ("utf-8" )
333
+ res .body = json .dumps ({"model" : info }).encode ("utf-8" )
329
334
except ValueError as e :
330
335
log .exception ("exception handling GET models request." )
331
336
res .status = falcon .HTTP_500
332
- res .body = json .dumps ({
333
- "error" : str (e )
334
- }).encode ("utf-8" )
337
+ res .body = json .dumps ({"error" : str (e )}).encode ("utf-8" )
335
338
336
339
def on_delete (self , req , res , model_name ): # pylint: disable=W0613
337
340
if model_name not in self ._model_tfs_pid :
338
341
res .status = falcon .HTTP_404
339
- res .body = json .dumps ({
340
- "error" : "Model {} is not loaded yet" .format (model_name )
341
- })
342
+ res .body = json .dumps ({"error" : "Model {} is not loaded yet" .format (model_name )})
342
343
else :
343
344
try :
344
345
self ._model_tfs_pid [model_name ].kill ()
@@ -353,14 +354,12 @@ def on_delete(self, req, res, model_name): # pylint: disable=W0613
353
354
del self ._model_tfs_grpc_port [model_name ]
354
355
del self ._model_tfs_pid [model_name ]
355
356
res .status = falcon .HTTP_200
356
- res .body = json .dumps ({
357
- "success" : "Successfully unloaded model {}." .format (model_name )
358
- } )
357
+ res .body = json .dumps (
358
+ { "success" : "Successfully unloaded model {}." .format (model_name )}
359
+ )
359
360
except OSError as error :
360
361
res .status = falcon .HTTP_500
361
- res .body = json .dumps ({
362
- "error" : str (error )
363
- }).encode ("utf-8" )
362
+ res .body = json .dumps ({"error" : str (error )}).encode ("utf-8" )
364
363
365
364
def validate_model_dir (self , model_path ):
366
365
# model base path doesn't exist
0 commit comments