From b7f24b64012a3907403ab451984600f90bcbbbfc Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 2 May 2025 23:45:05 -0700 Subject: [PATCH 1/6] first commit --- azure/durable_functions/entity.py | 7 ++-- .../models/DurableOrchestrationClient.py | 39 ++++++++++++++++++- .../models/entities/OperationResult.py | 16 ++++++++ .../models/utils/http_utils.py | 17 ++++++-- tests/orchestrator/test_entity.py | 2 + 5 files changed, 73 insertions(+), 8 deletions(-) diff --git a/azure/durable_functions/entity.py b/azure/durable_functions/entity.py index c025085d..8fab8b8b 100644 --- a/azure/durable_functions/entity.py +++ b/azure/durable_functions/entity.py @@ -1,6 +1,6 @@ from .models import DurableEntityContext from .models.entities import OperationResult, EntityState -from datetime import datetime +from datetime import datetime, timezone from typing import Callable, Any, List, Dict @@ -49,7 +49,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> for operation_data in batch: result: Any = None is_error: bool = False - start_time: datetime = datetime.now() + start_time: datetime = datetime.now(timezone.utc) try: # populate context @@ -74,6 +74,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> operation_result = OperationResult( is_error=is_error, duration=duration, + start_time=int(start_time.timestamp() * 1000), result=result ) response.results.append(operation_result) @@ -119,7 +120,7 @@ def _elapsed_milliseconds_since(self, start_time: datetime) -> int: int The time, in millseconds, from start_time to now """ - end_time = datetime.now() + end_time = datetime.now(timezone.utc) time_diff = end_time - start_time elapsed_time = int(time_diff.total_seconds() * 1000) return elapsed_time diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 1be5a28e..1f76696a 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -4,6 +4,7 @@ from time import time from asyncio import sleep from urllib.parse import urlparse, quote +from opentelemetry import trace import azure.functions as func @@ -70,9 +71,26 @@ async def start_new(self, """ request_url = self._get_start_new_url( instance_id=instance_id, orchestration_function_name=orchestration_function_name) + + # Get the current span + current_span = trace.get_current_span() + span_context = current_span.get_span_context() + + # Get the traceparent and tracestate from the span context + # Follows the W3C Trace Context specification for traceparent + # https://www.w3.org/TR/trace-context/#traceparent-header + trace_id = format(span_context.trace_id, '032x') + span_id = format(span_context.span_id, '016x') + trace_flags = format(span_context.trace_flags, '02x') + trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" + + trace_state = span_context.trace_state response: List[Any] = await self._post_async_request( - request_url, self._get_json_input(client_input)) + request_url, + self._get_json_input(client_input), + trace_parent, + trace_state) status_code: int = response[0] if status_code <= 202 and response[1]: @@ -545,9 +563,26 @@ async def signal_entity(self, entityId: EntityId, operation_name: str, entity_Id=entityId) request_url = options.to_url(self._orchestration_bindings.rpc_base_url) + + # Get the current span + current_span = trace.get_current_span() + span_context = current_span.get_span_context() + + # Get the traceparent and tracestate from the span context + # Follows the W3C Trace Context specification for traceparent + # https://www.w3.org/TR/trace-context/#traceparent-header + trace_id = format(span_context.trace_id, '032x') + span_id = format(span_context.span_id, '016x') + trace_flags = format(span_context.trace_flags, '02x') + trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" + + trace_state = span_context.trace_state + response = await self._post_async_request( request_url, - json.dumps(operation_input) if operation_input else None) + json.dumps(operation_input) if operation_input else None, + trace_parent, + trace_state) switch_statement = { 202: lambda: None # signal accepted diff --git a/azure/durable_functions/models/entities/OperationResult.py b/azure/durable_functions/models/entities/OperationResult.py index de775a1e..f68edc6e 100644 --- a/azure/durable_functions/models/entities/OperationResult.py +++ b/azure/durable_functions/models/entities/OperationResult.py @@ -12,6 +12,7 @@ class OperationResult: def __init__(self, is_error: bool, duration: int, + start_time: int, result: Optional[str] = None): """Instantiate an OperationResult. @@ -21,11 +22,14 @@ def __init__(self, Whether or not the operation resulted in an exception. duration: int How long the operation took, in milliseconds. + start_time: int + The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC. result: Optional[str] The operation result. Defaults to None. """ self._is_error: bool = is_error self._duration: int = duration + self._start_time: int = start_time self._result: Optional[str] = result @property @@ -49,6 +53,17 @@ def duration(self) -> int: The duration of this operation, in milliseconds """ return self._duration + + @property + def start_time(self) -> int: + """Get the start time of this operation. + + Returns + ------- + int: + The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC + """ + return self._start_time @property def result(self) -> Any: @@ -72,5 +87,6 @@ def to_json(self) -> Dict[str, Any]: to_json: Dict[str, Any] = {} to_json["isError"] = self.is_error to_json["duration"] = self.duration + to_json["startTime"] = self.start_time to_json["result"] = json.dumps(self.result, default=_serialize_custom_object) return to_json diff --git a/azure/durable_functions/models/utils/http_utils.py b/azure/durable_functions/models/utils/http_utils.py index e45cef68..eaa3a07d 100644 --- a/azure/durable_functions/models/utils/http_utils.py +++ b/azure/durable_functions/models/utils/http_utils.py @@ -3,7 +3,10 @@ import aiohttp -async def post_async_request(url: str, data: Any = None) -> List[Union[int, Any]]: +async def post_async_request(url: str, + data: Any = None, + trace_parent: str = None, + trace_state: str = None) -> List[Union[int, Any]]: """Post request with the data provided to the url provided. Parameters @@ -12,6 +15,10 @@ async def post_async_request(url: str, data: Any = None) -> List[Union[int, Any] url to make the post to data: Any object to post + trace_parent: str + traceparent header to send with the request + trace_state: str + tracestate header to send with the request Returns ------- @@ -19,8 +26,12 @@ async def post_async_request(url: str, data: Any = None) -> List[Union[int, Any] Tuple with the Response status code and the data returned from the request """ async with aiohttp.ClientSession() as session: - async with session.post(url, - json=data) as response: + headers = {} + if trace_parent: + headers["traceparent"] = trace_parent + if trace_state: + headers["tracestate"] = trace_state + async with session.post(url, json=data, headers=headers) as response: # We disable aiohttp's input type validation # as the server may respond with alternative # data encodings. This is potentially unsafe. diff --git a/tests/orchestrator/test_entity.py b/tests/orchestrator/test_entity.py index dc74c873..474cdc29 100644 --- a/tests/orchestrator/test_entity.py +++ b/tests/orchestrator/test_entity.py @@ -209,9 +209,11 @@ def apply_operation(entity_state: EntityState, result: Any, state: Any, is_error # We cannot control duration, so default it to zero and avoid checking for it # in later asserts duration = 0 + start_time = 0 operation_result = OperationResult( is_error=is_error, duration=duration, + start_time=start_time, result=result ) entity_state._results.append(operation_result) From 77865c107eb5c2276fcb4a3f9687348da1e37c73 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Sat, 3 May 2025 00:06:56 -0700 Subject: [PATCH 2/6] moved common code into a helper method --- .../models/DurableOrchestrationClient.py | 60 +++++++------------ 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index fca07693..a3133a70 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -71,34 +71,8 @@ async def start_new(self, """ request_url = self._get_start_new_url( instance_id=instance_id, orchestration_function_name=orchestration_function_name) - - # Get the current span - current_span = trace.get_current_span() - span_context = current_span.get_span_context() - - # Get the traceparent and tracestate from the span context - # Follows the W3C Trace Context specification for traceparent - # https://www.w3.org/TR/trace-context/#traceparent-header - trace_id = format(span_context.trace_id, '032x') - span_id = format(span_context.span_id, '016x') - trace_flags = format(span_context.trace_flags, '02x') - trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" - - trace_state = span_context.trace_state - - # Get the current span - current_span = trace.get_current_span() - span_context = current_span.get_span_context() - - # Get the traceparent and tracestate from the span context - # Follows the W3C Trace Context specification for traceparent - # https://www.w3.org/TR/trace-context/#traceparent-header - trace_id = format(span_context.trace_id, '032x') - span_id = format(span_context.span_id, '016x') - trace_flags = format(span_context.trace_flags, '02x') - trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" - trace_state = span_context.trace_state + trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context() response: List[Any] = await self._post_async_request( request_url, @@ -578,19 +552,7 @@ async def signal_entity(self, entityId: EntityId, operation_name: str, request_url = options.to_url(self._orchestration_bindings.rpc_base_url) - # Get the current span - current_span = trace.get_current_span() - span_context = current_span.get_span_context() - - # Get the traceparent and tracestate from the span context - # Follows the W3C Trace Context specification for traceparent - # https://www.w3.org/TR/trace-context/#traceparent-header - trace_id = format(span_context.trace_id, '032x') - span_id = format(span_context.span_id, '016x') - trace_flags = format(span_context.trace_flags, '02x') - trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" - - trace_state = span_context.trace_state + trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context() response = await self._post_async_request( request_url, @@ -828,3 +790,21 @@ async def resume(self, instance_id: str, reason: str) -> None: error_message = has_error_message() if error_message: raise Exception(error_message) + + @staticmethod + def _get_current_activity_context() -> tuple[str, str]: + # Get the current span + current_span = trace.get_current_span() + span_context = current_span.get_span_context() + + # Get the traceparent and tracestate from the span context + # Follows the W3C Trace Context specification for traceparent + # https://www.w3.org/TR/trace-context/#traceparent-header + trace_id = format(span_context.trace_id, '032x') + span_id = format(span_context.span_id, '016x') + trace_flags = format(span_context.trace_flags, '02x') + trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" + + trace_state = span_context.trace_state + + return trace_parent, trace_state \ No newline at end of file From 2026a1b533f560fff6a37b77a655022f0bfad08c Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 5 May 2025 20:20:48 -0700 Subject: [PATCH 3/6] addressing PR comments --- .../models/DurableOrchestrationClient.py | 2 +- .../models/entities/OperationResult.py | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index a3133a70..1039f1ae 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -807,4 +807,4 @@ def _get_current_activity_context() -> tuple[str, str]: trace_state = span_context.trace_state - return trace_parent, trace_state \ No newline at end of file + return trace_parent, trace_state diff --git a/azure/durable_functions/models/entities/OperationResult.py b/azure/durable_functions/models/entities/OperationResult.py index f68edc6e..05147f09 100644 --- a/azure/durable_functions/models/entities/OperationResult.py +++ b/azure/durable_functions/models/entities/OperationResult.py @@ -12,7 +12,7 @@ class OperationResult: def __init__(self, is_error: bool, duration: int, - start_time: int, + execution_start_time_ms: int, result: Optional[str] = None): """Instantiate an OperationResult. @@ -23,13 +23,14 @@ def __init__(self, duration: int How long the operation took, in milliseconds. start_time: int - The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC. + The start time of this operation's execution, in milliseconds, + since January 1st 1970 midnight in UTC. result: Optional[str] The operation result. Defaults to None. """ self._is_error: bool = is_error self._duration: int = duration - self._start_time: int = start_time + self._execution_start_time_ms: int = execution_start_time_ms self._result: Optional[str] = result @property @@ -53,17 +54,18 @@ def duration(self) -> int: The duration of this operation, in milliseconds """ return self._duration - + @property - def start_time(self) -> int: + def execution_start_time_ms(self) -> int: """Get the start time of this operation. Returns ------- int: - The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC + The start time of this operation's execution, in milliseconds, + since January 1st 1970 midnight in UTC. """ - return self._start_time + return self._execution_start_time_ms @property def result(self) -> Any: @@ -87,6 +89,6 @@ def to_json(self) -> Dict[str, Any]: to_json: Dict[str, Any] = {} to_json["isError"] = self.is_error to_json["duration"] = self.duration - to_json["startTime"] = self.start_time + to_json["startTime"] = self.execution_start_time_ms to_json["result"] = json.dumps(self.result, default=_serialize_custom_object) return to_json From 6bff9c114e1c7c498cbc0886b0c2839d899323ba Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 5 May 2025 20:24:30 -0700 Subject: [PATCH 4/6] fixing failures --- azure/durable_functions/entity.py | 2 +- tests/orchestrator/test_entity.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/azure/durable_functions/entity.py b/azure/durable_functions/entity.py index 8fab8b8b..dab2b058 100644 --- a/azure/durable_functions/entity.py +++ b/azure/durable_functions/entity.py @@ -74,7 +74,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> operation_result = OperationResult( is_error=is_error, duration=duration, - start_time=int(start_time.timestamp() * 1000), + execution_start_time_ms=int(start_time.timestamp() * 1000), result=result ) response.results.append(operation_result) diff --git a/tests/orchestrator/test_entity.py b/tests/orchestrator/test_entity.py index 474cdc29..ceed4204 100644 --- a/tests/orchestrator/test_entity.py +++ b/tests/orchestrator/test_entity.py @@ -213,7 +213,7 @@ def apply_operation(entity_state: EntityState, result: Any, state: Any, is_error operation_result = OperationResult( is_error=is_error, duration=duration, - start_time=start_time, + execution_start_time_ms=start_time, result=result ) entity_state._results.append(operation_result) From 8bbb39f6ae599c03701440d44ee6062772bbe20e Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 21 May 2025 12:33:23 -0700 Subject: [PATCH 5/6] added a method comment and moved from manual traceparent creation to automatic extraction --- .../models/DurableOrchestrationClient.py | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 1039f1ae..1b862022 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -5,6 +5,7 @@ from asyncio import sleep from urllib.parse import urlparse, quote from opentelemetry import trace +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator import azure.functions as func @@ -791,20 +792,22 @@ async def resume(self, instance_id: str, reason: str) -> None: if error_message: raise Exception(error_message) + """Gets the current trace activity traceparent and tracestate + + Returns + ------- + tuple[str, str] + A tuple containing the (traceparent, tracestate) + """ @staticmethod def _get_current_activity_context() -> tuple[str, str]: - # Get the current span - current_span = trace.get_current_span() - span_context = current_span.get_span_context() - - # Get the traceparent and tracestate from the span context - # Follows the W3C Trace Context specification for traceparent - # https://www.w3.org/TR/trace-context/#traceparent-header - trace_id = format(span_context.trace_id, '032x') - span_id = format(span_context.span_id, '016x') - trace_flags = format(span_context.trace_flags, '02x') - trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" - - trace_state = span_context.trace_state + carrier = {} + + # Inject the current trace context into the carrier + TraceContextTextMapPropagator().inject(carrier) + + # Extract the traceparent and optionally the tracestate + trace_parent = carrier.get("traceparent") + trace_state = carrier.get("tracestate") return trace_parent, trace_state From 064cfe0945c0282e0b6574bdadf3d1afa1511f15 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 21 May 2025 12:34:53 -0700 Subject: [PATCH 6/6] linter issues --- azure/durable_functions/models/DurableOrchestrationClient.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 1b862022..67aa5f71 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -4,7 +4,6 @@ from time import time from asyncio import sleep from urllib.parse import urlparse, quote -from opentelemetry import trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator import azure.functions as func @@ -792,7 +791,7 @@ async def resume(self, instance_id: str, reason: str) -> None: if error_message: raise Exception(error_message) - """Gets the current trace activity traceparent and tracestate + """Gets the current trace activity traceparent and tracestate Returns -------