Skip to content

Commit 9922b2c

Browse files
committed
final changes and tests
1 parent efc02f3 commit 9922b2c

File tree

19 files changed

+1261
-898
lines changed

19 files changed

+1261
-898
lines changed

azure_functions_worker/constants.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
METADATA_PROPERTIES_WORKER_INDEXED = "worker_indexed"
6464

6565
# HostNames
66-
LOCAL_HOST = "localhost"
66+
LOCAL_HOST = "127.0.0.1"
6767

6868
# Header names
6969
X_MS_INVOCATION_ID = "x-ms-invocation-id"

azure_functions_worker/dispatcher.py

+36-19
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
9191
self._shmem_mgr = SharedMemoryManager()
9292
self._old_task_factory = None
9393
self.function_metadata_result = None
94+
self._has_http_func = False
9495

9596
# Used to store metadata returns
9697
self._function_metadata_result = None
@@ -516,6 +517,7 @@ async def _handle__function_load_request(self, request):
516517
async def _handle__invocation_request(self, request):
517518
invocation_time = datetime.utcnow()
518519
invoc_request = request.invocation_request
520+
trigger_metadata = invoc_request.trigger_metadata
519521
invocation_id = invoc_request.invocation_id
520522
function_id = invoc_request.function_id
521523

@@ -560,13 +562,19 @@ async def _handle__invocation_request(self, request):
560562
pytype=pb_type_info.pytype,
561563
shmem_mgr=self._shmem_mgr)
562564

565+
http_v2_enabled = False
563566
if fi.trigger_metadata.get('type') == HTTP_TRIGGER:
564567
from azure.functions.extension.base import HttpV2FeatureChecker
565568
http_v2_enabled = HttpV2FeatureChecker.http_v2_enabled()
566569

567570
if http_v2_enabled:
568571
http_request = await http_coordinator.get_http_request_async(
569572
invocation_id)
573+
574+
from azure.functions.extension.base import RequestTrackerMeta
575+
route_params = {key: item.string for key, item in trigger_metadata.items() if key not in ['Headers', 'Query']}
576+
577+
RequestTrackerMeta.get_synchronizer().sync_route_params(http_request, route_params)
570578
args[fi.trigger_metadata.get('param_name')] = http_request
571579

572580
fi_context = self._get_context(invoc_request, fi.name, fi.directory)
@@ -581,23 +589,26 @@ async def _handle__invocation_request(self, request):
581589
for name in fi.output_types:
582590
args[name] = bindings.Out()
583591

584-
if fi.is_async:
585-
call_result = await self._run_async_func(
586-
fi_context, fi.func, args
587-
)
588-
589-
else:
590-
call_result = await self._loop.run_in_executor(
591-
self._sync_call_tp,
592-
self._run_sync_func,
593-
invocation_id, fi_context, fi.func, args)
594-
595-
if call_result is not None and not fi.has_return:
596-
raise RuntimeError(f'function {fi.name!r} without a $return '
597-
'binding returned a non-None value')
598-
599-
if http_v2_enabled:
600-
http_coordinator.set_http_response(invocation_id, call_result)
592+
call_result = None
593+
call_error = None
594+
try:
595+
if fi.is_async:
596+
call_result = await self._run_async_func(fi_context, fi.func, args)
597+
else:
598+
call_result = await self._loop.run_in_executor(
599+
self._sync_call_tp,
600+
self._run_sync_func,
601+
invocation_id, fi_context, fi.func, args)
602+
603+
if call_result is not None and not fi.has_return:
604+
raise RuntimeError(f'function {fi.name!r} without a $return '
605+
'binding returned a non-None value')
606+
except Exception as e:
607+
call_error = e
608+
raise
609+
finally:
610+
if http_v2_enabled:
611+
http_coordinator.set_http_response(invocation_id, call_result if call_result is not None else call_error)
601612

602613
output_data = []
603614
cache_enabled = self._function_data_cache_enabled
@@ -753,16 +764,23 @@ async def catch_all(request: request_type): # type: ignore
753764
invoc_id = request.headers.get(X_MS_INVOCATION_ID)
754765
if invoc_id is None:
755766
raise ValueError(f"Header {X_MS_INVOCATION_ID} not found")
756-
767+
logger.info('Received HTTP request for invocation %s', invoc_id)
757768
http_coordinator.set_http_request(invoc_id, request)
758769
http_resp = await http_coordinator.await_http_response_async(invoc_id)
770+
771+
logger.info('Sending HTTP response for invocation %s', invoc_id)
772+
# if http_resp is an python exception, raise it
773+
if isinstance(http_resp, Exception):
774+
raise http_resp
775+
759776
return http_resp
760777

761778
web_server = web_server_class(LOCAL_HOST, unused_port, app)
762779
web_server_run_task = web_server.serve()
763780

