Commit d854c52

javferro and docelotl authored
feat(confluent-kafka): Add instrumentation to consume method (#1786)
Co-authored-by: Diego Hurtado <[email protected]>
1 parent 0871dd4 commit d854c52
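
This commit extends the confluent-kafka instrumentation so that `Consumer.consume()` is traced the same way `poll()` already was. A minimal usage sketch (not part of the diff; the broker address, group id, and topic name are illustrative placeholders):

```python
# A minimal usage sketch (not part of the diff): exercising the newly
# instrumented consume() path. Broker address, group id, and topic name
# are illustrative placeholders.
from confluent_kafka import Consumer

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
)

instrumentation = ConfluentKafkaInstrumentor()
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "mygroup",
        "auto.offset.reset": "earliest",
    }
)
# Returns a ProxiedConsumer whose consume()/poll() calls are traced.
consumer = instrumentation.instrument_consumer(consumer)
consumer.subscribe(["mytopic"])

# One "recv" span per call; a non-empty batch also opens a
# "mytopic process" span linked to each record's producer context.
records = consumer.consume(num_messages=5, timeout=1.0)
```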

File tree

5 files changed: +271 -25 lines


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -61,6 +61,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Add optional distro and configurator selection for auto-instrumentation
   ([#1823](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1823))
 
+### Added
+- `opentelemetry-instrumentation-confluent-kafka` Add instrumentation to `consume` method
+  ([#1786](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1786))
+
 ## Version 1.18.0/0.39b0 (2023-05-10)
 
 - Update runtime metrics to follow semantic conventions
```

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 52 additions & 21 deletions
```diff
@@ -112,6 +112,8 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
 from .package import _instruments
 from .utils import (
     KafkaPropertiesExtractor,
+    _end_current_consume_span,
+    _create_new_consume_span,
     _enrich_span,
     _get_span_name,
     _kafka_getter,
@@ -137,6 +139,12 @@ def __init__(self, config):
     def poll(self, timeout=-1):  # pylint: disable=useless-super-delegation
         return super().poll(timeout)
 
+    # This method is deliberately implemented in order to allow wrapt to wrap this function
+    def consume(
+        self, *args, **kwargs
+    ):  # pylint: disable=useless-super-delegation
+        return super().consume(*args, **kwargs)
+
 
 class ProxiedProducer(Producer):
     def __init__(self, producer: Producer, tracer: Tracer):
@@ -177,10 +185,14 @@ def committed(self, partitions, timeout=-1):
     def commit(self, *args, **kwargs):
         return self._consumer.commit(*args, **kwargs)
 
-    def consume(
-        self, num_messages=1, *args, **kwargs
-    ):  # pylint: disable=keyword-arg-before-vararg
-        return self._consumer.consume(num_messages, *args, **kwargs)
+    def consume(self, *args, **kwargs):
+        return ConfluentKafkaInstrumentor.wrap_consume(
+            self._consumer.consume,
+            self,
+            self._tracer,
+            args,
+            kwargs,
+        )
 
     def get_watermark_offsets(
         self, partition, timeout=-1, *args, **kwargs
@@ -275,6 +287,11 @@ def _inner_wrap_poll(func, instance, args, kwargs):
                 func, instance, self._tracer, args, kwargs
             )
 
+        def _inner_wrap_consume(func, instance, args, kwargs):
+            return ConfluentKafkaInstrumentor.wrap_consume(
+                func, instance, self._tracer, args, kwargs
+            )
+
         wrapt.wrap_function_wrapper(
             AutoInstrumentedProducer,
             "produce",
@@ -287,6 +304,12 @@ def _inner_wrap_poll(func, instance, args, kwargs):
             _inner_wrap_poll,
         )
 
+        wrapt.wrap_function_wrapper(
+            AutoInstrumentedConsumer,
+            "consume",
+            _inner_wrap_consume,
+        )
+
     def _uninstrument(self, **kwargs):
         confluent_kafka.Producer = self._original_kafka_producer
         confluent_kafka.Consumer = self._original_kafka_consumer
@@ -326,29 +349,14 @@ def wrap_produce(func, instance, tracer, args, kwargs):
     @staticmethod
     def wrap_poll(func, instance, tracer, args, kwargs):
         if instance._current_consume_span:
-            context.detach(instance._current_context_token)
-            instance._current_context_token = None
-            instance._current_consume_span.end()
-            instance._current_consume_span = None
+            _end_current_consume_span(instance)
 
         with tracer.start_as_current_span(
             "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
         ):
             record = func(*args, **kwargs)
             if record:
-                links = []
-                ctx = propagate.extract(record.headers(), getter=_kafka_getter)
-                if ctx:
-                    for item in ctx.values():
-                        if hasattr(item, "get_span_context"):
-                            links.append(Link(context=item.get_span_context()))
-
-                instance._current_consume_span = tracer.start_span(
-                    name=f"{record.topic()} process",
-                    links=links,
-                    kind=SpanKind.CONSUMER,
-                )
-
+                _create_new_consume_span(instance, tracer, [record])
                 _enrich_span(
                     instance._current_consume_span,
                     record.topic(),
@@ -361,3 +369,26 @@ def wrap_poll(func, instance, tracer, args, kwargs):
         )
 
         return record
+
+    @staticmethod
+    def wrap_consume(func, instance, tracer, args, kwargs):
+        if instance._current_consume_span:
+            _end_current_consume_span(instance)
+
+        with tracer.start_as_current_span(
+            "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
+        ):
+            records = func(*args, **kwargs)
+            if len(records) > 0:
+                _create_new_consume_span(instance, tracer, records)
+                _enrich_span(
+                    instance._current_consume_span,
+                    records[0].topic(),
+                    operation=MessagingOperationValues.PROCESS,
+                )
+
+        instance._current_context_token = context.attach(
+            trace.set_span_in_context(instance._current_consume_span)
+        )
+
+        return records
```
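
`wrap_consume` mirrors `wrap_poll` but handles a whole batch: one `recv` span wraps the blocking call, and for a non-empty batch a single `<topic> process` span is started, named after the first record's topic and linked to each record's extracted producer context. A sketch of the auto-instrumented path this commit extends (assuming a reachable broker; names are illustrative):

```python
# A sketch of the auto-instrumentation path: after instrument(),
# confluent_kafka.Consumer is replaced by the patched
# AutoInstrumentedConsumer, so plain consume() calls are traced.
# Broker address and topic name are illustrative.
import confluent_kafka

from opentelemetry.instrumentation.confluent_kafka import (
    ConfluentKafkaInstrumentor,
)

ConfluentKafkaInstrumentor().instrument()

consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "mygroup"}
)
consumer.subscribe(["mytopic"])

# The "<topic> process" span for this batch stays current until the next
# consume()/poll() call ends it via _end_current_consume_span().
records = consumer.consume(5, timeout=1.0)
```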

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

Lines changed: 32 additions & 2 deletions
```diff
@@ -1,6 +1,8 @@
 from logging import getLogger
 from typing import List, Optional
 
+from opentelemetry import context, propagate
+from opentelemetry.trace import SpanKind, Link
 from opentelemetry.propagators import textmap
 from opentelemetry.semconv.trace import (
     MessagingDestinationKindValues,
@@ -81,6 +83,34 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
 _kafka_getter = KafkaContextGetter()
 
 
+def _end_current_consume_span(instance):
+    context.detach(instance._current_context_token)
+    instance._current_context_token = None
+    instance._current_consume_span.end()
+    instance._current_consume_span = None
+
+
+def _create_new_consume_span(instance, tracer, records):
+    links = _get_links_from_records(records)
+    instance._current_consume_span = tracer.start_span(
+        name=f"{records[0].topic()} process",
+        links=links,
+        kind=SpanKind.CONSUMER,
+    )
+
+
+def _get_links_from_records(records):
+    links = []
+    for record in records:
+        ctx = propagate.extract(record.headers(), getter=_kafka_getter)
+        if ctx:
+            for item in ctx.values():
+                if hasattr(item, "get_span_context"):
+                    links.append(Link(context=item.get_span_context()))
+
+    return links
+
+
 def _enrich_span(
     span,
     topic,
@@ -94,7 +124,7 @@ def _enrich_span(
     span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
     span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)
 
-    if partition:
+    if partition is not None:
         span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
 
     span.set_attribute(
@@ -109,7 +139,7 @@ def _enrich_span(
 
     # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
     # A message within Kafka is uniquely defined by its topic name, topic partition and offset.
-    if partition and offset and topic:
+    if partition is not None and offset is not None and topic:
        span.set_attribute(
            SpanAttributes.MESSAGING_MESSAGE_ID,
            f"{topic}.{partition}.{offset}",
```

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 144 additions & 2 deletions
```diff
@@ -14,7 +14,12 @@
 
 # pylint: disable=no-name-in-module
 
-from unittest import TestCase
+from opentelemetry.semconv.trace import (
+    SpanAttributes,
+    MessagingDestinationKindValues,
+)
+from opentelemetry.test.test_base import TestBase
+from .utils import MockConsumer, MockedMessage
 
 from confluent_kafka import Consumer, Producer
 
@@ -29,7 +34,7 @@
 )
 
 
-class TestConfluentKafka(TestCase):
+class TestConfluentKafka(TestBase):
     def test_instrument_api(self) -> None:
         instrumentation = ConfluentKafkaInstrumentor()
 
@@ -104,3 +109,140 @@ def test_context_getter(self) -> None:
         context_setter.set(carrier_list, "key1", "val1")
         self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
         self.assertEqual(["key1"], context_getter.keys(carrier_list))
+
+    def test_poll(self) -> None:
+        instrumentation = ConfluentKafkaInstrumentor()
+        mocked_messages = [
+            MockedMessage("topic-10", 0, 0, []),
+            MockedMessage("topic-20", 2, 4, []),
+            MockedMessage("topic-30", 1, 3, []),
+        ]
+        expected_spans = [
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-10 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-10",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                    SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-20 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_KAFKA_PARTITION: 2,
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-20",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                    SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-30 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_KAFKA_PARTITION: 1,
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-30",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                    SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
+                },
+            },
+            {"name": "recv", "attributes": {}},
+        ]
+
+        consumer = MockConsumer(
+            mocked_messages,
+            {
+                "bootstrap.servers": "localhost:29092",
+                "group.id": "mygroup",
+                "auto.offset.reset": "earliest",
+            },
+        )
+        self.memory_exporter.clear()
+        consumer = instrumentation.instrument_consumer(consumer)
+        consumer.poll()
+        consumer.poll()
+        consumer.poll()
+        consumer.poll()
+
+        span_list = self.memory_exporter.get_finished_spans()
+        self._compare_spans(span_list, expected_spans)
+
+    def test_consume(self) -> None:
+        instrumentation = ConfluentKafkaInstrumentor()
+        mocked_messages = [
+            MockedMessage("topic-1", 0, 0, []),
+            MockedMessage("topic-1", 2, 1, []),
+            MockedMessage("topic-1", 3, 2, []),
+            MockedMessage("topic-2", 0, 0, []),
+            MockedMessage("topic-3", 0, 3, []),
+            MockedMessage("topic-2", 0, 1, []),
+        ]
+        expected_spans = [
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-1 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-1",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-2 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-2",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                },
+            },
+            {"name": "recv", "attributes": {}},
+            {
+                "name": "topic-3 process",
+                "attributes": {
+                    SpanAttributes.MESSAGING_OPERATION: "process",
+                    SpanAttributes.MESSAGING_SYSTEM: "kafka",
+                    SpanAttributes.MESSAGING_DESTINATION: "topic-3",
+                    SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
+                },
+            },
+            {"name": "recv", "attributes": {}},
+        ]
+
+        consumer = MockConsumer(
+            mocked_messages,
+            {
+                "bootstrap.servers": "localhost:29092",
+                "group.id": "mygroup",
+                "auto.offset.reset": "earliest",
+            },
+        )
+
+        self.memory_exporter.clear()
+        consumer = instrumentation.instrument_consumer(consumer)
+        consumer.consume(3)
+        consumer.consume(1)
+        consumer.consume(2)
+        consumer.consume(1)
+        span_list = self.memory_exporter.get_finished_spans()
+        self._compare_spans(span_list, expected_spans)
+
+    def _compare_spans(self, spans, expected_spans):
+        for span, expected_span in zip(spans, expected_spans):
+            self.assertEqual(expected_span["name"], span.name)
+            for attribute_key, expected_attribute_value in expected_span[
+                "attributes"
+            ].items():
+                self.assertEqual(
+                    expected_attribute_value, span.attributes[attribute_key]
+                )
```
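
These tests import `MockConsumer` and `MockedMessage` from a sibling `tests/utils.py`, the fifth changed file, which is not expanded in this view. A plausible reconstruction of what those helpers must provide, inferred from how the tests and the instrumentation use them (the actual file may differ):

```python
# Hypothetical reconstruction of the unexpanded tests/utils.py: the mocks
# only need the surface the instrumentation touches.
from confluent_kafka import Consumer


class MockConsumer(Consumer):
    def __init__(self, queue, config):
        self._queue = queue
        super().__init__(config)

    def consume(self, num_messages=1, *args, **kwargs):
        # Return up to num_messages queued messages, like the real consume().
        messages = self._queue[:num_messages]
        self._queue = self._queue[num_messages:]
        return messages

    def poll(self, timeout=None):
        # Return one queued message, or None when the queue is empty.
        if len(self._queue) > 0:
            return self._queue.pop(0)
        return None


class MockedMessage:
    def __init__(self, topic: str, partition: int, offset: int, headers):
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._headers = headers

    def topic(self):
        return self._topic

    def partition(self):
        return self._partition

    def offset(self):
        return self._offset

    def headers(self):
        return self._headers
```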
