Skip to content

Commit dda47a1

Browse files
committed
Use wrapt.ObjectProxy for _OpenTelemetryAioServicerContext
Using wrapt eliminates a bunch of the boilerplate associated with inheriting from grpc.aio.ServicerContext directly. It also means that if methods are added or their signatures change, there will only be a breaking change for abort, set_code, or set_details.
1 parent 561dd29 commit dda47a1

File tree

1 file changed

+23
-91
lines changed
  • instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc

1 file changed

+23
-91
lines changed

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py

Lines changed: 23 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import grpc
1616
import grpc.aio
17+
import wrapt
1718

1819
from ._server import (
1920
OpenTelemetryServerInterceptor,
@@ -23,121 +24,52 @@
2324
from opentelemetry.semconv.trace import SpanAttributes
2425
from opentelemetry.trace.status import Status, StatusCode
2526

26-
# pylint:disable=abstract-method
27-
class _OpenTelemetryAioServicerContext(grpc.aio.ServicerContext):
27+
class _OpenTelemetryAioServicerContext(wrapt.ObjectProxy):
2828
def __init__(self, servicer_context, active_span):
29-
self._servicer_context = servicer_context
30-
self._active_span = active_span
31-
self._code = grpc.StatusCode.OK
32-
self._details = None
33-
super().__init__()
34-
35-
def __getattr__(self, attr):
36-
return getattr(self._servicer_context, attr)
37-
38-
async def read(self):
39-
return await self._servicer_context.read()
40-
41-
async def write(self, message):
42-
return await self._servicer_context.write(message)
43-
44-
def is_active(self, *args, **kwargs):
45-
return self._servicer_context.is_active(*args, **kwargs)
46-
47-
def time_remaining(self, *args, **kwargs):
48-
return self._servicer_context.time_remaining(*args, **kwargs)
49-
50-
def cancel(self, *args, **kwargs):
51-
return self._servicer_context.cancel(*args, **kwargs)
52-
53-
def add_callback(self, *args, **kwargs):
54-
return self._servicer_context.add_callback(*args, **kwargs)
55-
56-
def disable_next_message_compression(self):
57-
return self._service_context.disable_next_message_compression()
58-
59-
def invocation_metadata(self, *args, **kwargs):
60-
return self._servicer_context.invocation_metadata(*args, **kwargs)
61-
62-
def peer(self):
63-
return self._servicer_context.peer()
64-
65-
def peer_identities(self):
66-
return self._servicer_context.peer_identities()
67-
68-
def peer_identity_key(self):
69-
return self._servicer_context.peer_identity_key()
70-
71-
def auth_context(self):
72-
return self._servicer_context.auth_context()
73-
74-
def set_compression(self, compression):
75-
return self._servicer_context.set_compression(compression)
76-
77-
async def send_initial_metadata(self, *args, **kwargs):
78-
return await self._servicer_context.send_initial_metadata(*args, **kwargs)
79-
80-
def set_trailing_metadata(self, *args, **kwargs):
81-
return self._servicer_context.set_trailing_metadata(*args, **kwargs)
82-
83-
def trailing_metadata(self):
84-
return self._servicer_context.trailing_metadata()
85-
86-
async def abort(self, code, details = "", trailing_metadata = tuple()):
87-
self._code = code
88-
self._details = details
89-
self._active_span.set_attribute(
29+
super().__init__(servicer_context)
30+
self._self_active_span = active_span
31+
self._self_code = grpc.StatusCode.OK
32+
self._self_details = None
33+
34+
async def abort(self, code, details="", trailing_metadata=tuple()):
35+
self._self_code = code
36+
self._self_details = details
37+
self._self_active_span.set_attribute(
9038
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
9139
)
92-
self._active_span.set_status(
40+
self._self_active_span.set_status(
9341
Status(
9442
status_code=StatusCode.ERROR,
9543
description=f"{code}:{details}",
9644
)
9745
)
98-
return await self._servicer_context.abort(code, details, trailing_metadata)
99-
100-
def code(self):
101-
if not hasattr(self._servicer_context, "code"):
102-
raise RuntimeError(
103-
"code() is not supported with the installed version of grpcio"
104-
)
105-
return self._servicer_context.code()
106-
107-
def details(self):
108-
if not hasattr(self._servicer_context, "details"):
109-
raise RuntimeError(
110-
"details() is not supported with the installed version of "
111-
"grpcio"
112-
)
113-
return self._servicer_context.details()
46+
return await self.__wrapped__.abort(code, details, trailing_metadata)
11447

11548
def set_code(self, code):
116-
self._code = code
117-
# use details if we already have it, otherwise the status description
118-
details = self._details or code.value[1]
119-
self._active_span.set_attribute(
49+
self._self_code = code
50+
details = self._self_details or code.value[1]
51+
self._self_active_span.set_attribute(
12052
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
12153
)
12254
if code != grpc.StatusCode.OK:
123-
self._active_span.set_status(
55+
self._self_active_span.set_status(
12456
Status(
12557
status_code=StatusCode.ERROR,
12658
description=f"{code}:{details}",
12759
)
12860
)
129-
return self._servicer_context.set_code(code)
61+
return self.__wrapped__.set_code(code)
13062

13163
def set_details(self, details):
132-
self._details = details
133-
if self._code != grpc.StatusCode.OK:
134-
self._active_span.set_status(
64+
self._self_details = details
65+
if self._self_code != grpc.StatusCode.OK:
66+
self._self_active_span.set_status(
13567
Status(
13668
status_code=StatusCode.ERROR,
137-
description=f"{self._code}:{details}",
69+
description=f"{self._self_code}:{details}",
13870
)
13971
)
140-
return self._servicer_context.set_details(details)
72+
return self.__wrapped__.set_details(details)
14173

14274

14375
class OpenTelemetryAioServerInterceptor(

0 commit comments

Comments
 (0)