diff --git a/azure/durable_functions/entity.py b/azure/durable_functions/entity.py index c025085d..dab2b058 100644 --- a/azure/durable_functions/entity.py +++ b/azure/durable_functions/entity.py @@ -1,6 +1,6 @@ from .models import DurableEntityContext from .models.entities import OperationResult, EntityState -from datetime import datetime +from datetime import datetime, timezone from typing import Callable, Any, List, Dict @@ -49,7 +49,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> for operation_data in batch: result: Any = None is_error: bool = False - start_time: datetime = datetime.now() + start_time: datetime = datetime.now(timezone.utc) try: # populate context @@ -74,6 +74,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) -> operation_result = OperationResult( is_error=is_error, duration=duration, + execution_start_time_ms=int(start_time.timestamp() * 1000), result=result ) response.results.append(operation_result) @@ -119,7 +120,7 @@ def _elapsed_milliseconds_since(self, start_time: datetime) -> int: int The time, in millseconds, from start_time to now """ - end_time = datetime.now() + end_time = datetime.now(timezone.utc) time_diff = end_time - start_time elapsed_time = int(time_diff.total_seconds() * 1000) return elapsed_time diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index bd812be9..67aa5f71 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -4,7 +4,7 @@ from time import time from asyncio import sleep from urllib.parse import urlparse, quote -from opentelemetry import trace +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator import azure.functions as func @@ -72,19 +72,7 @@ async def start_new(self, request_url = self._get_start_new_url( instance_id=instance_id, orchestration_function_name=orchestration_function_name) - # Get the current span - current_span = trace.get_current_span() - span_context = current_span.get_span_context() - - # Get the traceparent and tracestate from the span context - # Follows the W3C Trace Context specification for traceparent - # https://www.w3.org/TR/trace-context/#traceparent-header - trace_id = format(span_context.trace_id, '032x') - span_id = format(span_context.span_id, '016x') - trace_flags = format(span_context.trace_flags, '02x') - trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}" - - trace_state = span_context.trace_state + trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context() response: List[Any] = await self._post_async_request( request_url, @@ -563,9 +551,14 @@ async def signal_entity(self, entityId: EntityId, operation_name: str, entity_Id=entityId) request_url = options.to_url(self._orchestration_bindings.rpc_base_url) + + trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context() + response = await self._post_async_request( request_url, - json.dumps(operation_input) if operation_input else None) + json.dumps(operation_input) if operation_input else None, + trace_parent, + trace_state) switch_statement = { 202: lambda: None # signal accepted @@ -797,3 +790,23 @@ async def resume(self, instance_id: str, reason: str) -> None: error_message = has_error_message() if error_message: raise Exception(error_message) + + """Gets the current trace activity traceparent and tracestate + + Returns + ------- + tuple[str, str] + A tuple containing the (traceparent, tracestate) + """ + @staticmethod + def _get_current_activity_context() -> tuple[str, str]: + carrier = {} + + # Inject the current trace context into the carrier + TraceContextTextMapPropagator().inject(carrier) + + # Extract the traceparent and optionally the tracestate + trace_parent = carrier.get("traceparent") + trace_state = carrier.get("tracestate") + + return trace_parent, trace_state diff --git a/azure/durable_functions/models/entities/OperationResult.py b/azure/durable_functions/models/entities/OperationResult.py index de775a1e..05147f09 100644 --- a/azure/durable_functions/models/entities/OperationResult.py +++ b/azure/durable_functions/models/entities/OperationResult.py @@ -12,6 +12,7 @@ class OperationResult: def __init__(self, is_error: bool, duration: int, + execution_start_time_ms: int, result: Optional[str] = None): """Instantiate an OperationResult. @@ -21,11 +22,15 @@ def __init__(self, Whether or not the operation resulted in an exception. duration: int How long the operation took, in milliseconds. + start_time: int + The start time of this operation's execution, in milliseconds, + since January 1st 1970 midnight in UTC. result: Optional[str] The operation result. Defaults to None. """ self._is_error: bool = is_error self._duration: int = duration + self._execution_start_time_ms: int = execution_start_time_ms self._result: Optional[str] = result @property @@ -50,6 +55,18 @@ def duration(self) -> int: """ return self._duration + @property + def execution_start_time_ms(self) -> int: + """Get the start time of this operation. + + Returns + ------- + int: + The start time of this operation's execution, in milliseconds, + since January 1st 1970 midnight in UTC. + """ + return self._execution_start_time_ms + @property def result(self) -> Any: """Get the operation's result. @@ -72,5 +89,6 @@ def to_json(self) -> Dict[str, Any]: to_json: Dict[str, Any] = {} to_json["isError"] = self.is_error to_json["duration"] = self.duration + to_json["startTime"] = self.execution_start_time_ms to_json["result"] = json.dumps(self.result, default=_serialize_custom_object) return to_json diff --git a/tests/orchestrator/test_entity.py b/tests/orchestrator/test_entity.py index dc74c873..ceed4204 100644 --- a/tests/orchestrator/test_entity.py +++ b/tests/orchestrator/test_entity.py @@ -209,9 +209,11 @@ def apply_operation(entity_state: EntityState, result: Any, state: Any, is_error # We cannot control duration, so default it to zero and avoid checking for it # in later asserts duration = 0 + start_time = 0 operation_result = OperationResult( is_error=is_error, duration=duration, + execution_start_time_ms=start_time, result=result ) entity_state._results.append(operation_result)