Skip to content

Commit 9e6be51

Browse files
authored
Merge branch 'main' into emptyroute
2 parents 17f2cce + a45c9c3 commit 9e6be51

File tree

9 files changed

+347
-46
lines changed

9 files changed

+347
-46
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2020

2121
### Added
2222

23+
- Fix async redis clients not being traced correctly ([#1830](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1830))
2324
- Make Flask request span attributes available for `start_span`.
2425
([#1784](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1784))
2526
- Fix falcon instrumentation's usage of Span Status to only set the description if the status code is ERROR.
@@ -33,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3334
([#1789](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1789))
3435
- `opentelemetry-instrumentation-grpc` Allow gRPC connections via Unix socket
3536
([#1833](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1833))
37+
- Fix elasticsearch `Transport.perform_request` instrument wrap for elasticsearch >= 8
38+
([#1810](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1810))
3639

3740
## Version 1.18.0/0.39b0 (2023-05-10)
3841

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ Approvers ([@open-telemetry/python-approvers](https://github.com/orgs/open-telem
9797
- [Aaron Abbott](https://github.com/aabmass), Google
9898
- [Jeremy Voss](https://github.com/jeremydvoss), Microsoft
9999
- [Sanket Mehta](https://github.com/sanketmehta28), Cisco
100-
- [Shalev Roda](https://github.com/shalevr), Cisco
101100

102101
Emeritus Approvers:
103102

@@ -112,6 +111,7 @@ Maintainers ([@open-telemetry/python-maintainers](https://github.com/orgs/open-t
112111

113112
- [Diego Hurtado](https://github.com/ocelotl), Lightstep
114113
- [Leighton Chen](https://github.com/lzchen), Microsoft
114+
- [Shalev Roda](https://github.com/shalevr), Cisco
115115

116116
Emeritus Maintainers:
117117

instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def response_hook(span, response):
9898

9999
from .utils import sanitize_body
100100

101+
# Split of elasticsearch and elastic_transport in 8.0.0+
102+
# https://www.elastic.co/guide/en/elasticsearch/client/python-api/master/release-notes.html#rn-8-0-0
103+
es_transport_split = elasticsearch.VERSION[0] > 7
104+
if es_transport_split:
105+
import elastic_transport
106+
101107
logger = getLogger(__name__)
102108

103109

@@ -137,16 +143,28 @@ def _instrument(self, **kwargs):
137143
tracer = get_tracer(__name__, __version__, tracer_provider)
138144
request_hook = kwargs.get("request_hook")
139145
response_hook = kwargs.get("response_hook")
140-
_wrap(
141-
elasticsearch,
142-
"Transport.perform_request",
143-
_wrap_perform_request(
144-
tracer,
145-
self._span_name_prefix,
146-
request_hook,
147-
response_hook,
148-
),
149-
)
146+
if es_transport_split:
147+
_wrap(
148+
elastic_transport,
149+
"Transport.perform_request",
150+
_wrap_perform_request(
151+
tracer,
152+
self._span_name_prefix,
153+
request_hook,
154+
response_hook,
155+
),
156+
)
157+
else:
158+
_wrap(
159+
elasticsearch,
160+
"Transport.perform_request",
161+
_wrap_perform_request(
162+
tracer,
163+
self._span_name_prefix,
164+
request_hook,
165+
response_hook,
166+
),
167+
)
150168

151169
def _uninstrument(self, **kwargs):
152170
unwrap(elasticsearch.Transport, "perform_request")
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from elasticsearch_dsl import Document, Keyword, Text
16+
17+
18+
class Article(Document):
19+
title = Text(analyzer="snowball", fields={"raw": Keyword()})
20+
body = Text(analyzer="snowball")
21+
22+
class Index:
23+
name = "test-index"
24+
25+
26+
dsl_create_statement = {
27+
"mappings": {
28+
"properties": {
29+
"title": {
30+
"analyzer": "snowball",
31+
"fields": {"raw": {"type": "keyword"}},
32+
"type": "text",
33+
},
34+
"body": {"analyzer": "snowball", "type": "text"},
35+
}
36+
}
37+
}
38+
dsl_index_result = (1, {}, '{"result": "created"}')
39+
dsl_index_span_name = "Elasticsearch/test-index/_doc/2"
40+
dsl_index_url = "/test-index/_doc/2"
41+
dsl_search_method = "POST"

instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737

3838
major_version = elasticsearch.VERSION[0]
3939

40-
if major_version == 7:
40+
if major_version == 8:
41+
from . import helpers_es8 as helpers # pylint: disable=no-name-in-module
42+
elif major_version == 7:
4143
from . import helpers_es7 as helpers # pylint: disable=no-name-in-module
4244
elif major_version == 6:
4345
from . import helpers_es6 as helpers # pylint: disable=no-name-in-module

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -136,18 +136,52 @@ def _set_connection_attributes(span, conn):
136136
span.set_attribute(key, value)
137137

138138

139+
def _build_span_name(instance, cmd_args):
140+
if len(cmd_args) > 0 and cmd_args[0]:
141+
name = cmd_args[0]
142+
else:
143+
name = instance.connection_pool.connection_kwargs.get("db", 0)
144+
return name
145+
146+
147+
def _build_span_meta_data_for_pipeline(instance):
148+
try:
149+
command_stack = (
150+
instance.command_stack
151+
if hasattr(instance, "command_stack")
152+
else instance._command_stack
153+
)
154+
155+
cmds = [
156+
_format_command_args(c.args if hasattr(c, "args") else c[0])
157+
for c in command_stack
158+
]
159+
resource = "\n".join(cmds)
160+
161+
span_name = " ".join(
162+
[
163+
(c.args[0] if hasattr(c, "args") else c[0][0])
164+
for c in command_stack
165+
]
166+
)
167+
except (AttributeError, IndexError):
168+
command_stack = []
169+
resource = ""
170+
span_name = ""
171+
172+
return command_stack, resource, span_name
173+
174+
175+
# pylint: disable=R0915
139176
def _instrument(
140177
tracer,
141178
request_hook: _RequestHookT = None,
142179
response_hook: _ResponseHookT = None,
143180
):
144181
def _traced_execute_command(func, instance, args, kwargs):
145182
query = _format_command_args(args)
183+
name = _build_span_name(instance, args)
146184

147-
if len(args) > 0 and args[0]:
148-
name = args[0]
149-
else:
150-
name = instance.connection_pool.connection_kwargs.get("db", 0)
151185
with tracer.start_as_current_span(
152186
name, kind=trace.SpanKind.CLIENT
153187
) as span:
@@ -163,31 +197,11 @@ def _traced_execute_command(func, instance, args, kwargs):
163197
return response
164198

165199
def _traced_execute_pipeline(func, instance, args, kwargs):
166-
try:
167-
command_stack = (
168-
instance.command_stack
169-
if hasattr(instance, "command_stack")
170-
else instance._command_stack
171-
)
172-
173-
cmds = [
174-
_format_command_args(
175-
c.args if hasattr(c, "args") else c[0],
176-
)
177-
for c in command_stack
178-
]
179-
resource = "\n".join(cmds)
180-
181-
span_name = " ".join(
182-
[
183-
(c.args[0] if hasattr(c, "args") else c[0][0])
184-
for c in command_stack
185-
]
186-
)
187-
except (AttributeError, IndexError):
188-
command_stack = []
189-
resource = ""
190-
span_name = ""
200+
(
201+
command_stack,
202+
resource,
203+
span_name,
204+
) = _build_span_meta_data_for_pipeline(instance)
191205

192206
with tracer.start_as_current_span(
193207
span_name, kind=trace.SpanKind.CLIENT
@@ -232,32 +246,72 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
232246
"ClusterPipeline.execute",
233247
_traced_execute_pipeline,
234248
)
249+
250+
async def _async_traced_execute_command(func, instance, args, kwargs):
251+
query = _format_command_args(args)
252+
name = _build_span_name(instance, args)
253+
254+
with tracer.start_as_current_span(
255+
name, kind=trace.SpanKind.CLIENT
256+
) as span:
257+
if span.is_recording():
258+
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
259+
_set_connection_attributes(span, instance)
260+
span.set_attribute("db.redis.args_length", len(args))
261+
if callable(request_hook):
262+
request_hook(span, instance, args, kwargs)
263+
response = await func(*args, **kwargs)
264+
if callable(response_hook):
265+
response_hook(span, instance, response)
266+
return response
267+
268+
async def _async_traced_execute_pipeline(func, instance, args, kwargs):
269+
(
270+
command_stack,
271+
resource,
272+
span_name,
273+
) = _build_span_meta_data_for_pipeline(instance)
274+
275+
with tracer.start_as_current_span(
276+
span_name, kind=trace.SpanKind.CLIENT
277+
) as span:
278+
if span.is_recording():
279+
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
280+
_set_connection_attributes(span, instance)
281+
span.set_attribute(
282+
"db.redis.pipeline_length", len(command_stack)
283+
)
284+
response = await func(*args, **kwargs)
285+
if callable(response_hook):
286+
response_hook(span, instance, response)
287+
return response
288+
235289
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
236290
wrap_function_wrapper(
237291
"redis.asyncio",
238292
f"{redis_class}.execute_command",
239-
_traced_execute_command,
293+
_async_traced_execute_command,
240294
)
241295
wrap_function_wrapper(
242296
"redis.asyncio.client",
243297
f"{pipeline_class}.execute",
244-
_traced_execute_pipeline,
298+
_async_traced_execute_pipeline,
245299
)
246300
wrap_function_wrapper(
247301
"redis.asyncio.client",
248302
f"{pipeline_class}.immediate_execute_command",
249-
_traced_execute_command,
303+
_async_traced_execute_command,
250304
)
251305
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
252306
wrap_function_wrapper(
253307
"redis.asyncio.cluster",
254308
"RedisCluster.execute_command",
255-
_traced_execute_command,
309+
_async_traced_execute_command,
256310
)
257311
wrap_function_wrapper(
258312
"redis.asyncio.cluster",
259313
"ClusterPipeline.execute",
260-
_traced_execute_pipeline,
314+
_async_traced_execute_pipeline,
261315
)
262316

263317

instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,36 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import asyncio
1415
from unittest import mock
1516

1617
import redis
18+
import redis.asyncio
1719

1820
from opentelemetry import trace
1921
from opentelemetry.instrumentation.redis import RedisInstrumentor
2022
from opentelemetry.test.test_base import TestBase
2123
from opentelemetry.trace import SpanKind
2224

2325

26+
class AsyncMock:
27+
"""A sufficient async mock implementation.
28+
29+
Python 3.7 doesn't have an inbuilt async mock class, so this is used.
30+
"""
31+
32+
def __init__(self):
33+
self.mock = mock.Mock()
34+
35+
async def __call__(self, *args, **kwargs):
36+
future = asyncio.Future()
37+
future.set_result("random")
38+
return future
39+
40+
def __getattr__(self, item):
41+
return AsyncMock()
42+
43+
2444
class TestRedis(TestBase):
2545
def setUp(self):
2646
super().setUp()
@@ -87,6 +107,35 @@ def test_instrument_uninstrument(self):
87107
spans = self.memory_exporter.get_finished_spans()
88108
self.assertEqual(len(spans), 1)
89109

110+
def test_instrument_uninstrument_async_client_command(self):
111+
redis_client = redis.asyncio.Redis()
112+
113+
with mock.patch.object(redis_client, "connection", AsyncMock()):
114+
asyncio.run(redis_client.get("key"))
115+
116+
spans = self.memory_exporter.get_finished_spans()
117+
self.assertEqual(len(spans), 1)
118+
self.memory_exporter.clear()
119+
120+
# Test uninstrument
121+
RedisInstrumentor().uninstrument()
122+
123+
with mock.patch.object(redis_client, "connection", AsyncMock()):
124+
asyncio.run(redis_client.get("key"))
125+
126+
spans = self.memory_exporter.get_finished_spans()
127+
self.assertEqual(len(spans), 0)
128+
self.memory_exporter.clear()
129+
130+
# Test instrument again
131+
RedisInstrumentor().instrument()
132+
133+
with mock.patch.object(redis_client, "connection", AsyncMock()):
134+
asyncio.run(redis_client.get("key"))
135+
136+
spans = self.memory_exporter.get_finished_spans()
137+
self.assertEqual(len(spans), 1)
138+
90139
def test_response_hook(self):
91140
redis_client = redis.Redis()
92141
connection = redis.connection.Connection()

0 commit comments

Comments
 (0)