Moving function indexing to init request #1446

Merged
15 commits merged on Mar 25, 2024
3 changes: 3 additions & 0 deletions azure_functions_worker/constants.py
@@ -55,3 +55,6 @@

# Paths
CUSTOMER_PACKAGES_PATH = "/home/site/wwwroot/.python_packages/lib/site-packages"

# Flag to enable indexing functions while handling the worker init request
ENABLE_INIT_INDEXING = "ENABLE_INIT_INDEXING"
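
For context, here is how this flag is consumed elsewhere in the worker. The snippet below is illustrative only and not part of the diff; it reuses the worker's existing is_envvar_true helper and the constant added above, and the branch bodies are placeholders.

# Illustrative only (not part of the diff): reading the new flag the same
# way dispatcher.py does. The constant holds the app-setting name, so the
# host opts in by setting ENABLE_INIT_INDEXING=1 (or "true") on the app.
from azure_functions_worker.constants import ENABLE_INIT_INDEXING
from azure_functions_worker.utils.common import is_envvar_true

if is_envvar_true(ENABLE_INIT_INDEXING):
    pass  # index functions while handling the worker init request
else:
    pass  # index on demand when the functions metadata request arrives
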
105 changes: 64 additions & 41 deletions azure_functions_worker/dispatcher.py
@@ -30,7 +30,7 @@
PYTHON_ENABLE_DEBUG_LOGGING,
PYTHON_SCRIPT_FILE_NAME,
PYTHON_SCRIPT_FILE_NAME_DEFAULT,
PYTHON_LANGUAGE_RUNTIME)
PYTHON_LANGUAGE_RUNTIME, ENABLE_INIT_INDEXING)
from .extension import ExtensionManager
from .logging import disable_console_logging, enable_console_logging
from .logging import (logger, error_logger, is_system_log_category,
@@ -72,9 +72,12 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._function_data_cache_enabled = False
self._functions = functions.Registry()
self._shmem_mgr = SharedMemoryManager()

self._old_task_factory = None

# Used to cache the function metadata indexing results and exception
self.function_metadata_result = None
self.function_metadata_exception = None

# We allow the customer to change synchronous thread pool max worker
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
# For 3.[6|7|8] The default value is 1.
@@ -273,6 +276,7 @@ async def _handle__worker_init_request(self, request):
)

worker_init_request = request.worker_init_request
directory = worker_init_request.function_app_directory
host_capabilities = worker_init_request.capabilities
if constants.FUNCTION_DATA_CACHE in host_capabilities:
val = host_capabilities[constants.FUNCTION_DATA_CACHE]
@@ -297,6 +301,11 @@ async def _handle__worker_init_request(self, request):
# dictionary which will be later used in the invocation request
bindings.load_binding_registry()

if is_envvar_true(ENABLE_INIT_INDEXING):
self.get_function_metadata(
directory,
caller_info=sys._getframe().f_code.co_name)

return protos.StreamingMessage(
request_id=self.request_id,
worker_init_response=protos.WorkerInitResponse(
@@ -313,82 +322,92 @@ async def _handle__worker_status_request(self, request):
request_id=request.request_id,
worker_status_response=protos.WorkerStatusResponse())

async def _handle__functions_metadata_request(self, request):
metadata_request = request.functions_metadata_request
directory = metadata_request.function_app_directory
def get_function_metadata(self, directory, caller_info):
"""
Index the functions in the function app directory and cache the
results in function_metadata_result, or the serialized exception
in function_metadata_exception if indexing fails.
"""
script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')
function_path = os.path.join(directory, script_file_name)

logger.info(
'Received WorkerMetadataRequest, request ID %s, function_path: %s',
self.request_id, function_path)
'Received WorkerMetadataRequest from %s, request ID %s, '
'script_file_name: %s',
caller_info, self.request_id, script_file_name)

try:
validate_script_file_name(script_file_name)
function_path = os.path.join(directory, script_file_name)

if not os.path.exists(function_path):
# Fallback to legacy model
return protos.StreamingMessage(
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=True,
result=protos.StatusResult(
status=protos.StatusResult.Success)))
self.function_metadata_result = (
self.index_functions(function_path)) \
if os.path.exists(function_path) else None

fx_metadata_results = self.index_functions(function_path)
except Exception as ex:
self.function_metadata_exception = self._serialize_exception(ex)

async def _handle__functions_metadata_request(self, request):
metadata_request = request.functions_metadata_request
directory = metadata_request.function_app_directory

if not is_envvar_true(ENABLE_INIT_INDEXING):
self.get_function_metadata(
directory,
caller_info=sys._getframe().f_code.co_name)

if self.function_metadata_exception:
return protos.StreamingMessage(
request_id=request.request_id,
request_id=self.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
function_metadata_results=fx_metadata_results,
result=protos.StatusResult(
status=protos.StatusResult.Success)))
status=protos.StatusResult.Failure,
exception=self.function_metadata_exception)))
else:
metadata_result = self.function_metadata_result

except Exception as ex:
return protos.StreamingMessage(
request_id=self.request_id,
request_id=request.request_id,
function_metadata_response=protos.FunctionMetadataResponse(
use_default_metadata_indexing=False if metadata_result else
True,
function_metadata_results=metadata_result,
result=protos.StatusResult(
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))
status=protos.StatusResult.Success)))

async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_metadata = func_request.metadata
function_name = function_metadata.name
directory = function_metadata.directory

logger.info(
'Received WorkerLoadRequest, request ID %s, function_id: %s,'
'function_name: %s,', self.request_id, function_id, function_name)
'function_name: %s,',
self.request_id, function_id, function_name)

programming_model = "V2"
try:
if not self._functions.get_function(function_id):
script_file_name = get_app_setting(
setting=PYTHON_SCRIPT_FILE_NAME,
default_value=f'{PYTHON_SCRIPT_FILE_NAME_DEFAULT}')
validate_script_file_name(script_file_name)
function_path = os.path.join(
function_metadata.directory,
script_file_name)

if function_metadata.properties.get("worker_indexed", False) \
or os.path.exists(function_path):

if function_metadata.properties.get("worker_indexed", False)\
and not is_envvar_true(ENABLE_INIT_INDEXING):
# This is for the second worker and beyond, where worker
# indexing is enabled and the load request arrives without a
# preceding metadata request. In this case we index the
# functions and update the worker's registry.
_ = self.index_functions(function_path)

self.get_function_metadata(directory,
sys._getframe().f_code.co_name)
else:
# legacy function
programming_model = "V1"

func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
function_name,
directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)

@@ -562,6 +581,7 @@ async def _handle__function_environment_reload_request(self, request):

func_env_reload_request = \
request.function_environment_reload_request
directory = func_env_reload_request.function_app_directory

# Append function project root to module finding sys.path
if func_env_reload_request.function_app_directory:
@@ -587,14 +607,17 @@ async def _handle__function_environment_reload_request(self, request):
root_logger.setLevel(logging.DEBUG)

# Reload azure google namespaces
DependencyManager.reload_customer_libraries(
func_env_reload_request.function_app_directory
)
DependencyManager.reload_customer_libraries(directory)

# calling load_binding_registry again since the
# reload_customer_libraries call clears the registry
bindings.load_binding_registry()

if is_envvar_true(ENABLE_INIT_INDEXING):
self.get_function_metadata(
directory,
caller_info=sys._getframe().f_code.co_name)

# Change function app directory
if getattr(func_env_reload_request,
'function_app_directory', None):
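
To summarize the dispatcher changes, the outline below traces the new control flow. It is an illustrative sketch rather than code from the PR: the method and attribute names mirror the diff above, but the simplified bodies and the caller_info strings are assumptions.

# Illustrative outline (not part of the diff) of init-time indexing.
# With ENABLE_INIT_INDEXING set, indexing runs while handling the worker
# init (and environment reload) request and the outcome is cached; the
# functions metadata request then only replays that cached outcome.
from azure_functions_worker.constants import ENABLE_INIT_INDEXING
from azure_functions_worker.utils.common import is_envvar_true


async def _handle__worker_init_request(self, request):
    ...
    if is_envvar_true(ENABLE_INIT_INDEXING):
        # cache metadata (or the serialized exception) for later requests
        self.get_function_metadata(directory, caller_info="worker_init")
    ...


async def _handle__functions_metadata_request(self, request):
    ...
    if not is_envvar_true(ENABLE_INIT_INDEXING):
        # flag off: index on demand, as before this change
        self.get_function_metadata(directory, caller_info="metadata_request")

    if self.function_metadata_exception:
        ...  # respond with StatusResult.Failure and the cached exception
    else:
        ...  # respond with the cached metadata and StatusResult.Success
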
1 change: 1 addition & 0 deletions azure_functions_worker/loader.py
@@ -218,6 +218,7 @@ def load_function(name: str, directory: str, script_file: str,
f'{os.path.exists(CUSTOMER_PACKAGES_PATH)}')
def index_function_app(function_path: str):
module_name = pathlib.Path(function_path).stem
logger.info(f'Loading module {module_name}')
imported_module = importlib.import_module(module_name)

from azure.functions import FunctionRegister
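
For reference, the module name in the new log line is simply the stem of the function script path, as computed by the surrounding index_function_app code. A quick illustration, with an assumed example path:

# Illustrative only: how the logged module name is derived.
import pathlib

function_path = "/home/site/wwwroot/function_app.py"  # assumed example path
module_name = pathlib.Path(function_path).stem
print(module_name)  # prints: function_app
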
1 change: 0 additions & 1 deletion azure_functions_worker/utils/common.py
@@ -153,4 +153,3 @@ def validate_script_file_name(file_name: str):
pattern = re.compile(r'^[a-zA-Z0-9_][a-zA-Z0-9_\-]*\.py$')
if not pattern.match(file_name):
raise InvalidFileNameError(file_name)
return True
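
With the trailing return True removed, validate_script_file_name no longer signals success through its return value: it raises InvalidFileNameError for a name that fails the pattern and returns None otherwise, which is how the dispatcher code above calls it. A minimal usage sketch with example file names:

# Illustrative only: the validator is now called purely for its side effect.
from azure_functions_worker.utils.common import validate_script_file_name

validate_script_file_name("function_app.py")       # valid: returns None
try:
    validate_script_file_name("invalid name!.py")  # invalid: raises
except Exception as ex:  # InvalidFileNameError in practice
    print(f"rejected: {ex}")
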
41 changes: 29 additions & 12 deletions tests/consumption_tests/test_linux_consumption.py
@@ -7,6 +7,9 @@

from requests import Request

from azure_functions_worker.constants import ENABLE_INIT_INDEXING, \
PYTHON_ENABLE_WORKER_EXTENSIONS, PYTHON_ISOLATE_WORKER_DEPENDENCIES, \
PYTHON_ENABLE_DEBUG_LOGGING
from tests.utils.testutils_lc import (
LinuxConsumptionWebHostController
)
@@ -107,7 +110,7 @@ def test_new_protobuf(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("NewProtobuf"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger')
resp = ctrl.send_request(req)
@@ -137,7 +140,7 @@ def test_old_protobuf(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("OldProtobuf"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger')
resp = ctrl.send_request(req)
@@ -189,7 +192,7 @@ def test_debug_logging_enabled(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"EnableDebugLogging"),
"PYTHON_ENABLE_DEBUG_LOGGING": "1"
PYTHON_ENABLE_DEBUG_LOGGING: "1"
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger1')
resp = ctrl.send_request(req)
@@ -218,7 +221,7 @@ def test_pinning_functions_to_older_version(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"PinningFunctions"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1",
})
req = Request('GET', f'{ctrl.url}/api/HttpTrigger1')
resp = ctrl.send_request(req)
@@ -232,8 +235,7 @@ def test_opencensus_with_extensions_enabled(self):
"""A function app with extensions enabled containing the
following libraries:

azure-functions, azure-eventhub, azure-storage-blob, numpy,
cryptography, pyodbc, requests
azure-functions, opencensus

should return 200 after importing all libraries.
"""
@@ -242,8 +244,25 @@ def test_opencensus_with_extensions_enabled(self):
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("Opencensus"),
"PYTHON_ENABLE_WORKER_EXTENSIONS": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ENABLE_WORKER_EXTENSIONS: "1"
})
req = Request('GET', f'{ctrl.url}/api/opencensus')
resp = ctrl.send_request(req)
self.assertEqual(resp.status_code, 200)

@skipIf(sys.version_info.minor != 10,
"This is testing only for python310")
def test_opencensus_with_extensions_enabled_init_indexing(self):
"""
A function app with init indexing enabled
"""
with LinuxConsumptionWebHostController(_DEFAULT_HOST_VERSION,
self._py_version) as ctrl:
ctrl.assign_container(env={
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("Opencensus"),
PYTHON_ENABLE_WORKER_EXTENSIONS: "1",
ENABLE_INIT_INDEXING: "true"
})
req = Request('GET', f'{ctrl.url}/api/opencensus')
resp = ctrl.send_request(req)
@@ -263,8 +282,7 @@ def test_reload_variables_after_timeout_error(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"TimeoutError"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/hello')
resp = ctrl.send_request(req)
@@ -297,8 +315,7 @@ def test_reload_variables_after_oom_error(self):
"AzureWebJobsStorage": self._storage,
"SCM_RUN_FROM_PACKAGE": self._get_blob_url(
"OOMError"),
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
})
req = Request('GET', f'{ctrl.url}/api/httptrigger')
resp = ctrl.send_request(req)
28 changes: 28 additions & 0 deletions tests/endtoend/test_http_functions.py
@@ -205,6 +205,34 @@ def get_script_dir(cls):
'common_libs_functions_stein'


class TestHttpFunctionsWithInitIndexing(TestHttpFunctions):

@classmethod
def setUpClass(cls):
os.environ["INIT_INDEXING"] = "1"
super().setUpClass()

@classmethod
def tearDownClass(cls):
# Remove the ENABLE_INIT_INDEXING environment variable
os.environ.pop('ENABLE_INIT_INDEXING')
super().tearDownClass()


class TestHttpFunctionsV2WithInitIndexing(TestHttpFunctionsStein):

@classmethod
def setUpClass(cls):
os.environ["INIT_INDEXING"] = "1"
super().setUpClass()

@classmethod
def tearDownClass(cls):
# Remove the ENABLE_INIT_INDEXING environment variable
os.environ.pop('ENABLE_INIT_INDEXING')
super().tearDownClass()


class TestUserThreadLoggingHttpFunctions(testutils.WebHostTestCase):
"""Test the Http trigger that contains logging with user threads.

Expand Down
9 changes: 9 additions & 0 deletions tests/endtoend/test_warmup_functions.py
@@ -2,10 +2,16 @@
# Licensed under the MIT License.

import typing
from unittest import skipIf

from azure_functions_worker.utils.common import is_envvar_true
from tests.utils import testutils
from tests.utils.constants import DEDICATED_DOCKER_TEST, CONSUMPTION_DOCKER_TEST


@skipIf(is_envvar_true(DEDICATED_DOCKER_TEST)
or is_envvar_true(CONSUMPTION_DOCKER_TEST),
"Docker tests cannot call admin functions")
class TestWarmupFunctions(testutils.WebHostTestCase):
"""Test the Warmup Trigger in the local webhost.

@@ -29,6 +35,9 @@ def check_log_warmup(self, host_out: typing.List[str]):
self.assertEqual(host_out.count("Function App instance is warm"), 1)


@skipIf(is_envvar_true(DEDICATED_DOCKER_TEST)
or is_envvar_true(CONSUMPTION_DOCKER_TEST),
"Docker tests cannot call admin functions")
class TestWarmupFunctionsStein(TestWarmupFunctions):

@classmethod