Skip to content

Commit afa86a5

Browse files
committed
OpenTelemetry metrics collector & tests
1 parent b9d2a79 commit afa86a5

File tree

3 files changed

+296
-17
lines changed

3 files changed

+296
-17
lines changed

pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
<slf4j.version>1.7.36</slf4j.version>
5858
<metrics.version>4.2.10</metrics.version>
5959
<micrometer.version>1.9.2</micrometer.version>
60+
<opentelemetry.version>1.16.0</opentelemetry.version>
6061
<jackson.version>2.13.3</jackson.version>
6162
<logback.version>1.2.11</logback.version>
6263
<junit.version>4.13.2</junit.version>
@@ -704,6 +705,12 @@
704705
<version>${micrometer.version}</version>
705706
<optional>true</optional>
706707
</dependency>
708+
<dependency>
709+
<groupId>io.opentelemetry</groupId>
710+
<artifactId>opentelemetry-api</artifactId>
711+
<version>${opentelemetry.version}</version>
712+
<optional>true</optional>
713+
</dependency>
707714
<dependency>
708715
<groupId>com.fasterxml.jackson.core</groupId>
709716
<artifactId>jackson-databind</artifactId>
@@ -752,6 +759,12 @@
752759
<version>${netcrusher.version}</version>
753760
<scope>test</scope>
754761
</dependency>
762+
<dependency>
763+
<groupId>io.opentelemetry</groupId>
764+
<artifactId>opentelemetry-sdk-testing</artifactId>
765+
<version>${opentelemetry.version}</version>
766+
<scope>test</scope>
767+
</dependency>
755768

