Skip to content

Commit 51758a2

Browse files
feat: OpenTelemetry support (#1469)
* OpenTel Support * OpenTelemetry support * Refactored otel config * Linting fix * Added Unit Tests * Moved logging to init * Updated reload request * Linting fix * Made var private * Updating constant --------- Co-authored-by: wangbill <[email protected]>
1 parent 389539a commit 51758a2

File tree

3 files changed

+124
-1
lines changed

3 files changed

+124
-1
lines changed

azure_functions_worker/constants.py

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
FUNCTION_DATA_CACHE = "FunctionDataCache"
1313
HTTP_URI = "HttpUri"
1414

15+
# When this capability is enabled, logs are not piped back to the
16+
# host from the worker. Logs will directly go to where the user has
17+
# configured them to go. This is to ensure that the logs are not
18+
# duplicated.
19+
WORKER_OPEN_TELEMETRY_ENABLED = "WorkerOpenTelemetryEnabled"
20+
1521
# Platform Environment Variables
1622
AZURE_WEBJOBS_SCRIPT_ROOT = "AzureWebJobsScriptRoot"
1723
CONTAINER_NAME = "CONTAINER_NAME"

azure_functions_worker/dispatcher.py

+45-1
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
import sys
1515
import threading
1616
from asyncio import BaseEventLoop
17+
from datetime import datetime
1718
from logging import LogRecord
1819
from typing import List, Optional
19-
from datetime import datetime
2020

2121
import grpc
2222
from . import bindings, constants, functions, loader, protos
@@ -46,6 +46,8 @@
4646
from .version import VERSION
4747

4848
_TRUE = "true"
49+
_TRACEPARENT = "traceparent"
50+
_TRACESTATE = "tracestate"
4951

5052

5153
class DispatcherMeta(type):
@@ -80,6 +82,11 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
8082
self._function_metadata_result = None
8183
self._function_metadata_exception = None
8284

85+
# Used for checking if open telemetry is enabled
86+
self._otel_libs_available = False
87+
self._context_api = None
88+
self._trace_context_propagator = None
89+
8390
# We allow the customer to change synchronous thread pool max worker
8491
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
8592
# For 3.[6|7|8] The default value is 1.
@@ -265,6 +272,23 @@ async def _dispatch_grpc_request(self, request):
265272
resp = await request_handler(request)
266273
self._grpc_resp_queue.put_nowait(resp)
267274

275+
def update_opentelemetry_status(self):
276+
"""Check for OpenTelemetry library availability and
277+
update the status attribute."""
278+
try:
279+
from opentelemetry import context as context_api
280+
from opentelemetry.trace.propagation.tracecontext import (
281+
TraceContextTextMapPropagator)
282+
283+
self._context_api = context_api
284+
self._trace_context_propagator = TraceContextTextMapPropagator()
285+
self._otel_libs_available = True
286+
287+
logger.info("Successfully loaded OpenTelemetry modules. "
288+
"OpenTelemetry is now enabled.")
289+
except ImportError:
290+
self._otel_libs_available = False
291+
268292
async def _handle__worker_init_request(self, request):
269293
logger.info('Received WorkerInitRequest, '
270294
'python version %s, '
@@ -294,6 +318,11 @@ async def _handle__worker_init_request(self, request):
294318
constants.SHARED_MEMORY_DATA_TRANSFER: _TRUE,
295319
}
296320

321+
self.update_opentelemetry_status()
322+
323+
if self._otel_libs_available:
324+
capabilities[constants.WORKER_OPEN_TELEMETRY_ENABLED] = _TRUE
325+
297326
if DependencyManager.should_load_cx_dependencies():
298327
DependencyManager.prioritize_customer_dependencies()
299328

@@ -559,6 +588,9 @@ async def _handle__invocation_request(self, request):
559588
args[name] = bindings.Out()
560589

561590
if fi.is_async:
591+
if self._otel_libs_available:
592+
self.configure_opentelemetry(fi_context)
593+
562594
call_result = \
563595
await self._run_async_func(fi_context, fi.func, args)
564596
else:
@@ -673,6 +705,10 @@ async def _handle__function_environment_reload_request(self, request):
673705
bindings.load_binding_registry()
674706

675707
capabilities = {}
708+
self.update_opentelemetry_status()
709+
if self._otel_libs_available:
710+
capabilities[constants.WORKER_OPEN_TELEMETRY_ENABLED] = _TRUE
711+
676712
if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
677713
try:
678714
self.load_function_metadata(
@@ -785,6 +821,12 @@ async def _handle__close_shared_memory_resources_request(self, request):
785821
request_id=self.request_id,
786822
close_shared_memory_resources_response=response)
787823

824+
def configure_opentelemetry(self, invocation_context):
825+
carrier = {_TRACEPARENT: invocation_context.trace_context.trace_parent,
826+
_TRACESTATE: invocation_context.trace_context.trace_state}
827+
ctx = self._trace_context_propagator.extract(carrier)
828+
self._context_api.attach(ctx)
829+
788830
@staticmethod
789831
def _get_context(invoc_request: protos.InvocationRequest, name: str,
790832
directory: str) -> bindings.Context:
@@ -873,6 +915,8 @@ def _run_sync_func(self, invocation_id, context, func, params):
873915
# invocation_id from ThreadPoolExecutor's threads.
874916
context.thread_local_storage.invocation_id = invocation_id
875917
try:
918+
if self._otel_libs_available:
919+
self.configure_opentelemetry(context)
876920
return ExtensionManager.get_sync_invocation_wrapper(context,
877921
func)(params)
878922
finally:

tests/unittests/test_opentelemetry.py

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import asyncio
2+
import unittest
3+
from unittest.mock import patch, MagicMock
4+
5+
from azure_functions_worker import protos
6+
from tests.unittests.test_dispatcher import FUNCTION_APP_DIRECTORY
7+
from tests.utils import testutils
8+
9+
10+
class TestOpenTelemetry(unittest.TestCase):
11+
12+
def setUp(self):
13+
self.loop = asyncio.new_event_loop()
14+
asyncio.set_event_loop(self.loop)
15+
self.dispatcher = testutils.create_dummy_dispatcher()
16+
17+
def tearDown(self):
18+
self.loop.close()
19+
20+
def test_update_opentelemetry_status_import_error(self):
21+
# Patch the built-in import mechanism
22+
with patch('builtins.__import__', side_effect=ImportError):
23+
self.dispatcher.update_opentelemetry_status()
24+
# Verify that otel_libs_available is set to False due to ImportError
25+
self.assertFalse(self.dispatcher._otel_libs_available)
26+
27+
@patch('builtins.__import__')
28+
def test_update_opentelemetry_status_success(
29+
self, mock_imports):
30+
mock_imports.return_value = MagicMock()
31+
self.dispatcher.update_opentelemetry_status()
32+
self.assertTrue(self.dispatcher._otel_libs_available)
33+
34+
@patch('builtins.__import__')
35+
def test_init_request_otel_capability_enabled(
36+
self, mock_imports):
37+
mock_imports.return_value = MagicMock()
38+
39+
init_request = protos.StreamingMessage(
40+
worker_init_request=protos.WorkerInitRequest(
41+
host_version="2.3.4",
42+
function_app_directory=str(FUNCTION_APP_DIRECTORY)
43+
)
44+
)
45+
46+
init_response = self.loop.run_until_complete(
47+
self.dispatcher._handle__worker_init_request(init_request))
48+
49+
self.assertEqual(init_response.worker_init_response.result.status,
50+
protos.StatusResult.Success)
51+
52+
# Verify that WorkerOpenTelemetryEnabled capability is set to _TRUE
53+
capabilities = init_response.worker_init_response.capabilities
54+
self.assertIn("WorkerOpenTelemetryEnabled", capabilities)
55+
self.assertEqual(capabilities["WorkerOpenTelemetryEnabled"], "true")
56+
57+
def test_init_request_otel_capability_disabled(self):
58+
59+
init_request = protos.StreamingMessage(
60+
worker_init_request=protos.WorkerInitRequest(
61+
host_version="2.3.4",
62+
function_app_directory=str(FUNCTION_APP_DIRECTORY)
63+
)
64+
)
65+
66+
init_response = self.loop.run_until_complete(
67+
self.dispatcher._handle__worker_init_request(init_request))
68+
69+
self.assertEqual(init_response.worker_init_response.result.status,
70+
protos.StatusResult.Success)
71+
72+
capabilities = init_response.worker_init_response.capabilities
73+
self.assertNotIn("WorkerOpenTelemetryEnabled", capabilities)

0 commit comments

Comments
 (0)