764781
loop = asyncio.get_event_loop()
765782
loop.create_task(web_server_run_task)
783+
logger.info('HTTP server starting on %s:%s', LOCAL_HOST, unused_port)
766784

767785
return f"http://{LOCAL_HOST}:{unused_port}"
768786

@@ -779,7 +797,6 @@ def index_functions(self, function_path: str):
779797
indexed_functions)
780798

781799
indexed_function_logs: List[str] = []
782-
self._has_http_func = False
783800
for func in indexed_functions:
784801
self._has_http_func = self._has_http_func or func.is_http_function()
785802
function_log = "Function Name: {}, Function Binding: {}" \

azure_functions_worker/http_proxy.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,12 @@ def set_http_request(self, invoc_id, http_request):
102102
self._context_references[invoc_id] = AsyncContextReference()
103103
context_ref = self._context_references.get(invoc_id)
104104
context_ref.http_request = http_request
105-
context_ref.http_request_available_event.set()
106105

107106
def set_http_response(self, invoc_id, http_response):
108107
if invoc_id not in self._context_references:
109108
raise Exception("No context reference found for invocation %s", invoc_id)
110109
context_ref = self._context_references.get(invoc_id)
111110
context_ref.http_response = http_response
112-
context_ref.http_response_available_event.set()
113111

114112
async def get_http_request_async(self, invoc_id):
115113
if invoc_id not in self._context_references:
@@ -141,8 +139,7 @@ def _pop_http_response(self, invoc_id):
141139
if response is not None:
142140
context_ref.http_response = None
143141
return response
144-
145-
raise Exception("No http response found for invocation %s", invoc_id)
142+
# If user does not set the response, return nothing and web server will return 200 empty response
146143

147144

148145
http_coordinator = HttpCoordinator()

azure_functions_worker/loader.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def process_indexed_function(functions_registry: functions.Registry,
146146
properties={METADATA_PROPERTIES_WORKER_INDEXED: "True"})
147147

148148
fx_metadata_results.append(function_metadata)
149-
return fx_metadata_results
149+
return fx_metadata_results
150150
except Exception as e:
151151
logger.error(f'Error in process_indexed_function. {e}', exc_info=True)
152152
raise e

azure_functions_worker/logging.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
handler: Optional[logging.Handler] = None
2222
error_handler: Optional[logging.Handler] = None
23-
local_handler = logging.FileHandler("E:/projects/AzureFunctionsPythonWorker/log.txt")
24-
logger.addHandler(local_handler)
23+
# local_handler = logging.FileHandler("E:/projects/AzureFunctionsPythonWorker/log.txt")
24+
# logger.addHandler(local_handler)
2525

2626
def format_exception(exception: Exception) -> str:
2727
msg = str(exception) + "\n"

python/test/worker.config.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"description":{
33
"language":"python",
44
"extensions":[".py"],
5-
"defaultExecutablePath":"E:\\projects\\AzureFunctionsPythonWorker\\.venv_3.8\\Scripts\\python.exe",
5+
"defaultExecutablePath":"E:\\projects\\AzureFunctionsPythonWorker\\.venv_3.9_debug\\Scripts\\python.exe",
66
"defaultWorkerPath":"worker.py",
77
"workerIndexing": "true"
88
}

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@
110110
"numpy",
111111
"pre-commit"
112112
],
113-
"fastapi": ["azure-functions-extension-fastapi"]
113+
"http-v2": ["azure-functions-extension-fastapi", "ujson", "orjson"]
114114
}
115115

116116

tests/consumption_tests/test_linux_consumption.py

+35
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,41 @@ def test_reload_variables_after_oom_error(self):
336336
self.assertNotIn("Failure Exception: ModuleNotFoundError",
337337
logs)
338338

