Skip to content

feat: OpenTelemetry support #1469

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 12, 2024
6 changes: 6 additions & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
FUNCTION_DATA_CACHE = "FunctionDataCache"
HTTP_URI = "HttpUri"

# When this capability is enabled, logs are not piped back to the
# host from the worker. Logs will directly go to where the user has
# configured them to go. This is to ensure that the logs are not
# duplicated.
WORKER_OPEN_TELEMETRY_ENABLED = "WorkerOpenTelemetryEnabled"

# Platform Environment Variables
AZURE_WEBJOBS_SCRIPT_ROOT = "AzureWebJobsScriptRoot"
CONTAINER_NAME = "CONTAINER_NAME"
Expand Down
46 changes: 45 additions & 1 deletion azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import sys
import threading
from asyncio import BaseEventLoop
from datetime import datetime
from logging import LogRecord
from typing import List, Optional
from datetime import datetime

import grpc
from . import bindings, constants, functions, loader, protos
Expand Down Expand Up @@ -46,6 +46,8 @@
from .version import VERSION

_TRUE = "true"
_TRACEPARENT = "traceparent"
_TRACESTATE = "tracestate"


class DispatcherMeta(type):
Expand Down Expand Up @@ -80,6 +82,11 @@ def __init__(self, loop: BaseEventLoop, host: str, port: int,
self._function_metadata_result = None
self._function_metadata_exception = None

# Used for checking if open telemetry is enabled
self._otel_libs_available = False
self._context_api = None
self._trace_context_propagator = None

# We allow the customer to change synchronous thread pool max worker
# count by setting the PYTHON_THREADPOOL_THREAD_COUNT app setting.
# For 3.[6|7|8] The default value is 1.
Expand Down Expand Up @@ -265,6 +272,23 @@ async def _dispatch_grpc_request(self, request):
resp = await request_handler(request)
self._grpc_resp_queue.put_nowait(resp)

def update_opentelemetry_status(self):
"""Check for OpenTelemetry library availability and
update the status attribute."""
try:
from opentelemetry import context as context_api
from opentelemetry.trace.propagation.tracecontext import (
TraceContextTextMapPropagator)

self._context_api = context_api
self._trace_context_propagator = TraceContextTextMapPropagator()
self._otel_libs_available = True

logger.info("Successfully loaded OpenTelemetry modules. "
"OpenTelemetry is now enabled.")
except ImportError:
self._otel_libs_available = False

async def _handle__worker_init_request(self, request):
logger.info('Received WorkerInitRequest, '
'python version %s, '
Expand Down Expand Up @@ -294,6 +318,11 @@ async def _handle__worker_init_request(self, request):
constants.SHARED_MEMORY_DATA_TRANSFER: _TRUE,
}

self.update_opentelemetry_status()

if self._otel_libs_available:
capabilities[constants.WORKER_OPEN_TELEMETRY_ENABLED] = _TRUE

if DependencyManager.should_load_cx_dependencies():
DependencyManager.prioritize_customer_dependencies()

Expand Down Expand Up @@ -559,6 +588,9 @@ async def _handle__invocation_request(self, request):
args[name] = bindings.Out()

if fi.is_async:
if self._otel_libs_available:
self.configure_opentelemetry(fi_context)

call_result = \
await self._run_async_func(fi_context, fi.func, args)
else:
Expand Down Expand Up @@ -673,6 +705,10 @@ async def _handle__function_environment_reload_request(self, request):
bindings.load_binding_registry()

capabilities = {}
self.update_opentelemetry_status()
if self._otel_libs_available:
capabilities[constants.WORKER_OPEN_TELEMETRY_ENABLED] = _TRUE

if is_envvar_true(PYTHON_ENABLE_INIT_INDEXING):
try:
self.load_function_metadata(
Expand Down Expand Up @@ -785,6 +821,12 @@ async def _handle__close_shared_memory_resources_request(self, request):
request_id=self.request_id,
close_shared_memory_resources_response=response)

def configure_opentelemetry(self, invocation_context):
carrier = {_TRACEPARENT: invocation_context.trace_context.trace_parent,
_TRACESTATE: invocation_context.trace_context.trace_state}
ctx = self._trace_context_propagator.extract(carrier)
self._context_api.attach(ctx)

@staticmethod
def _get_context(invoc_request: protos.InvocationRequest, name: str,
directory: str) -> bindings.Context:
Expand Down Expand Up @@ -873,6 +915,8 @@ def _run_sync_func(self, invocation_id, context, func, params):
# invocation_id from ThreadPoolExecutor's threads.
context.thread_local_storage.invocation_id = invocation_id
try:
if self._otel_libs_available:
self.configure_opentelemetry(context)
return ExtensionManager.get_sync_invocation_wrapper(context,
func)(params)
finally:
Expand Down
73 changes: 73 additions & 0 deletions tests/unittests/test_opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import unittest
from unittest.mock import patch, MagicMock

from azure_functions_worker import protos
from tests.unittests.test_dispatcher import FUNCTION_APP_DIRECTORY
from tests.utils import testutils


class TestOpenTelemetry(unittest.TestCase):

def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.dispatcher = testutils.create_dummy_dispatcher()

def tearDown(self):
self.loop.close()

def test_update_opentelemetry_status_import_error(self):
# Patch the built-in import mechanism
with patch('builtins.__import__', side_effect=ImportError):
self.dispatcher.update_opentelemetry_status()
# Verify that otel_libs_available is set to False due to ImportError
self.assertFalse(self.dispatcher._otel_libs_available)

@patch('builtins.__import__')
def test_update_opentelemetry_status_success(
self, mock_imports):
mock_imports.return_value = MagicMock()
self.dispatcher.update_opentelemetry_status()
self.assertTrue(self.dispatcher._otel_libs_available)

@patch('builtins.__import__')
def test_init_request_otel_capability_enabled(
self, mock_imports):
mock_imports.return_value = MagicMock()

init_request = protos.StreamingMessage(
worker_init_request=protos.WorkerInitRequest(
host_version="2.3.4",
function_app_directory=str(FUNCTION_APP_DIRECTORY)
)
)

init_response = self.loop.run_until_complete(
self.dispatcher._handle__worker_init_request(init_request))

self.assertEqual(init_response.worker_init_response.result.status,
protos.StatusResult.Success)

# Verify that WorkerOpenTelemetryEnabled capability is set to _TRUE
capabilities = init_response.worker_init_response.capabilities
self.assertIn("WorkerOpenTelemetryEnabled", capabilities)
self.assertEqual(capabilities["WorkerOpenTelemetryEnabled"], "true")

def test_init_request_otel_capability_disabled(self):

init_request = protos.StreamingMessage(
worker_init_request=protos.WorkerInitRequest(
host_version="2.3.4",
function_app_directory=str(FUNCTION_APP_DIRECTORY)
)
)

init_response = self.loop.run_until_complete(
self.dispatcher._handle__worker_init_request(init_request))

self.assertEqual(init_response.worker_init_response.result.status,
protos.StatusResult.Success)

capabilities = init_response.worker_init_response.capabilities
self.assertNotIn("WorkerOpenTelemetryEnabled", capabilities)
Loading