30
30
PYTHON_ENABLE_DEBUG_LOGGING ,
31
31
PYTHON_SCRIPT_FILE_NAME ,
32
32
PYTHON_SCRIPT_FILE_NAME_DEFAULT ,
33
- PYTHON_LANGUAGE_RUNTIME )
33
+ PYTHON_LANGUAGE_RUNTIME , INIT_INDEXING )
34
34
from .extension import ExtensionManager
35
35
from .logging import disable_console_logging , enable_console_logging
36
36
from .logging import (logger , error_logger , is_system_log_category ,
@@ -72,9 +72,12 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
72
72
self ._function_data_cache_enabled = False
73
73
self ._functions = functions .Registry ()
74
74
self ._shmem_mgr = SharedMemoryManager ()
75
-
76
75
self ._old_task_factory = None
77
76
77
+ # Used to store metadata returns
78
+ self .function_metadata_result = None
79
+ self .function_metadata_exception = None
80
+
78
81
# We allow the customer to change synchronous thread pool max worker
79
82
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
80
83
# For 3.[6|7|8] The default value is 1.
@@ -297,6 +300,9 @@ async def _handle__worker_init_request(self, request):
297
300
# dictionary which will be later used in the invocation request
298
301
bindings .load_binding_registry ()
299
302
303
+ if is_envvar_true (INIT_INDEXING ):
304
+ self .get_function_metadata (worker_init_request )
305
+
300
306
return protos .StreamingMessage (
301
307
request_id = self .request_id ,
302
308
worker_init_response = protos .WorkerInitResponse (
@@ -313,52 +319,57 @@ async def _handle__worker_status_request(self, request):
313
319
request_id = request .request_id ,
314
320
worker_status_response = protos .WorkerStatusResponse ())
315
321
316
- async def _handle__functions_metadata_request (self , request ):
317
- metadata_request = request .functions_metadata_request
318
- directory = metadata_request .function_app_directory
322
+ def get_function_metadata (self , directory , caller_info ):
319
323
script_file_name = get_app_setting (
320
324
setting = PYTHON_SCRIPT_FILE_NAME ,
321
325
default_value = f'{ PYTHON_SCRIPT_FILE_NAME_DEFAULT } ' )
322
- function_path = os .path .join (directory , script_file_name )
323
326
324
327
logger .info (
325
- 'Received WorkerMetadataRequest, request ID %s, function_path : %s' ,
326
- self .request_id , function_path )
328
+ 'Received WorkerMetadataRequest from %s , request ID %s, script_file_name : %s' ,
329
+ caller_info , self .request_id , script_file_name )
327
330
328
331
try :
329
332
validate_script_file_name (script_file_name )
333
+ function_path = os .path .join (directory , script_file_name )
330
334
331
- if not os .path .exists (function_path ):
332
- # Fallback to legacy model
333
- return protos .StreamingMessage (
334
- request_id = request .request_id ,
335
- function_metadata_response = protos .FunctionMetadataResponse (
336
- use_default_metadata_indexing = True ,
337
- result = protos .StatusResult (
338
- status = protos .StatusResult .Success )))
335
+ self .function_metadata_result = self .index_functions (function_path ) \
336
+ if os .path .exists (function_path ) else None
339
337
340
- fx_metadata_results = self .index_functions (function_path )
338
+ except Exception as ex :
339
+ self .function_metadata_exception = self ._serialize_exception (ex )
341
340
341
+ async def _handle__functions_metadata_request (self , request ):
342
+ metadata_request = request .functions_metadata_request
343
+ directory = metadata_request .function_app_directory
344
+
345
+ if not is_envvar_true (INIT_INDEXING ):
346
+ self .get_function_metadata (directory ,
347
+ caller_info = sys ._getframe ().f_code .co_name )
348
+
349
+ if self .function_metadata_exception :
342
350
return protos .StreamingMessage (
343
- request_id = request .request_id ,
351
+ request_id = self .request_id ,
344
352
function_metadata_response = protos .FunctionMetadataResponse (
345
- function_metadata_results = fx_metadata_results ,
346
353
result = protos .StatusResult (
347
- status = protos .StatusResult .Success )))
354
+ status = protos .StatusResult .Failure ,
355
+ exception = self .function_metadata_exception )))
356
+ else :
357
+ fmr = self .function_metadata_result
348
358
349
- except Exception as ex :
350
359
return protos .StreamingMessage (
351
- request_id = self .request_id ,
360
+ request_id = request .request_id ,
352
361
function_metadata_response = protos .FunctionMetadataResponse (
362
+ use_default_metadata_indexing = False if fmr else True ,
363
+ function_metadata_results = fmr ,
353
364
result = protos .StatusResult (
354
- status = protos .StatusResult .Failure ,
355
- exception = self ._serialize_exception (ex ))))
365
+ status = protos .StatusResult .Success )))
356
366
357
367
async def _handle__function_load_request (self , request ):
358
368
func_request = request .function_load_request
359
369
function_id = func_request .function_id
360
370
function_metadata = func_request .metadata
361
371
function_name = function_metadata .name
372
+ directory = function_metadata .directory
362
373
363
374
logger .info (
364
375
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
@@ -367,28 +378,23 @@ async def _handle__function_load_request(self, request):
367
378
programming_model = "V2"
368
379
try :
369
380
if not self ._functions .get_function (function_id ):
370
- script_file_name = get_app_setting (
371
- setting = PYTHON_SCRIPT_FILE_NAME ,
372
- default_value = f'{ PYTHON_SCRIPT_FILE_NAME_DEFAULT } ' )
373
- validate_script_file_name (script_file_name )
374
- function_path = os .path .join (
375
- function_metadata .directory ,
376
- script_file_name )
377
-
378
- if function_metadata .properties .get ("worker_indexed" , False ) \
379
- or os .path .exists (function_path ):
381
+
382
+ if function_metadata .properties .get ("worker_indexed" , False )\
383
+ and not is_envvar_true (INIT_INDEXING ):
380
384
# This is for the second worker and above where the worker
381
385
# indexing is enabled and load request is called without
382
386
# calling the metadata request. In this case we index the
383
387
# function and update the workers registry
384
- _ = self .index_functions (function_path )
388
+
389
+ self .get_function_metadata (directory ,
390
+ caller_info = sys ._getframe ().f_code .co_name )
385
391
else :
386
392
# legacy function
387
393
programming_model = "V1"
388
394
389
395
func = loader .load_function (
390
- func_request . metadata . name ,
391
- func_request . metadata . directory ,
396
+ function_name ,
397
+ directory ,
392
398
func_request .metadata .script_file ,
393
399
func_request .metadata .entry_point )
394
400
0 commit comments