Skip to content

Commit a695ba3

Browse files
committed
feedback
1 parent e85ea9e commit a695ba3

File tree

3 files changed

+75
-66
lines changed

3 files changed

+75
-66
lines changed

azure_functions_worker/dispatcher.py

+13-20
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
PYTHON_SCRIPT_FILE_NAME_DEFAULT,
3232
PYTHON_LANGUAGE_RUNTIME, PYTHON_ENABLE_INIT_INDEXING,
3333
METADATA_PROPERTIES_WORKER_INDEXED)
34+
from .exceptions import HttpServerInitError
3435
from .extension import ExtensionManager
3536
from .http_v2 import http_coordinator, initialize_http_server, HttpV2Registry, \
3637
sync_http_request
@@ -112,8 +113,7 @@ def get_sync_tp_workers_set(self):
112113
3.9 scenarios (as we'll start passing only None by default), and we
113114
need to get that information.
114115
115-
Ref: concurrent.futures.thread.ThreadPoolExecutor.__init__
116-
._max_workers
116+
Ref: concurrent.futures.thread.ThreadPoolExecutor.__init__._max_workers
117117
"""
118118
return self._sync_call_tp._max_workers
119119

@@ -310,32 +310,23 @@ async def _handle__worker_init_request(self, request):
310310
self.load_function_metadata(
311311
worker_init_request.function_app_directory,
312312
caller_info="worker_init_request")
313-
except Exception as ex:
314-
self._function_metadata_exception = ex
315313

316-
try:
317314
if HttpV2Registry.http_v2_enabled():
318315
capabilities[constants.HTTP_URI] = \
319316
initialize_http_server(self._host)
317+
318+
except HttpServerInitError:
319+
raise
320320
except Exception as ex:
321-
return protos.StreamingMessage(
322-
request_id=self.request_id,
323-
worker_init_response=protos.WorkerInitResponse(
324-
result=protos.StatusResult(
325-
status=protos.StatusResult.Failure,
326-
exception=self._serialize_exception(ex))
327-
)
328-
)
321+
self._function_metadata_exception = ex
329322

330323
return protos.StreamingMessage(
331324
request_id=self.request_id,
332325
worker_init_response=protos.WorkerInitResponse(
333326
capabilities=capabilities,
334327
worker_metadata=self.get_worker_metadata(),
335328
result=protos.StatusResult(
336-
status=protos.StatusResult.Success),
337-
),
338-
)
329+
status=protos.StatusResult.Success)))
339330

340331
async def _handle__worker_status_request(self, request):
341332
# Logging is not necessary in this request since the response is used
@@ -686,13 +677,15 @@ async def _handle__function_environment_reload_request(self, request):
686677
self.load_function_metadata(
687678
directory,
688679
caller_info="environment_reload_request")
680+
681+
if HttpV2Registry.http_v2_enabled():
682+
capabilities[constants.HTTP_URI] = \
683+
initialize_http_server(self._host)
684+
except HttpServerInitError:
685+
raise
689686
except Exception as ex:
690687
self._function_metadata_exception = ex
691688

692-
if HttpV2Registry.http_v2_enabled():
693-
capabilities[constants.HTTP_URI] = \
694-
initialize_http_server(self._host)
695-
696689
# Change function app directory
697690
if getattr(func_env_reload_request,
698691
'function_app_directory', None):

azure_functions_worker/exceptions.py

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# http v2 exception types
2+
class HttpServerInitError(Exception):
3+
"""Exception raised when there is an error during HTTP server
4+
initialization."""
5+
6+
7+
class MissingHeaderError(ValueError):
8+
"""Exception raised when a required header is missing in the
9+
HTTP request."""

azure_functions_worker/http_v2.py

+53-46
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
from azure_functions_worker.constants import X_MS_INVOCATION_ID, \
99
BASE_EXT_SUPPORTED_PY_MINOR_VERSION, PYTHON_ENABLE_INIT_INDEXING
10+
from azure_functions_worker.exceptions import MissingHeaderError, \
11+
HttpServerInitError
1012
from azure_functions_worker.logging import logger
1113
from azure_functions_worker.utils.common import is_envvar_false
1214

@@ -113,25 +115,24 @@ def set_http_request(self, invoc_id, http_request):
113115

114116
def set_http_response(self, invoc_id, http_response):
115117
if invoc_id not in self._context_references:
116-
raise Exception("No context reference found for invocation "
117-
f"{invoc_id}")
118+
raise KeyError("No context reference found for invocation %s"
119+
% invoc_id)
118120
context_ref = self._context_references.get(invoc_id)
119121
context_ref.http_response = http_response
120122

121123
async def get_http_request_async(self, invoc_id):
122124
if invoc_id not in self._context_references:
123125
self._context_references[invoc_id] = AsyncContextReference()
124126

125-
await asyncio.sleep(0)
126127
await self._context_references.get(
127128
invoc_id).http_request_available_event.wait()
128129
return self._pop_http_request(invoc_id)
129130

130131
async def await_http_response_async(self, invoc_id):
131132
if invoc_id not in self._context_references:
132-
raise Exception("No context reference found for invocation "
133-
f"{invoc_id}")
134-
await asyncio.sleep(0)
133+
raise KeyError("No context reference found for invocation %s"
134+
% invoc_id)
135+
135136
await self._context_references.get(
136137
invoc_id).http_response_available_event.wait()
137138
return self._pop_http_response(invoc_id)
@@ -143,7 +144,7 @@ def _pop_http_request(self, invoc_id):
143144
context_ref.http_request = None
144145
return request
145146

146-
raise ValueError(f"No http request found for invocation {invoc_id}")
147+
raise ValueError("No http request found for invocation %s" % invoc_id)
147148

148149
def _pop_http_response(self, invoc_id):
149150
context_ref = self._context_references.get(invoc_id)
@@ -152,7 +153,7 @@ def _pop_http_response(self, invoc_id):
152153
context_ref.http_response = None
153154
return response
154155

155-
raise ValueError(f"No http response found for invocation {invoc_id}")
156+
raise ValueError("No http response found for invocation %s" % invoc_id)
156157

157158

158159
def get_unused_tcp_port():
@@ -169,44 +170,50 @@ def get_unused_tcp_port():
169170

170171

171172
def initialize_http_server(host_addr, **kwargs):
172-
ext_base = HttpV2Registry.ext_base()
173-
web_extension_mod_name = ext_base.ModuleTrackerMeta.get_module()
174-
extension_module = importlib.import_module(web_extension_mod_name)
175-
web_app_class = extension_module.WebApp
176-
web_server_class = extension_module.WebServer
177-
178-
unused_port = get_unused_tcp_port()
179-
180-
app = web_app_class()
181-
request_type = ext_base.RequestTrackerMeta.get_request_type()
182-
183-
@app.route
184-
async def catch_all(request: request_type): # type: ignore
185-
invoc_id = request.headers.get(X_MS_INVOCATION_ID)
186-
if invoc_id is None:
187-
raise ValueError(f"Header {X_MS_INVOCATION_ID} not found")
188-
logger.info('Received HTTP request for invocation %s', invoc_id)
189-
http_coordinator.set_http_request(invoc_id, request)
190-
http_resp = \
191-
await http_coordinator.await_http_response_async(invoc_id)
192-
193-
logger.info('Sending HTTP response for invocation %s', invoc_id)
194-
# if http_resp is an python exception, raise it
195-
if isinstance(http_resp, Exception):
196-
raise http_resp
197-
198-
return http_resp
199-
200-
web_server = web_server_class(host_addr, unused_port, app)
201-
web_server_run_task = web_server.serve()
202-
203-
loop = asyncio.get_event_loop()
204-
loop.create_task(web_server_run_task)
205-
206-
web_server_address = f"http://{host_addr}:{unused_port}"
207-
logger.info('HTTP server starting on %s', web_server_address)
208-
209-
return web_server_address
173+
try:
174+
ext_base = HttpV2Registry.ext_base()
175+
web_extension_mod_name = ext_base.ModuleTrackerMeta.get_module()
176+
extension_module = importlib.import_module(web_extension_mod_name)
177+
web_app_class = extension_module.WebApp
178+
web_server_class = extension_module.WebServer
179+
180+
unused_port = get_unused_tcp_port()
181+
182+
app = web_app_class()
183+
request_type = ext_base.RequestTrackerMeta.get_request_type()
184+
185+
@app.route
186+
async def catch_all(request: request_type): # type: ignore
187+
invoc_id = request.headers.get(X_MS_INVOCATION_ID)
188+
if invoc_id is None:
189+
raise MissingHeaderError("Header %s not found" %
190+
X_MS_INVOCATION_ID)
191+
logger.info('Received HTTP request for invocation %s', invoc_id)
192+
http_coordinator.set_http_request(invoc_id, request)
193+
http_resp = \
194+
await http_coordinator.await_http_response_async(invoc_id)
195+
196+
logger.info('Sending HTTP response for invocation %s', invoc_id)
197+
# if http_resp is an python exception, raise it
198+
if isinstance(http_resp, Exception):
199+
raise http_resp
200+
201+
return http_resp
202+
203+
web_server = web_server_class(host_addr, unused_port, app)
204+
web_server_run_task = web_server.serve()
205+
206+
loop = asyncio.get_event_loop()
207+
loop.create_task(web_server_run_task)
208+
209+
web_server_address = f"http://{host_addr}:{unused_port}"
210+
logger.info('HTTP server starting on %s', web_server_address)
211+
212+
return web_server_address
213+
214+
except Exception as e:
215+
raise HttpServerInitError("Error initializing HTTP server: %s" % e) \
216+
from e
210217

211218

212219
async def sync_http_request(http_request, invoc_request):

0 commit comments

Comments
 (0)