Skip to content

Commit 25da38d

Browse files
committed
Refactored otel config
1 parent 52a21c4 commit 25da38d

File tree

2 files changed

+33
-16
lines changed

2 files changed

+33
-16
lines changed

azure_functions_worker/constants.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,4 @@
6363

6464
# Opentelemetry variables
6565
TRACEPARENT = "traceparent"
66-
TRACESTATE = "tracestate"
66+
TRACESTATE = "tracestate"

azure_functions_worker/dispatcher.py

+32-15
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
8080
self._function_metadata_exception = None
8181

8282
# Used for checking if open telemetry is enabled
83-
self.is_opentelemetry_available = False
83+
self.otel_libs_available = False
8484
self.context_api = None
85-
self.TraceContextTextMapPropagator = None
85+
self.trace_context_propagator = None
8686

8787
# We allow the customer to change synchronous thread pool max worker
8888
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
@@ -268,16 +268,18 @@ async def _dispatch_grpc_request(self, request):
268268
self._grpc_resp_queue.put_nowait(resp)
269269

270270
def update_opentelemetry_status(self):
271-
"""Check for OpenTelemetry library availability and update the status attribute."""
271+
"""Check for OpenTelemetry library availability and
272+
update the status attribute."""
272273
try:
273274
from opentelemetry import context as context_api
274-
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
275+
from opentelemetry.trace.propagation.tracecontext import (
276+
TraceContextTextMapPropagator)
275277

276278
self.context_api = context_api
277-
self.TraceContextTextMapPropagator = TraceContextTextMapPropagator()
278-
self.is_opentelemetry_available = True
279+
self.trace_context_propagator = TraceContextTextMapPropagator()
280+
self.otel_libs_available = True
279281
except ImportError:
280-
self.is_opentelemetry_available = False
282+
self.otel_libs_available = False
281283

282284
async def _handle__worker_init_request(self, request):
283285
logger.info('Received WorkerInitRequest, '
@@ -310,7 +312,10 @@ async def _handle__worker_init_request(self, request):
310312

311313
self.update_opentelemetry_status()
312314

313-
if self.is_opentelemetry_available:
315+
if self.otel_libs_available:
316+
# When this capability is enabled, logs are not piped back to the host from
317+
# the worker. Logs will directly go to where the user has configured the
318+
# logs to go. This is to ensure that the logs are not duplicated.
314319
capabilities["WorkerOpenTelemetryEnabled"] = _TRUE
315320

316321
if DependencyManager.should_load_cx_dependencies():
@@ -545,12 +550,6 @@ async def _handle__invocation_request(self, request):
545550

546551
fi_context = self._get_context(invoc_request, fi.name, fi.directory)
547552

548-
if self.is_opentelemetry_available:
549-
carrier = {TRACEPARENT: fi_context.trace_context.trace_parent,
550-
TRACESTATE: fi_context.trace_context.trace_state}
551-
ctx = self.TraceContextTextMapPropagator.extract(carrier)
552-
self.context_api.attach(ctx)
553-
554553
# Use local thread storage to store the invocation ID
555554
# for a customer's threads
556555
fi_context.thread_local_storage.invocation_id = invocation_id
@@ -562,6 +561,9 @@ async def _handle__invocation_request(self, request):
562561
args[name] = bindings.Out()
563562

564563
if fi.is_async:
564+
if self.otel_libs_available:
565+
self.configure_opentelemetry(fi_context, invocation_id)
566+
565567
call_result = await self._run_async_func(
566568
fi_context, fi.func, args
567569
)
@@ -570,6 +572,7 @@ async def _handle__invocation_request(self, request):
570572
self._sync_call_tp,
571573
self._run_sync_func,
572574
invocation_id, fi_context, fi.func, args)
575+
573576
if call_result is not None and not fi.has_return:
574577
raise RuntimeError(f'function {fi.name!r} without a $return '
575578
'binding returned a non-None value')
@@ -666,6 +669,10 @@ async def _handle__function_environment_reload_request(self, request):
666669
# reload_customer_libraries call clears the registry
667670
bindings.load_binding_registry()
668671

672+
capabilities = {}
673+
if self.otel_libs_available:
674+
capabilities["WorkerOpenTelemetryEnabled"] = _TRUE
675+
669676
if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
670677
try:
671678
self.load_function_metadata(
@@ -681,7 +688,7 @@ async def _handle__function_environment_reload_request(self, request):
681688
func_env_reload_request.function_app_directory)
682689

683690
success_response = protos.FunctionEnvironmentReloadResponse(
684-
capabilities={},
691+
capabilities=capabilities,
685692
worker_metadata=self.get_worker_metadata(),
686693
result=protos.StatusResult(
687694
status=protos.StatusResult.Success))
@@ -759,6 +766,14 @@ async def _handle__close_shared_memory_resources_request(self, request):
759766
request_id=self.request_id,
760767
close_shared_memory_resources_response=response)
761768

769+
def configure_opentelemetry(self, invocation_context, invocation_id: str):
770+
logger.info("Configuring opentelemetry for invocation id: %s",
771+
invocation_id)
772+
carrier = {TRACEPARENT: invocation_context.trace_context.trace_parent,
773+
TRACESTATE: invocation_context.trace_context.trace_state}
774+
ctx = self.trace_context_propagator.extract(carrier)
775+
self.context_api.attach(ctx)
776+
762777
@staticmethod
763778
def _get_context(invoc_request: protos.InvocationRequest, name: str,
764779
directory: str) -> bindings.Context:
@@ -846,6 +861,8 @@ def _run_sync_func(self, invocation_id, context, func, params):
846861
# invocation_id from ThreadPoolExecutor's threads.
847862
context.thread_local_storage.invocation_id = invocation_id
848863
try:
864+
if self.otel_libs_available:
865+
self.configure_opentelemetry(context, invocation_id)
849866
return ExtensionManager.get_sync_invocation_wrapper(context,
850867
func)(params)
851868
finally:

0 commit comments

Comments
 (0)