Skip to content

Commit d6a9fb6

Browse files
committed
Change the logger handler to hold the thread pool instead of the dispatcher
1 parent b541720 commit d6a9fb6

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

azure_functions_worker/dispatcher.py

+16-14
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,6 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
9292
self._grpc_connected_fut = loop.create_future()
9393
self._grpc_thread: threading.Thread = threading.Thread(
9494
name='grpc-thread', target=self.__poll_grpc)
95-
self._logging_executor: concurrent.futures.ThreadPoolExecutor \
96-
= concurrent.futures.ThreadPoolExecutor(
97-
max_workers=1,
98-
thread_name_prefix="dispatch_logging")
9995

10096
def get_sync_tp_workers_set(self):
10197
"""We don't know the exact value of the threadcount set for the Python
@@ -376,12 +372,10 @@ async def _handle__function_load_request(self, request):
376372
func_request.metadata.directory
377373
)
378374

379-
self._logging_executor.submit(
380-
logger.info,
381-
'Successfully processed FunctionLoadRequest, '
382-
f'request ID: {self.request_id}, '
383-
f'function ID: {function_id},'
384-
f'function Name: {function_name}')
375+
logger.info('Successfully processed FunctionLoadRequest, '
376+
f'request ID: {self.request_id}, '
377+
f'function ID: {function_id},'
378+
f'function Name: {function_name}')
385379

386380
return protos.StreamingMessage(
387381
request_id=self.request_id,
@@ -429,9 +423,7 @@ async def _handle__invocation_request(self, request):
429423
f'{self.get_sync_tp_workers_set()}'
430424
)
431425

432-
self._logging_executor.submit(
433-
logger.info,
434-
', '.join(function_invocation_logs))
426+
logger.info(', '.join(function_invocation_logs))
435427

436428
args = {}
437429
for pb in invoc_request.input_data:
@@ -760,6 +752,13 @@ def gen(resp_queue):
760752

761753
class AsyncLoggingHandler(logging.Handler):
762754

755+
def __init__(self, *args, **kwargs):
756+
self._logging_tp: concurrent.futures.ThreadPoolExecutor = \
757+
concurrent.futures.ThreadPoolExecutor(
758+
max_workers=1,
759+
thread_name_prefix="logging")
760+
super().__init__(*args, **kwargs)
761+
763762
def emit(self, record: LogRecord) -> None:
764763
# Since we disable console log after gRPC channel is initiated,
765764
# we should redirect all the messages into dispatcher.
@@ -770,7 +769,10 @@ def emit(self, record: LogRecord) -> None:
770769
# buffered in this handler, not calling the emit yet.
771770
msg = self.format(record)
772771
try:
773-
Dispatcher.current.on_logging(record, msg)
772+
self._logging_tp.submit(
773+
Dispatcher.current.on_logging,
774+
record,
775+
msg)
774776
except RuntimeError as runtime_error:
775777
# This will cause 'Dispatcher not found' failure.
776778
# Logging such of an issue will cause infinite loop of gRPC logging

0 commit comments

Comments
 (0)