Skip to content

Moving function indexing to init request #1446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 25, 2024
5 changes: 5 additions & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@

# Paths
CUSTOMER_PACKAGES_PATH = "/home/site/wwwroot/.python_packages/lib/site-packages"

# Flag to index functions in handle init request
PYTHON_ENABLE_INIT_INDEXING = "PYTHON_ENABLE_INIT_INDEXING"

METADATA_PROPERTIES_WORKER_INDEXED = "worker_indexed"
138 changes: 96 additions & 42 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
PYTHON_ENABLE_DEBUG_LOGGING,
PYTHON_SCRIPT_FILE_NAME,
PYTHON_SCRIPT_FILE_NAME_DEFAULT,
PYTHON_LANGUAGE_RUNTIME)
PYTHON_LANGUAGE_RUNTIME, PYTHON_ENABLE_INIT_INDEXING,
METADATA_PROPERTIES_WORKER_INDEXED)
from .extension import ExtensionManager
from .logging import disable_console_logging, enable_console_logging
from .logging import (logger, error_logger, is_system_log_category,
Expand Down Expand Up @@ -72,9 +73,12 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._function_data_cache_enabled = False
self._functions = functions.Registry()
self._shmem_mgr = SharedMemoryManager()

self._old_task_factory = None

# Used to store metadata returns
self._function_metadata_result = None
self._function_metadata_exception = None

# We allow the customer to change synchronous thread pool max worker
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
# For 3.[6|7|8] The default value is 1.
Expand Down Expand Up @@ -297,6 +301,14 @@ async def _handle__worker_init_request(self, request):
# dictionary which will be later used in the invocation request
bindings.load_binding_registry()

if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
worker_init_request.function_app_directory,
caller_info="worker_init_request")
except Exception as ex:
self._function_metadata_exception = ex

return protos.StreamingMessage(
request_id=self.request_id,
worker_init_response=protos.WorkerInitResponse(
Expand All @@ -313,82 +325,117 @@ async def _handle__worker_status_request(self, request):
request_id=request.request_id,
worker_status_response=protos.WorkerStatusResponse())

def load_function_metadata(self, function_app_directory, caller_info):
"""
This method is called to index the functions in the function app
directory and save the results in function_metadata_result or
function_metadata_exception in case of an exception.
"""
script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')

logger.debug(
'Received load metadata request from %s, request ID %s, '
'script_file_name: %s',
caller_info, self.request_id, script_file_name)

validate_script_file_name(script_file_name)
function_path = os.path.join(function_app_directory,
script_file_name)

self._function_metadata_result = (
self.index_functions(function_path)) \
if os.path.exists(function_path) else None

async def _handle__functions_metadata_request(self, request):
metadata_request = request.functions_metadata_request
directory = metadata_request.function_app_directory
function_app_directory = metadata_request.function_app_directory

script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')
function_path = os.path.join(directory, script_file_name)
function_path = os.path.join(function_app_directory,
script_file_name)

logger.info(
'Received WorkerMetadataRequest, request ID %s, function_path: %s',
'Received WorkerMetadataRequest, request ID %s, '
'function_path: %s',
self.request_id, function_path)

try:
validate_script_file_name(script_file_name)

if not os.path.exists(function_path):
# Fallback to legacy model
return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=True,
result=protos.StatusResult(
status=protos.StatusResult.Success)))

fx_metadata_results = self.index_functions(function_path)
if not is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
function_app_directory,
caller_info="functions_metadata_request")
except Exception as ex:
self._function_metadata_exception = ex

if self._function_metadata_exception:
return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
function_metadata_results=fx_metadata_results,
result=protos.StatusResult(
status=protos.StatusResult.Success)))
status=protos.StatusResult.Failure,
exception=self._serialize_exception(
self._function_metadata_exception))))
else:
metadata_result = self._function_metadata_result

except Exception as ex:
return protos.StreamingMessage(
request_id=self.request_id,
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=False if metadata_result else
True,
function_metadata_results=metadata_result,
result=protos.StatusResult(
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))
status=protos.StatusResult.Success)))

async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_metadata = func_request.metadata
function_name = function_metadata.name
function_app_directory = function_metadata.directory
function_already_indexed = self._functions.get_function(function_id)

logger.info(
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
'function_name: %s,', self.request_id, function_id, function_name)
'function_name: %s, function_already_indexed: %s',
self.request_id, function_id, function_name,
function_already_indexed)

programming_model = "V2"
try:
if not self._functions.get_function(function_id):
script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')
validate_script_file_name(script_file_name)
function_path = os.path.join(
function_metadata.directory,
script_file_name)

if function_metadata.properties.get("worker_indexed", False) \
or os.path.exists(function_path):
if not function_already_indexed:

if function_metadata.properties.get(
METADATA_PROPERTIES_WORKER_INDEXED, False):
# This is for the second worker and above where the worker
# indexing is enabled and load request is called without
# calling the metadata request. In this case we index the
# function and update the workers registry
_ = self.index_functions(function_path)

if not is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
function_app_directory,
caller_info="functions_load_request")
except Exception as ex:
self._function_metadata_exception = ex

# For the second worker, if there was an exception in
# indexing, we raise it here
if self._function_metadata_exception:
raise Exception(self._function_metadata_exception)

else:
# legacy function
programming_model = "V1"

func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
function_name,
function_app_directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

Expand Down Expand Up @@ -562,6 +609,7 @@ async def _handle__function_environment_reload_request(self, request):

func_env_reload_request = \
request.function_environment_reload_request
directory = func_env_reload_request.function_app_directory

# Append function project root to module finding sys.path
if func_env_reload_request.function_app_directory:
Expand All @@ -587,14 +635,20 @@ async def _handle__function_environment_reload_request(self, request):
root_logger.setLevel(logging.DEBUG)

