30
30
PYTHON_ENABLE_DEBUG_LOGGING ,
31
31
PYTHON_SCRIPT_FILE_NAME ,
32
32
PYTHON_SCRIPT_FILE_NAME_DEFAULT ,
33
- PYTHON_LANGUAGE_RUNTIME )
33
+ PYTHON_LANGUAGE_RUNTIME , PYTHON_ENABLE_INIT_INDEXING ,
34
+ METADATA_PROPERTIES_WORKER_INDEXED )
34
35
from .extension import ExtensionManager
35
36
from .logging import disable_console_logging , enable_console_logging
36
37
from .logging import (logger , error_logger , is_system_log_category ,
@@ -72,9 +73,12 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
72
73
self ._function_data_cache_enabled = False
73
74
self ._functions = functions .Registry ()
74
75
self ._shmem_mgr = SharedMemoryManager ()
75
-
76
76
self ._old_task_factory = None
77
77
78
+ # Used to store metadata returns
79
+ self ._function_metadata_result = None
80
+ self ._function_metadata_exception = None
81
+
78
82
# We allow the customer to change synchronous thread pool max worker
79
83
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
80
84
# For 3.[6|7|8] The default value is 1.
@@ -297,6 +301,14 @@ async def _handle__worker_init_request(self, request):
297
301
# dictionary which will be later used in the invocation request
298
302
bindings .load_binding_registry ()
299
303
304
+ if is_envvar_true (PYTHON_ENABLE_INIT_INDEXING ):
305
+ try :
306
+ self .load_function_metadata (
307
+ worker_init_request .function_app_directory ,
308
+ caller_info = "worker_init_request" )
309
+ except Exception as ex :
310
+ self ._function_metadata_exception = ex
311
+
300
312
return protos .StreamingMessage (
301
313
request_id = self .request_id ,
302
314
worker_init_response = protos .WorkerInitResponse (
@@ -313,82 +325,114 @@ async def _handle__worker_status_request(self, request):
313
325
request_id = request .request_id ,
314
326
worker_status_response = protos .WorkerStatusResponse ())
315
327
328
+ def load_function_metadata (self , function_app_directory , caller_info ):
329
+ """
330
+ This method is called to index the functions in the function app
331
+ directory and save the results in function_metadata_result or
332
+ function_metadata_exception in case of an exception.
333
+ """
334
+ script_file_name = get_app_setting (
335
+ setting = PYTHON_SCRIPT_FILE_NAME ,
336
+ default_value = f'{ PYTHON_SCRIPT_FILE_NAME_DEFAULT } ' )
337
+
338
+ logger .debug (
339
+ 'Received load metadata request from %s, request ID %s, '
340
+ 'script_file_name: %s' ,
341
+ caller_info , self .request_id , script_file_name )
342
+
343
+ validate_script_file_name (script_file_name )
344
+ function_path = os .path .join (function_app_directory ,
345
+ script_file_name )
346
+
347
+ self ._function_metadata_result = (
348
+ self .index_functions (function_path )) \
349
+ if os .path .exists (function_path ) else None
350
+
316
351
async def _handle__functions_metadata_request (self , request ):
317
352
metadata_request = request .functions_metadata_request
318
- directory = metadata_request .function_app_directory
353
+ function_app_directory = metadata_request .function_app_directory
354
+
319
355
script_file_name = get_app_setting (
320
356
setting = PYTHON_SCRIPT_FILE_NAME ,
321
357
default_value = f'{ PYTHON_SCRIPT_FILE_NAME_DEFAULT } ' )
322
- function_path = os .path .join (directory , script_file_name )
358
+ function_path = os .path .join (function_app_directory ,
359
+ script_file_name )
323
360
324
361
logger .info (
325
- 'Received WorkerMetadataRequest, request ID %s, function_path: %s' ,
362
+ 'Received WorkerMetadataRequest, request ID %s, '
363
+ 'function_path: %s' ,
326
364
self .request_id , function_path )
327
365
328
- try :
329
- validate_script_file_name (script_file_name )
330
-
331
- if not os .path .exists (function_path ):
332
- # Fallback to legacy model
333
- return protos .StreamingMessage (
334
- request_id = request .request_id ,
335
- function_metadata_response = protos .FunctionMetadataResponse (
336
- use_default_metadata_indexing = True ,
337
- result = protos .StatusResult (
338
- status = protos .StatusResult .Success )))
339
-
340
- fx_metadata_results = self .index_functions (function_path )
366
+ if not is_envvar_true (PYTHON_ENABLE_INIT_INDEXING ):
367
+ try :
368
+ self .load_function_metadata (
369
+ function_app_directory ,
370
+ caller_info = "functions_metadata_request" )
371
+ except Exception as ex :
372
+ self ._function_metadata_exception = ex
341
373
374
+ if self ._function_metadata_exception :
342
375
return protos .StreamingMessage (
343
376
request_id = request .request_id ,
344
377
function_metadata_response = protos .FunctionMetadataResponse (
345
- function_metadata_results = fx_metadata_results ,
346
378
result = protos .StatusResult (
347
- status = protos .StatusResult .Success )))
379
+ status = protos .StatusResult .Failure ,
380
+ exception = self ._serialize_exception (
381
+ self ._function_metadata_exception ))))
382
+ else :
383
+ metadata_result = self ._function_metadata_result
348
384
349
- except Exception as ex :
350
385
return protos .StreamingMessage (
351
- request_id = self .request_id ,
386
+ request_id = request .request_id ,
352
387
function_metadata_response = protos .FunctionMetadataResponse (
388
+ use_default_metadata_indexing = False if metadata_result else
389
+ True ,
390
+ function_metadata_results = metadata_result ,
353
391
result = protos .StatusResult (
354
- status = protos .StatusResult .Failure ,
355
- exception = self ._serialize_exception (ex ))))
392
+ status = protos .StatusResult .Success )))
356
393
357
394
async def _handle__function_load_request (self , request ):
358
395
func_request = request .function_load_request
359
396
function_id = func_request .function_id
360
397
function_metadata = func_request .metadata
361
398
function_name = function_metadata .name
399
+ function_app_directory = function_metadata .directory
362
400
363
401
logger .info (
364
402
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
365
- 'function_name: %s,' , self .request_id , function_id , function_name )
403
+ 'function_name: %s' ,
404
+ self .request_id , function_id , function_name )
366
405
367
406
programming_model = "V2"
368
407
try :
369
408
if not self ._functions .get_function (function_id ):
370
- script_file_name = get_app_setting (
371
- setting = PYTHON_SCRIPT_FILE_NAME ,
372
- default_value = f'{ PYTHON_SCRIPT_FILE_NAME_DEFAULT } ' )
373
- validate_script_file_name (script_file_name )
374
- function_path = os .path .join (
375
- function_metadata .directory ,
376
- script_file_name )
377
-
378
- if function_metadata .properties .get ("worker_indexed" , False ) \
379
- or os .path .exists (function_path ):
409
+
410
+ if function_metadata .properties .get (
411
+ METADATA_PROPERTIES_WORKER_INDEXED , False ):
380
412
# This is for the second worker and above where the worker
381
413
# indexing is enabled and load request is called without
382
414
# calling the metadata request. In this case we index the
383
415
# function and update the workers registry
384
- _ = self .index_functions (function_path )
416
+
417
+ try :
418
+ self .load_function_metadata (
419
+ function_app_directory ,
420
+ caller_info = "functions_load_request" )
421
+ except Exception as ex :
422
+ self ._function_metadata_exception = ex
423
+
424
+ # For the second worker, if there was an exception in
425
+ # indexing, we raise it here
426
+ if self ._function_metadata_exception :
427
+ raise Exception (self ._function_metadata_exception )
428
+
385
429
else :
386
430
# legacy function
387
431
programming_model = "V1"
388
432
389
433
func = loader .load_function (
390
- func_request . metadata . name ,
391
- func_request . metadata . directory ,
434
+ function_name ,
435
+ function_app_directory ,
392
436
func_request .metadata .script_file ,
393
437
func_request .metadata .entry_point )
394
438
@@ -562,6 +606,7 @@ async def _handle__function_environment_reload_request(self, request):
562
606
563
607
func_env_reload_request = \
564
608
request .function_environment_reload_request
609
+ directory = func_env_reload_request .function_app_directory
565
610
566
611
# Append function project root to module finding sys.path
567
612
if func_env_reload_request .function_app_directory :
@@ -587,14 +632,20 @@ async def _handle__function_environment_reload_request(self, request):
587
632
root_logger .setLevel (logging .DEBUG )
588
633
589
634
# Reload azure google namespaces
590
- DependencyManager .reload_customer_libraries (
591
- func_env_reload_request .function_app_directory
592
- )
635
+ DependencyManager .reload_customer_libraries (directory )
593
636
594
637
# calling load_binding_registry again since the
595
638
# reload_customer_libraries call clears the registry
596
639
bindings .load_binding_registry ()
597
640
641
+ if is_envvar_true (PYTHON_ENABLE_INIT_INDEXING ):
642
+ try :
643
+ self .load_function_metadata (
644
+ directory ,
645
+ caller_info = "environment_reload_request" )
646
+ except Exception as ex :
647
+ self ._function_metadata_exception = ex
648
+
598
649
# Change function app directory
599
650
if getattr (func_env_reload_request ,
600
651
'function_app_directory' , None ):
0 commit comments