Skip to content

Distributed Tracing for Entities #547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions azure/durable_functions/entity.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .models import DurableEntityContext
from .models.entities import OperationResult, EntityState
from datetime import datetime
from datetime import datetime, timezone
from typing import Callable, Any, List, Dict


Expand Down Expand Up @@ -49,7 +49,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) ->
for operation_data in batch:
result: Any = None
is_error: bool = False
start_time: datetime = datetime.now()
start_time: datetime = datetime.now(timezone.utc)

try:
# populate context
Expand All @@ -74,6 +74,7 @@ def handle(self, context: DurableEntityContext, batch: List[Dict[str, Any]]) ->
operation_result = OperationResult(
is_error=is_error,
duration=duration,
start_time=int(start_time.timestamp() * 1000),
result=result
)
response.results.append(operation_result)
Expand Down Expand Up @@ -119,7 +120,7 @@ def _elapsed_milliseconds_since(self, start_time: datetime) -> int:
int
The time, in millseconds, from start_time to now
"""
end_time = datetime.now()
end_time = datetime.now(timezone.utc)
time_diff = end_time - start_time
elapsed_time = int(time_diff.total_seconds() * 1000)
return elapsed_time
39 changes: 25 additions & 14 deletions azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,7 @@ async def start_new(self,
request_url = self._get_start_new_url(
instance_id=instance_id, orchestration_function_name=orchestration_function_name)

# Get the current span
current_span = trace.get_current_span()
span_context = current_span.get_span_context()

# Get the traceparent and tracestate from the span context
# Follows the W3C Trace Context specification for traceparent
# https://www.w3.org/TR/trace-context/#traceparent-header
trace_id = format(span_context.trace_id, '032x')
span_id = format(span_context.span_id, '016x')
trace_flags = format(span_context.trace_flags, '02x')
trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}"

trace_state = span_context.trace_state
trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()

response: List[Any] = await self._post_async_request(
request_url,
Expand Down Expand Up @@ -563,9 +551,14 @@ async def signal_entity(self, entityId: EntityId, operation_name: str,
entity_Id=entityId)

request_url = options.to_url(self._orchestration_bindings.rpc_base_url)

trace_parent, trace_state = DurableOrchestrationClient._get_current_activity_context()

response = await self._post_async_request(
request_url,
json.dumps(operation_input) if operation_input else None)
json.dumps(operation_input) if operation_input else None,
trace_parent,
trace_state)

switch_statement = {
202: lambda: None # signal accepted
Expand Down Expand Up @@ -797,3 +790,21 @@ async def resume(self, instance_id: str, reason: str) -> None:
error_message = has_error_message()
if error_message:
raise Exception(error_message)

@staticmethod
def _get_current_activity_context() -> tuple[str, str]:
# Get the current span
current_span = trace.get_current_span()
span_context = current_span.get_span_context()

# Get the traceparent and tracestate from the span context
# Follows the W3C Trace Context specification for traceparent
# https://www.w3.org/TR/trace-context/#traceparent-header
trace_id = format(span_context.trace_id, '032x')
span_id = format(span_context.span_id, '016x')
trace_flags = format(span_context.trace_flags, '02x')
trace_parent = f"00-{trace_id}-{span_id}-{trace_flags}"

trace_state = span_context.trace_state

return trace_parent, trace_state
16 changes: 16 additions & 0 deletions azure/durable_functions/models/entities/OperationResult.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class OperationResult:
def __init__(self,
is_error: bool,
duration: int,
start_time: int,
result: Optional[str] = None):
"""Instantiate an OperationResult.

Expand All @@ -21,11 +22,14 @@ def __init__(self,
Whether or not the operation resulted in an exception.
duration: int
How long the operation took, in milliseconds.
start_time: int
The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC.
result: Optional[str]
The operation result. Defaults to None.
"""
self._is_error: bool = is_error
self._duration: int = duration
self._start_time: int = start_time
self._result: Optional[str] = result

@property
Expand All @@ -49,6 +53,17 @@ def duration(self) -> int:
The duration of this operation, in milliseconds
"""
return self._duration

@property
def start_time(self) -> int:
"""Get the start time of this operation.

Returns
-------
int:
The start time of this operation's execution, in milliseconds, since January 1st 1970 midnight in UTC
"""
return self._start_time

@property
def result(self) -> Any:
Expand All @@ -72,5 +87,6 @@ def to_json(self) -> Dict[str, Any]:
to_json: Dict[str, Any] = {}
to_json["isError"] = self.is_error
to_json["duration"] = self.duration
to_json["startTime"] = self.start_time
to_json["result"] = json.dumps(self.result, default=_serialize_custom_object)
return to_json
2 changes: 2 additions & 0 deletions tests/orchestrator/test_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ def apply_operation(entity_state: EntityState, result: Any, state: Any, is_error
# We cannot control duration, so default it to zero and avoid checking for it
# in later asserts
duration = 0
start_time = 0
operation_result = OperationResult(
is_error=is_error,
duration=duration,
start_time=start_time,
result=result
)
entity_state._results.append(operation_result)
Expand Down
Loading