# Reload azure google namespaces
DependencyManager.reload_customer_libraries(
func_env_reload_request.function_app_directory
)
DependencyManager.reload_customer_libraries(directory)

# calling load_binding_registry again since the
# reload_customer_libraries call clears the registry
bindings.load_binding_registry()

if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
directory,
caller_info="environment_reload_request")
except Exception as ex:
self._function_metadata_exception = ex

# Change function app directory
if getattr(func_env_reload_request,
'function_app_directory', None):
Expand Down
4 changes: 2 additions & 2 deletions azure_functions_worker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .utils.common import get_app_setting
from .constants import MODULE_NOT_FOUND_TS_URL, PYTHON_SCRIPT_FILE_NAME, \
PYTHON_SCRIPT_FILE_NAME_DEFAULT, PYTHON_LANGUAGE_RUNTIME, \
CUSTOMER_PACKAGES_PATH, RETRY_POLICY
CUSTOMER_PACKAGES_PATH, RETRY_POLICY, METADATA_PROPERTIES_WORKER_INDEXED
from .logging import logger
from .utils.wrappers import attach_message_to_exception

Expand Down Expand Up @@ -142,7 +142,7 @@ def process_indexed_function(functions_registry: functions.Registry,
bindings=binding_protos,
raw_bindings=indexed_function.get_raw_bindings(),
retry_options=retry_protos,
properties={"worker_indexed": "True"})
properties={METADATA_PROPERTIES_WORKER_INDEXED: "True"})

fx_metadata_results.append(function_metadata)

Expand Down
5 changes: 3 additions & 2 deletions azure_functions_worker/utils/app_setting_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39,
PYTHON_ENABLE_DEBUG_LOGGING,
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED,
PYTHON_SCRIPT_FILE_NAME)
PYTHON_SCRIPT_FILE_NAME, PYTHON_ENABLE_INIT_INDEXING)


def get_python_appsetting_state():
Expand All @@ -23,7 +23,8 @@ def get_python_appsetting_state():
PYTHON_ENABLE_DEBUG_LOGGING,
PYTHON_ENABLE_WORKER_EXTENSIONS,
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED,
PYTHON_SCRIPT_FILE_NAME]
PYTHON_SCRIPT_FILE_NAME,
PYTHON_ENABLE_INIT_INDEXING]

app_setting_states = "".join(
f"{app_setting}: {current_vars[app_setting]} | "
Expand Down
1 change: 0 additions & 1 deletion azure_functions_worker/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,3 @@ def validate_script_file_name(file_name: str):
pattern = re.compile(r'^[a-zA-Z0-9_][a-zA-Z0-9_\-]*\.py$')
if not pattern.match(file_name):
raise InvalidFileNameError(file_name)
return True
41 changes: 29 additions & 12 deletions tests/consumption_tests/test_linux_consumption.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

from requests import Request

from azure_functions_worker.constants import PYTHON_ENABLE_INIT_INDEXING, \
PYTHON_ENABLE_WORKER_EXTENSIONS, PYTHON_ISOLATE_WORKER_DEPENDENCIES, \
PYTHON_ENABLE_DEBUG_LOGGING
from tests.utils.testutils_lc import (
LinuxConsumptionWebHostController
)
Expand Down Expand Up @@ -107,7 +110,7 @@ def test_new_protobuf(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("NewProtobuf"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger')
resp = ctrl.send_request(req)
Expand Down Expand Up @@ -137,7 +140,7 @@ def test_old_protobuf(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("OldProtobuf"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger')
resp = ctrl.send_request(req)
Expand Down Expand Up @@ -189,7 +192,7 @@ def test_debug_logging_enabled(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"EnableDebugLogging"),
"PYTHON_ENABLE_DEBUG_LOGGING": "1"
PYTHON_ENABLE_DEBUG_LOGGING: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger1')
resp = ctrl.send_request(req)
Expand Down Expand Up @@ -218,7 +221,7 @@ def test_pinning_functions_to_older_version(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"PinningFunctions"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1",
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger1')
resp = ctrl.send_request(req)
Expand All @@ -232,8 +235,7 @@ def test_opencensus_with_extensions_enabled(self):
"""A function app with extensions enabled containing the
following libraries:

azure-functions, azure-eventhub, azure-storage-blob, numpy,
cryptography, pyodbc, requests
azure-functions, opencensus

should return 200 after importing all libraries.
"""
Expand All @@ -242,8 +244,25 @@ def test_opencensus_with_extensions_enabled(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("Opencensus"),
"PYTHON_ENABLE_WORKER_EXTENSIONS": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ENABLE_WORKER_EXTENSIONS: "1"
})
req = Request('GET', f'{ctrl.url}/api/opencensus')
resp = ctrl.send_request(req)
self.assertEqual(resp.status_code, 200)

@skipIf(sys.version_info.minor != 10,
"This is testing only for python310")
def test_opencensus_with_extensions_enabled_init_indexing(self):
"""
A function app with init indexing enabled
"""
with LinuxConsumptionWebHostController(_DEFAULT_HOST_VERSION,
self._py_version) as ctrl:
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("Opencensus"),
PYTHON_ENABLE_WORKER_EXTENSIONS: "1",
PYTHON_ENABLE_INIT_INDEXING: "true"
})
req = Request('GET', f'{ctrl.url}/api/opencensus')
resp = ctrl.send_request(req)
Expand All @@ -263,8 +282,7 @@ def test_reload_variables_after_timeout_error(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"TimeoutError"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/hello')
resp = ctrl.send_request(req)
Expand Down Expand Up @@ -297,8 +315,7 @@ def test_reload_variables_after_oom_error(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"OOMError"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/httptrigger')
resp = ctrl.send_request(req)
Expand Down
Loading