diff --git a/azure-pipelines.yml b/azure-pipelines.yml index fd304d961..057a2cf5e 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -9,8 +9,8 @@ variables: DOTNET_VERSION: '2.2.207' schedules: -- cron: "0 0 * * *" - displayName: Daily midnight build +- cron: "0 8 * * 1,2,3,4" + displayName: Monday to Thursday 1 AM build branches: include: - dev diff --git a/azure_functions_worker/_thirdparty/aio_compat.py b/azure_functions_worker/_thirdparty/aio_compat.py index 4630f41e1..f4a28bb60 100644 --- a/azure_functions_worker/_thirdparty/aio_compat.py +++ b/azure_functions_worker/_thirdparty/aio_compat.py @@ -1,7 +1,7 @@ """Backport of asyncio.run() function from Python 3.7. Source: https://github.com/python/cpython/blob/ - bd093355a6aaf2f4ca3ed153e195da57870a55eb/Lib/asyncio/runners.py + bd093355a6aaf2f4ca3ed153e195da57870a55eb/Lib/asyncio/runners.py """ diff --git a/azure_functions_worker/bindings/generic.py b/azure_functions_worker/bindings/generic.py index 78710a051..9d0cca8af 100644 --- a/azure_functions_worker/bindings/generic.py +++ b/azure_functions_worker/bindings/generic.py @@ -3,6 +3,7 @@ import typing from . import datumdef +from typing import Any, Optional class GenericBinding: @@ -20,8 +21,8 @@ def check_output_type_annotation(cls, pytype: type) -> bool: return issubclass(pytype, (str, bytes, bytearray)) @classmethod - def encode(cls, obj: typing.Any, *, - expected_type: typing.Optional[type]) -> datumdef.Datum: + def encode(cls, obj: Any, *, + expected_type: Optional[type]) -> datumdef.Datum: if isinstance(obj, str): return datumdef.Datum(type='string', value=obj) diff --git a/azure_functions_worker/bindings/out.py b/azure_functions_worker/bindings/out.py index a95a35f15..53ac0199d 100644 --- a/azure_functions_worker/bindings/out.py +++ b/azure_functions_worker/bindings/out.py @@ -4,11 +4,11 @@ class Out: - def __init__(self): + def __init__(self) -> None: self.__value = None def set(self, val): self.__value = val - def get(self): + def get(self) -> str: return self.__value diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 370240d8e..828a36326 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -16,7 +16,6 @@ import inspect import grpc -import pkg_resources from . import bindings from . import functions @@ -28,6 +27,9 @@ from .logging import enable_console_logging, disable_console_logging from .utils.tracing import marshall_exception_trace from .utils.wrappers import disable_feature_by +from asyncio.unix_events import _UnixSelectorEventLoop +from logging import LogRecord +from typing import Optional class DispatcherMeta(type): @@ -46,8 +48,10 @@ class Dispatcher(metaclass=DispatcherMeta): _GRPC_STOP_RESPONSE = object() - def __init__(self, loop, host, port: int, worker_id: str, request_id: str, - grpc_connect_timeout: float, grpc_max_msg_len: int = -1): + def __init__(self, loop: _UnixSelectorEventLoop, host: str, port: int, + worker_id: str, request_id: str, + grpc_connect_timeout: float, + grpc_max_msg_len: int = -1) -> None: self._loop = loop self._host = host self._port = port @@ -77,20 +81,9 @@ def __init__(self, loop, host, port: int, worker_id: str, request_id: str, self._grpc_thread = threading.Thread( name='grpc-thread', target=self.__poll_grpc) - @staticmethod - def load_bindings(): - """Load out-of-tree binding implementations.""" - services = {} - - for ep in pkg_resources.iter_entry_points('azure.functions.bindings'): - logger.info('Loading binding plugin from %s', ep.module_name) - ep.load() - - return services - @classmethod - async def connect(cls, host, port, worker_id, request_id, - connect_timeout): + async def connect(cls, host: str, port: int, worker_id: str, + request_id: str, connect_timeout: float): loop = asyncio.events.get_event_loop() disp = cls(loop, host, port, worker_id, request_id, connect_timeout) disp._grpc_thread.start() @@ -144,7 +137,7 @@ async def dispatch_forever(self): self._loop.set_task_factory(self._old_task_factory) self.stop() - def stop(self): + def stop(self) -> None: if self._grpc_thread is not None: self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE) self._grpc_thread.join() @@ -154,7 +147,7 @@ def stop(self): self._sync_call_tp.shutdown() self._sync_call_tp = None - def on_logging(self, record: logging.LogRecord, formatted_msg: str): + def on_logging(self, record: logging.LogRecord, formatted_msg: str) -> None: if record.levelno >= logging.CRITICAL: log_level = protos.RpcLog.Critical elif record.levelno >= logging.ERROR: @@ -196,11 +189,11 @@ def on_logging(self, record: logging.LogRecord, formatted_msg: str): rpc_log=protos.RpcLog(**log))) @property - def request_id(self): + def request_id(self) -> str: return self._request_id @property - def worker_id(self): + def worker_id(self) -> str: return self._worker_id # noinspection PyBroadException @@ -524,7 +517,7 @@ def gen(resp_queue): class AsyncLoggingHandler(logging.Handler): - def emit(self, record): + def emit(self, record: LogRecord) -> None: # Since we disable console log after gRPC channel is initiated # We should redirect all the messages into dispatcher msg = self.format(record) @@ -533,7 +526,7 @@ def emit(self, record): class ContextEnabledTask(asyncio.Task): - _AZURE_INVOCATION_ID = '__azure_function_invocation_id__' + AZURE_INVOCATION_ID = '__azure_function_invocation_id__' def __init__(self, coro, loop): super().__init__(coro, loop=loop) @@ -541,21 +534,22 @@ def __init__(self, coro, loop): current_task = asyncio.Task.current_task(loop) if current_task is not None: invocation_id = getattr( - current_task, self._AZURE_INVOCATION_ID, None) + current_task, self.AZURE_INVOCATION_ID, None) if invocation_id is not None: self.set_azure_invocation_id(invocation_id) - def set_azure_invocation_id(self, invocation_id): - setattr(self, self._AZURE_INVOCATION_ID, invocation_id) + def set_azure_invocation_id(self, invocation_id: str) -> None: + setattr(self, self.AZURE_INVOCATION_ID, invocation_id) -def get_current_invocation_id(): +def get_current_invocation_id() -> Optional[str]: loop = asyncio._get_running_loop() if loop is not None: current_task = asyncio.Task.current_task(loop) if current_task is not None: - task_invocation_id = getattr( - current_task, ContextEnabledTask._AZURE_INVOCATION_ID, None) + task_invocation_id = getattr(current_task, + ContextEnabledTask.AZURE_INVOCATION_ID, + None) if task_invocation_id is not None: return task_invocation_id diff --git a/azure_functions_worker/functions.py b/azure_functions_worker/functions.py index ea7633077..ee50f3701 100644 --- a/azure_functions_worker/functions.py +++ b/azure_functions_worker/functions.py @@ -32,7 +32,7 @@ class FunctionInfo(typing.NamedTuple): class FunctionLoadError(RuntimeError): - def __init__(self, function_name, msg): + def __init__(self, function_name: str, msg: str) -> None: super().__init__( f'cannot load the {function_name} function: {msg}') @@ -41,10 +41,10 @@ class Registry: _functions: typing.MutableMapping[str, FunctionInfo] - def __init__(self): + def __init__(self) -> None: self._functions = {} - def get_function(self, function_id: str): + def get_function(self, function_id: str) -> FunctionInfo: try: return self._functions[function_id] except KeyError: diff --git a/azure_functions_worker/loader.py b/azure_functions_worker/loader.py index c144db678..69ae97359 100644 --- a/azure_functions_worker/loader.py +++ b/azure_functions_worker/loader.py @@ -14,6 +14,7 @@ from .constants import MODULE_NOT_FOUND_TS_URL from .utils.wrappers import attach_message_to_exception +from os import PathLike, fspath _AZURE_NAMESPACE = '__app__' @@ -21,11 +22,11 @@ _submodule_dirs = [] -def register_function_dir(path: os.PathLike): - _submodule_dirs.append(os.fspath(path)) +def register_function_dir(path: PathLike) -> None: + _submodule_dirs.append(fspath(path)) -def install(): +def install() -> None: if _AZURE_NAMESPACE not in sys.modules: # Create and register the __app__ namespace package. ns_spec = importlib.machinery.ModuleSpec(_AZURE_NAMESPACE, None) @@ -34,7 +35,7 @@ def install(): sys.modules[_AZURE_NAMESPACE] = ns_pkg -def uninstall(): +def uninstall() -> None: pass diff --git a/azure_functions_worker/logging.py b/azure_functions_worker/logging.py index 95643c200..9abec63c7 100644 --- a/azure_functions_worker/logging.py +++ b/azure_functions_worker/logging.py @@ -50,7 +50,7 @@ def setup(log_level, log_destination): error_logger.setLevel(getattr(logging, log_level)) -def disable_console_logging(): +def disable_console_logging() -> None: if logger and handler: logger.removeHandler(handler) @@ -58,7 +58,7 @@ def disable_console_logging(): error_logger.removeHandler(error_handler) -def enable_console_logging(): +def enable_console_logging() -> None: if logger and handler: logger.addHandler(handler) @@ -66,7 +66,7 @@ def enable_console_logging(): error_logger.addHandler(error_handler) -def is_system_log_category(ctg: str): +def is_system_log_category(ctg: str) -> bool: return any( [ctg.lower().startswith(c) for c in ( 'azure_functions_worker', diff --git a/azure_functions_worker/main.py b/azure_functions_worker/main.py index 59cb7b068..2d5ad2f14 100644 --- a/azure_functions_worker/main.py +++ b/azure_functions_worker/main.py @@ -49,6 +49,4 @@ async def start_async(host, port, worker_id, request_id): host, port, worker_id, request_id, connect_timeout=5.0) - disp.load_bindings() - await disp.dispatch_forever() diff --git a/azure_functions_worker/testutils.py b/azure_functions_worker/testutils.py index 8f6a2e21a..43039ebe6 100644 --- a/azure_functions_worker/testutils.py +++ b/azure_functions_worker/testutils.py @@ -15,9 +15,10 @@ import json import logging import os -import queue import pathlib import platform +import queue +import re import shutil import socket import subprocess @@ -27,7 +28,6 @@ import typing import unittest import uuid -import re import grpc import requests @@ -35,9 +35,8 @@ from azure_functions_worker._thirdparty import aio_compat from . import dispatcher from . import protos -from .utils.common import is_envvar_true from .constants import PYAZURE_WEBHOST_DEBUG - +from .utils.common import is_envvar_true PROJECT_ROOT = pathlib.Path(__file__).parent.parent TESTS_ROOT = PROJECT_ROOT / 'tests' @@ -410,10 +409,10 @@ async def communicate(self, message, *, wait_for): self._in_queue.put_nowait((message, wait_for)) return await self._out_aqueue.get() - async def _start(self): + async def start(self): self._server.start() - async def _close(self): + async def close(self): self._in_queue.put_nowait((_MockWebHostServicer._STOP, None)) self._server.stop(1) @@ -457,20 +456,18 @@ async def __aenter__(self): loop = aio_compat.get_running_loop() self._host = _MockWebHost(loop, self._scripts_dir) - await self._host._start() - - self._worker = await dispatcher.Dispatcher.connect( - '127.0.0.1', self._host._port, - self._host.worker_id, self._host.request_id, - connect_timeout=5.0) + await self._host.start() - self._worker.load_bindings() + self._worker = await dispatcher. \ + Dispatcher.connect('127.0.0.1', self._host._port, + self._host.worker_id, + self._host.request_id, connect_timeout=5.0) self._worker_task = loop.create_task(self._worker.dispatch_forever()) - done, pending = await asyncio.wait( - [self._host._connected_fut, self._worker_task], - return_when=asyncio.FIRST_COMPLETED) + done, pending = await asyncio. \ + wait([self._host._connected_fut, self._worker_task], + return_when=asyncio.FIRST_COMPLETED) try: if self._worker_task in done: @@ -480,7 +477,7 @@ async def __aenter__(self): raise RuntimeError('could not start a worker thread') except Exception: try: - self._host._close() + await self._host.close() self._worker.stop() finally: raise @@ -498,7 +495,7 @@ async def __aexit__(self, *exc): self._worker_task = None self._worker = None - await self._host._close() + await self._host.close() self._host = None diff --git a/azure_functions_worker/utils/common.py b/azure_functions_worker/utils/common.py index ba9404722..c3bd6fd1e 100644 --- a/azure_functions_worker/utils/common.py +++ b/azure_functions_worker/utils/common.py @@ -3,14 +3,14 @@ import os -def is_true_like(setting: str): +def is_true_like(setting: str) -> bool: if setting is None: return False return setting.lower().strip() in ['1', 'true', 't', 'yes', 'y'] -def is_envvar_true(env_key: str): +def is_envvar_true(env_key: str) -> bool: if os.getenv(env_key) is None: return False diff --git a/azure_functions_worker/utils/tracing.py b/azure_functions_worker/utils/tracing.py index 42a97e062..f458e4d95 100644 --- a/azure_functions_worker/utils/tracing.py +++ b/azure_functions_worker/utils/tracing.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. from typing import List import traceback +from traceback import StackSummary, extract_tb def extend_exception_message(exc: Exception, msg: str) -> Exception: @@ -16,28 +17,23 @@ def extend_exception_message(exc: Exception, msg: str) -> Exception: def marshall_exception_trace(exc: Exception) -> str: - stack_summary: traceback.StackSummary = traceback.extract_tb( - exc.__traceback__) + stack_summary: StackSummary = extract_tb(exc.__traceback__) if isinstance(exc, ModuleNotFoundError): stack_summary = _marshall_module_not_found_error(stack_summary) return ''.join(stack_summary.format()) -def _marshall_module_not_found_error( - tbss: traceback.StackSummary -) -> traceback.StackSummary: +def _marshall_module_not_found_error(tbss: StackSummary) -> StackSummary: tbss = _remove_frame_from_stack(tbss, '') tbss = _remove_frame_from_stack( tbss, '') return tbss -def _remove_frame_from_stack( - tbss: traceback.StackSummary, - framename: str -) -> traceback.StackSummary: - filtered_stack_list: List[traceback.FrameSummary] = list( - filter(lambda frame: getattr(frame, 'filename') != framename, tbss)) - filtered_stack: traceback.StackSummary = traceback.StackSummary.from_list( - filtered_stack_list) +def _remove_frame_from_stack(tbss: StackSummary, + framename: str) -> StackSummary: + filtered_stack_list: List[traceback.FrameSummary] = \ + list(filter(lambda frame: getattr(frame, + 'filename') != framename, tbss)) + filtered_stack: StackSummary = StackSummary.from_list(filtered_stack_list) return filtered_stack diff --git a/azure_functions_worker/utils/wrappers.py b/azure_functions_worker/utils/wrappers.py index 7314c5d5c..9c3f92c6b 100644 --- a/azure_functions_worker/utils/wrappers.py +++ b/azure_functions_worker/utils/wrappers.py @@ -2,9 +2,10 @@ # Licensed under the MIT License. from .common import is_envvar_true from .tracing import extend_exception_message +from typing import Callable, Optional -def enable_feature_by(flag: str, default=None): +def enable_feature_by(flag: str, default: Optional[int] = None) -> Callable: def decorate(func): def call(*args, **kwargs): if is_envvar_true(flag): @@ -14,7 +15,7 @@ def call(*args, **kwargs): return decorate -def disable_feature_by(flag: str, default=None): +def disable_feature_by(flag: str, default: None = None) -> Callable: def decorate(func): def call(*args, **kwargs): if not is_envvar_true(flag): @@ -24,7 +25,7 @@ def call(*args, **kwargs): return decorate -def attach_message_to_exception(expt_type: Exception, message: str): +def attach_message_to_exception(expt_type: Exception, message: str) -> Callable: def decorate(func): def call(*args, **kwargs): try: