19
19
from datetime import datetime
20
20
21
21
import grpc
22
-
23
22
from . import bindings , constants , functions , loader , protos
24
23
from .bindings .shared_memory_data_transfer import SharedMemoryManager
25
24
from .constants import (PYTHON_ROLLBACK_CWD_PATH ,
33
32
PYTHON_LANGUAGE_RUNTIME , PYTHON_ENABLE_INIT_INDEXING ,
34
33
METADATA_PROPERTIES_WORKER_INDEXED )
35
34
from .extension import ExtensionManager
35
+ from .http_v2 import http_coordinator , initialize_http_server , HttpV2Registry , \
36
+ sync_http_request , HttpServerInitError
36
37
from .logging import disable_console_logging , enable_console_logging
37
38
from .logging import (logger , error_logger , is_system_log_category ,
38
39
CONSOLE_LOG_PREFIX , format_exception )
@@ -158,6 +159,7 @@ async def dispatch_forever(self): # sourcery skip: swap-if-expression
158
159
159
160
log_level = logging .INFO if not is_envvar_true (
160
161
PYTHON_ENABLE_DEBUG_LOGGING ) else logging .DEBUG
162
+
161
163
root_logger .setLevel (log_level )
162
164
root_logger .addHandler (logging_handler )
163
165
logger .info ('Switched to gRPC logging.' )
@@ -189,7 +191,8 @@ def stop(self) -> None:
189
191
190
192
self ._stop_sync_call_tp ()
191
193
192
- def on_logging (self , record : logging .LogRecord , formatted_msg : str ) -> None :
194
+ def on_logging (self , record : logging .LogRecord ,
195
+ formatted_msg : str ) -> None :
193
196
if record .levelno >= logging .CRITICAL :
194
197
log_level = protos .RpcLog .Critical
195
198
elif record .levelno >= logging .ERROR :
@@ -306,6 +309,13 @@ async def _handle__worker_init_request(self, request):
306
309
self .load_function_metadata (
307
310
worker_init_request .function_app_directory ,
308
311
caller_info = "worker_init_request" )
312
+
313
+ if HttpV2Registry .http_v2_enabled ():
314
+ capabilities [constants .HTTP_URI ] = \
315
+ initialize_http_server (self ._host )
316
+
317
+ except HttpServerInitError :
318
+ raise
309
319
except Exception as ex :
310
320
self ._function_metadata_exception = ex
311
321
@@ -508,6 +518,7 @@ async def _handle__invocation_request(self, request):
508
518
logger .info (', ' .join (function_invocation_logs ))
509
519
510
520
args = {}
521
+
511
522
for pb in invoc_request .input_data :
512
523
pb_type_info = fi .input_types [pb .name ]
513
524
if bindings .is_trigger_binding (pb_type_info .binding_name ):
@@ -523,7 +534,19 @@ async def _handle__invocation_request(self, request):
523
534
shmem_mgr = self ._shmem_mgr ,
524
535
is_deferred_binding = pb_type_info .deferred_bindings_enabled )
525
536
526
- fi_context = self ._get_context (invoc_request , fi .name , fi .directory )
537
+ http_v2_enabled = self ._functions .get_function (function_id ) \
538
+ .is_http_func and \
539
+ HttpV2Registry .http_v2_enabled ()
540
+
541
+ if http_v2_enabled :
542
+ http_request = await http_coordinator .get_http_request_async (
543
+ invocation_id )
544
+
545
+ await sync_http_request (http_request , invoc_request )
546
+ args [fi .trigger_metadata .get ('param_name' )] = http_request
547
+
548
+ fi_context = self ._get_context (invoc_request , fi .name ,
549
+ fi .directory )
527
550
528
551
# Use local thread storage to store the invocation ID
529
552
# for a customer's threads
@@ -536,17 +559,21 @@ async def _handle__invocation_request(self, request):
536
559
args [name ] = bindings .Out ()
537
560
538
561
if fi .is_async :
539
- call_result = await self ._run_async_func (
540
- fi_context , fi .func , args
541
- )
562
+ call_result = \
563
+ await self ._run_async_func (fi_context , fi .func , args )
542
564
else :
543
565
call_result = await self ._loop .run_in_executor (
544
566
self ._sync_call_tp ,
545
567
self ._run_sync_func ,
546
568
invocation_id , fi_context , fi .func , args )
569
+
547
570
if call_result is not None and not fi .has_return :
548
- raise RuntimeError (f'function { fi .name !r} without a $return '
549
- 'binding returned a non-None value' )
571
+ raise RuntimeError (
572
+ f'function { fi .name !r} without a $return binding'
573
+ 'returned a non-None value' )
574
+
575
+ if http_v2_enabled :
576
+ http_coordinator .set_http_response (invocation_id , call_result )
550
577
551
578
output_data = []
552
579
cache_enabled = self ._function_data_cache_enabled
@@ -566,10 +593,12 @@ async def _handle__invocation_request(self, request):
566
593
output_data .append (param_binding )
567
594
568
595
return_value = None
569
- if fi .return_type is not None :
596
+ if fi .return_type is not None and not http_v2_enabled :
570
597
return_value = bindings .to_outgoing_proto (
571
- fi .return_type .binding_name , call_result ,
572
- pytype = fi .return_type .pytype )
598
+ fi .return_type .binding_name ,
599
+ call_result ,
600
+ pytype = fi .return_type .pytype ,
601
+ )
573
602
574
603
# Actively flush customer print() function to console
575
604
sys .stdout .flush ()
@@ -584,6 +613,9 @@ async def _handle__invocation_request(self, request):
584
613
output_data = output_data ))
585
614
586
615
except Exception as ex :
616
+ if http_v2_enabled :
617
+ http_coordinator .set_http_response (invocation_id , ex )
618
+
587
619
return protos .StreamingMessage (
588
620
request_id = self .request_id ,
589
621
invocation_response = protos .InvocationResponse (
@@ -640,11 +672,18 @@ async def _handle__function_environment_reload_request(self, request):
640
672
# reload_customer_libraries call clears the registry
641
673
bindings .load_binding_registry ()
642
674
675
+ capabilities = {}
643
676
if is_envvar_true (PYTHON_ENABLE_INIT_INDEXING ):
644
677
try :
645
678
self .load_function_metadata (
646
679
directory ,
647
680
caller_info = "environment_reload_request" )
681
+
682
+ if HttpV2Registry .http_v2_enabled ():
683
+ capabilities [constants .HTTP_URI ] = \
684
+ initialize_http_server (self ._host )
685
+ except HttpServerInitError :
686
+ raise
648
687
except Exception as ex :
649
688
self ._function_metadata_exception = ex
650
689
@@ -655,7 +694,7 @@ async def _handle__function_environment_reload_request(self, request):
655
694
func_env_reload_request .function_app_directory )
656
695
657
696
success_response = protos .FunctionEnvironmentReloadResponse (
658
- capabilities = {} ,
697
+ capabilities = capabilities ,
659
698
worker_metadata = self .get_worker_metadata (),
660
699
result = protos .StatusResult (
661
700
status = protos .StatusResult .Success ))
@@ -676,8 +715,10 @@ async def _handle__function_environment_reload_request(self, request):
676
715
677
716
def index_functions (self , function_path : str ):
678
717
indexed_functions = loader .index_function_app (function_path )
679
- logger .info ('Indexed function app and found %s functions' ,
680
- len (indexed_functions ))
718
+ logger .info (
719
+ "Indexed function app and found %s functions" ,
720
+ len (indexed_functions )
721
+ )
681
722
682
723
if indexed_functions :
683
724
fx_metadata_results , fx_bindings_logs = (
@@ -747,7 +788,8 @@ async def _handle__close_shared_memory_resources_request(self, request):
747
788
@staticmethod
748
789
def _get_context (invoc_request : protos .InvocationRequest , name : str ,
749
790
directory : str ) -> bindings .Context :
750
- """ For more information refer: https://aka.ms/azfunc-invocation-context
791
+ """ For more information refer:
792
+ https://aka.ms/azfunc-invocation-context
751
793
"""
752
794
trace_context = bindings .TraceContext (
753
795
invoc_request .trace_context .trace_parent ,
@@ -889,7 +931,6 @@ def gen(resp_queue):
889
931
890
932
891
933
class AsyncLoggingHandler (logging .Handler ):
892
-
893
934
def emit (self , record : LogRecord ) -> None :
894
935
# Since we disable console log after gRPC channel is initiated,
895
936
# we should redirect all the messages into dispatcher.
0 commit comments