756769
</dependencies>
757770

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl;
17+
18+
import com.rabbitmq.client.Channel;
19+
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.MetricsCollector;
21+
import io.opentelemetry.api.OpenTelemetry;
22+
import io.opentelemetry.api.common.Attributes;
23+
import io.opentelemetry.api.metrics.LongCounter;
24+
import io.opentelemetry.api.metrics.Meter;
25+
26+
import java.util.concurrent.atomic.AtomicLong;
27+
28+
/**
29+
* OpenTelemetry implementation of {@link MetricsCollector}.
30+
*
31+
* @see MetricsCollector
32+
* @since 5.15.1
33+
*/
34+
public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
35+
36+
private final Attributes attributes;
37+
38+
private final AtomicLong connections = new AtomicLong(0L);
39+
private final AtomicLong channels = new AtomicLong(0L);
40+
41+
private final LongCounter publishedMessagesCounter;
42+
private final LongCounter consumedMessagesCounter;
43+
private final LongCounter acknowledgedMessagesCounter;
44+
private final LongCounter rejectedMessagesCounter;
45+
private final LongCounter failedToPublishMessagesCounter;
46+
private final LongCounter ackedPublishedMessagesCounter;
47+
private final LongCounter nackedPublishedMessagesCounter;
48+
private final LongCounter unroutedPublishedMessagesCounter;
49+
50+
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
51+
this(openTelemetry, "rabbitmq");
52+
}
53+
54+
public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final String prefix) {
55+
this(openTelemetry, prefix, Attributes.empty());
56+
}
57+
58+
public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final String prefix, final Attributes attributes) {
59+
// initialize meter
60+
Meter meter = openTelemetry.getMeter("amqp-client");
61+
62+
// attributes
63+
this.attributes = attributes;
64+
65+
// connections
66+
meter.gaugeBuilder(prefix + ".connections")
67+
.setUnit("{connections}")
68+
.setDescription("The number of connections to the RabbitMQ server")
69+
.ofLongs()
70+
.buildWithCallback(measurement -> measurement.record(connections.get(), attributes));
71+
72+
// channels
73+
meter.gaugeBuilder(prefix + ".channels")
74+
.setUnit("{channels}")
75+
.setDescription("The number of channels to the RabbitMQ server")
76+
.ofLongs()
77+
.buildWithCallback(measurement -> measurement.record(channels.get(), attributes));
78+
79+
// publishedMessages
80+
this.publishedMessagesCounter = meter.counterBuilder(prefix + ".published")
81+
.setUnit("{messages}")
82+
.setDescription("The number of messages published to the RabbitMQ server")
83+
.build();
84+
85+
// consumedMessages
86+
this.consumedMessagesCounter = meter.counterBuilder(prefix + ".consumed")
87+
.setUnit("{messages}")
88+
.setDescription("The number of messages consumed from the RabbitMQ server")
89+
.build();
90+
91+
// acknowledgedMessages
92+
this.acknowledgedMessagesCounter = meter.counterBuilder(prefix + ".acknowledged")
93+
.setUnit("{messages}")
94+
.setDescription("The number of messages acknowledged from the RabbitMQ server")
95+
.build();
96+
97+
// rejectedMessages
98+
this.rejectedMessagesCounter = meter.counterBuilder(prefix + ".rejected")
99+
.setUnit("{messages}")
100+
.setDescription("The number of messages rejected from the RabbitMQ server")
101+
.build();
102+
103+
// failedToPublishMessages
104+
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
105+
.setUnit("{messages}")
106+
.setDescription("The number of messages failed to publish to the RabbitMQ server")
107+
.build();
108+
109+
// ackedPublishedMessages
110+
this.ackedPublishedMessagesCounter = meter.counterBuilder(prefix + ".acknowledged_published")
111+
.setUnit("{messages}")
112+
.setDescription("The number of published messages acknowledged by the RabbitMQ server")
113+
.build();
114+
115+
// nackedPublishedMessages
116+
this.nackedPublishedMessagesCounter = meter.counterBuilder(prefix + ".not_acknowledged_published")
117+
.setUnit("{messages}")
118+
.setDescription("The number of published messages not acknowledged by the RabbitMQ server")
119+
.build();
120+
121+
// unroutedPublishedMessages
122+
this.unroutedPublishedMessagesCounter = meter.counterBuilder(prefix + ".unrouted_published")
123+
.setUnit("{messages}")
124+
.setDescription("The number of un-routed published messages to the RabbitMQ server")
125+
.build();
126+
}
127+
128+
@Override
129+
protected void incrementConnectionCount(Connection connection) {
130+
connections.incrementAndGet();
131+
}
132+
133+
@Override
134+
protected void decrementConnectionCount(Connection connection) {
135+
connections.decrementAndGet();
136+
}
137+
138+
@Override
139+
protected void incrementChannelCount(Channel channel) {
140+
channels.incrementAndGet();
141+
}
142+
143+
@Override
144+
protected void decrementChannelCount(Channel channel) {
145+
channels.decrementAndGet();
146+
}
147+
148+
@Override
149+
protected void markPublishedMessage() {
150+
publishedMessagesCounter.add(1L, attributes);
151+
}
152+
153+
@Override
154+
protected void markMessagePublishFailed() {
155+
failedToPublishMessagesCounter.add(1L, attributes);
156+
}
157+
158+
@Override
159+
protected void markConsumedMessage() {
160+
consumedMessagesCounter.add(1L, attributes);
161+
}
162+
163+
@Override
164+
protected void markAcknowledgedMessage() {
165+
acknowledgedMessagesCounter.add(1L, attributes);
166+
}
167+
168+
@Override
169+
protected void markRejectedMessage() {
170+
rejectedMessagesCounter.add(1L, attributes);
171+
}
172+
173+
@Override
174+
protected void markMessagePublishAcknowledged() {
175+
ackedPublishedMessagesCounter.add(1L, attributes);
176+
}
177+
178+
@Override
179+
protected void markMessagePublishNotAcknowledged() {
180+
nackedPublishedMessagesCounter.add(1L, attributes);
181+
}
182+
183+
@Override
184+
protected void markPublishedMessageUnrouted() {
185+
unroutedPublishedMessagesCounter.add(1L, attributes);
186+
}
187+
188+
public AtomicLong getConnections() {
189+
return connections;
190+
}
191+
192+
public AtomicLong getChannels() {
193+
return channels;
194+
}
195+
}

0 commit comments

Comments
 (0)