339+
@skipIf(sys.version_info.minor != 10,
340+
"This is testing only for python310")
341+
def test_http_v2_fastapi_streaming_upload_download(self):
342+
"""
343+
A function app with init indexing enabled
344+
"""
345+
import random as rand
346+
with LinuxConsumptionWebHostController(_DEFAULT_HOST_VERSION,
347+
self._py_version) as ctrl:
348+
ctrl.assign_container(env={
349+
"AzureWebJobsStorage": self._storage,
350+
"SCM_RUN_FROM_PACKAGE": self._get_blob_url("HttpV2FastApiStreaming"),
351+
PYTHON_ENABLE_INIT_INDEXING: "true",
352+
PYTHON_ISOLATE_WORKER_DEPENDENCIES: "1"
353+
})
354+
355+
def generate_random_bytes_stream():
356+
"""Generate a stream of random bytes."""
357+
yield b'streaming'
358+
yield b'testing'
359+
yield b'response'
360+
yield b'is'
361+
yield b'returned'
362+
363+
req = Request('POST', f'{ctrl.url}/api/http_v2_fastapi_streaming', data=generate_random_bytes_stream())
364+
resp = ctrl.send_request(req)
365+
self.assertEqual(resp.status_code, 200)
366+
367+
streamed_data = b''
368+
for chunk in resp.iter_content(chunk_size=1024):
369+
if chunk:
370+
streamed_data+= chunk
371+
372+
self.assertEqual(streamed_data, b'streamingtestingresponseisreturned')
373+
339374
def _get_blob_url(self, scenario_name: str) -> str:
340375
return (
341376
f'https://pythonworker{self._py_shortform}sa.blob.core.windows.net/'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
from datetime import datetime
5+
import logging
6+
import time
7+
8+
import azure.functions as func
9+
10+
from azure.functions.extension.fastapi import Request, Response, StreamingResponse, \
11+
HTMLResponse, PlainTextResponse, HTMLResponse, JSONResponse, \
12+
UJSONResponse, ORJSONResponse, RedirectResponse, FileResponse
13+
14+
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
15+
16+
17+
@app.route(route="default_template")
18+
async def default_template(req: Request) -> Response:
19+
logging.info('Python HTTP trigger function processed a request.')
20+
21+
name = req.query_params.get('name')
22+
if not name:
23+
try:
24+
req_body = await req.json()
25+
except ValueError:
26+
pass
27+
else:
28+
name = req_body.get('name')
29+
30+
if name:
31+
return Response(
32+
f"Hello, {name}. This HTTP triggered function "
33+
f"executed successfully.")
34+
else:
35+
return Response(
36+
"This HTTP triggered function executed successfully. "
37+
"Pass a name in the query string or in the request body for a"
38+
" personalized response.",
39+
status_code=200
40+
)
41+
42+
43+
@app.route(route="http_func")
44+
def http_func(req: Request) -> Response:
45+
time.sleep(1)
46+
47+
current_time = datetime.now().strftime("%H:%M:%S")
48+
return Response(f"{current_time}")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import asyncio
4+
from datetime import datetime
5+
import logging
6+
import time
7+
8+
import azure.functions as func
9+
from azure.functions.extension.fastapi import Request, Response, StreamingResponse, \
10+
HTMLResponse, PlainTextResponse, HTMLResponse, JSONResponse, \
11+
UJSONResponse, ORJSONResponse, RedirectResponse, FileResponse
12+
13+
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
14+
15+
16+
@app.route(route="default_template")
17+
async def default_template(req: Request) -> Response:
18+
logging.info('Python HTTP trigger function processed a request.')
19+
20+
name = req.query_params.get('name')
21+
if not name:
22+
try:
23+
req_body = await req.json()
24+
except ValueError:
25+
pass
26+
else:
27+
name = req_body.get('name')
28+
29+
if name:
30+
return Response(
31+
f"Hello, {name}. This HTTP triggered function "
32+
f"executed successfully.")
33+
else:
34+
return Response(
35+
"This HTTP triggered function executed successfully. "
36+
"Pass a name in the query string or in the request body for a"
37+
" personalized response.",
38+
status_code=200
39+
)
40+
41+
42+
@app.route(route="http_func")
43+
def http_func(req: Request) -> Response:
44+
time.sleep(1)
45+
46+
current_time = datetime.now().strftime("%H:%M:%S")
47+
return Response(f"{current_time}")
48+
49+
50+
@app.route(route="upload_data_stream")
51+
async def upload_data_stream(req: Request) -> Response:
52+
# Define a list to accumulate the streaming data
53+
data_chunks = []
54+
55+
async def process_stream():
56+
async for chunk in req.stream():
57+
# Append each chunk of streaming data to the list
58+
data_chunks.append(chunk)
59+
60+
await process_stream()
61+
62+
# Concatenate the data chunks to form the complete data
63+
complete_data = b"".join(data_chunks)
64+
65+
# Return the complete data as the response
66+
return Response(content=complete_data, status_code=200)
67+
68+
69+
@app.route(route="return_streaming")
70+
async def return_streaming(req: Request) -> StreamingResponse:
71+
async def content():
72+
yield b"First chunk\n"
73+
yield b"Second chunk\n"
74+
return StreamingResponse(content())
75+
76+
@app.route(route="return_html")
77+
def return_html(req: Request) -> HTMLResponse:
78+
html_content = "<html><body><h1>Hello, World!</h1></body></html>"
79+
return HTMLResponse(content=html_content, status_code=200)
80+
81+
@app.route(route="return_ujson")
82+
def return_ujson(req: Request) -> UJSONResponse:
83+
return UJSONResponse(content={"message": "Hello, World!"}, status_code=200)
84+
85+
@app.route(route="return_orjson")
86+
def return_orjson(req: Request) -> ORJSONResponse:
87+
return ORJSONResponse(content={"message": "Hello, World!"}, status_code=200)
88+
89+
@app.route(route="return_file")
90+
def return_file(req: Request) -> FileResponse:
91+
return FileResponse("function_app.py")

0 commit comments

Comments
 (0)