Skip to content

Commit 8cba9ad

Browse files
committed
Improve internal insight of instances
Make toString() method of producer, consumer, and internal components return a JSON document with information of their state. Should be handy to understand the state of the overall system (number of connections, etc). Internal, no public contract. References #50
1 parent 33c62cf commit 8cba9ad

13 files changed

+255
-67
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -96,6 +96,7 @@
9696
import java.io.IOException;
9797
import java.io.OutputStream;
9898
import java.lang.reflect.Field;
99+
import java.net.InetSocketAddress;
99100
import java.net.SocketAddress;
100101
import java.nio.charset.StandardCharsets;
101102
import java.time.Duration;
@@ -1237,6 +1238,29 @@ int getPort() {
12371238
return port;
12381239
}
12391240

1241+
String connectionName() {
1242+
StringBuilder builder = new StringBuilder();
1243+
SocketAddress localAddress = localAddress();
1244+
if (localAddress instanceof InetSocketAddress) {
1245+
InetSocketAddress address = (InetSocketAddress) localAddress;
1246+
builder.append(address.getHostString()).append(":").append(address.getPort());
1247+
} else {
1248+
builder.append("?");
1249+
}
1250+
builder.append(" -> ");
1251+
return builder.append(serverAddress()).toString();
1252+
}
1253+
1254+
private String serverAddress() {
1255+
SocketAddress remoteAddress = remoteAddress();
1256+
if (remoteAddress instanceof InetSocketAddress) {
1257+
InetSocketAddress address = (InetSocketAddress) remoteAddress;
1258+
return address.getHostString() + ":" + address.getPort();
1259+
} else {
1260+
return this.host + ":" + this.port;
1261+
}
1262+
}
1263+
12401264
public List<String> route(String routingKey, String superStream) {
12411265
if (routingKey == null || superStream == null) {
12421266
throw new IllegalArgumentException("routing key and stream must not be null");

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+20-19
Original file line numberDiff line numberDiff line change
@@ -194,25 +194,24 @@ int poolSize() {
194194
@Override
195195
public String toString() {
196196
return ("[ \n"
197-
+ pools.entrySet().stream()
198-
.map(
199-
poolEntry ->
200-
" { 'broker' : '"
201-
+ poolEntry.getKey()
202-
+ "', 'clients' : [ "
203-
+ poolEntry.getValue().managers.stream()
204-
.map(
205-
manager ->
206-
"{ 'consumer_count' : "
207-
+ manager.subscriptionTrackers.stream()
208-
.filter(Objects::nonNull)
209-
.count()
210-
+ " }")
211-
.collect(Collectors.joining(", "))
212-
+ " ] }")
213-
.collect(Collectors.joining(", \n"))
214-
+ "\n]")
215-
.replace("'", "\"");
197+
+ pools.entrySet().stream()
198+
.map(
199+
poolEntry ->
200+
" { \"broker\" : \""
201+
+ poolEntry.getKey()
202+
+ "\", \"clients\" : [ "
203+
+ poolEntry.getValue().managers.stream()
204+
.map(
205+
manager ->
206+
"{ \"consumer_count\" : "
207+
+ manager.subscriptionTrackers.stream()
208+
.filter(Objects::nonNull)
209+
.count()
210+
+ " }")
211+
.collect(Collectors.joining(", "))
212+
+ " ] }")
213+
.collect(Collectors.joining(", \n"))
214+
+ "\n]");
216215
}
217216

218217
/**
@@ -267,10 +266,12 @@ boolean isClosing() {
267266
synchronized void assign(byte subscriptionIdInClient, ClientSubscriptionsManager manager) {
268267
this.subscriptionIdInClient = subscriptionIdInClient;
269268
this.manager = manager;
269+
this.consumer.setSubscriptionClient(this.manager.client);
270270
}
271271

272272
synchronized void detachFromManager() {
273273
this.manager = null;
274+
this.consumer.setSubscriptionClient(null);
274275
}
275276
}
276277

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -322,4 +322,9 @@ public void setTime(long time) {
322322
this.time = time;
323323
}
324324
}
325+
326+
@Override
327+
public String toString() {
328+
return "{ \"tracker_count\" : " + this.trackers.size() + " }";
329+
}
325330
}

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+20-21
Original file line numberDiff line numberDiff line change
@@ -144,26 +144,25 @@ int clientCount() {
144144
@Override
145145
public String toString() {
146146
return ("[ \n"
147-
+ pools.entrySet().stream()
148-
.map(
149-
poolEntry ->
150-
" { 'broker' : '"
151-
+ poolEntry.getKey()
152-
+ "', 'clients' : [ "
153-
+ poolEntry.getValue().managers.stream()
154-
.map(
155-
manager ->
156-
"{ 'producer_count' : "
157-
+ manager.producers.size()
158-
+ ", "
159-
+ " 'tracking_consumer_count' : "
160-
+ manager.trackingConsumerTrackers.size()
161-
+ " }")
162-
.collect(Collectors.joining(", "))
163-
+ " ] }")
164-
.collect(Collectors.joining(", \n"))
165-
+ "\n]")
166-
.replace("'", "\"");
147+
+ pools.entrySet().stream()
148+
.map(
149+
poolEntry ->
150+
" { \"broker\" : \""
151+
+ poolEntry.getKey()
152+
+ "\", \"clients\" : [ "
153+
+ poolEntry.getValue().managers.stream()
154+
.map(
155+
manager ->
156+
"{ \"producer_count\" : "
157+
+ manager.producers.size()
158+
+ ", "
159+
+ " \"tracking_consumer_count\" : "
160+
+ manager.trackingConsumerTrackers.size()
161+
+ " }")
162+
.collect(Collectors.joining(", "))
163+
+ " ] }")
164+
.collect(Collectors.joining(", \n"))
165+
+ "\n]");
167166
}
168167

169168
private interface AgentTracker {
@@ -280,7 +279,7 @@ public void assign(byte producerId, Client client, ClientProducersManager manage
280279
synchronized (TrackingConsumerTracker.this) {
281280
this.clientProducersManager = manager;
282281
}
283-
this.consumer.setClient(client);
282+
this.consumer.setTrackingClient(client);
284283
}
285284

286285
@Override

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -44,6 +44,7 @@ class StreamConsumer implements Consumer {
4444
private final Runnable initCallback;
4545
private volatile Runnable closingCallback;
4646
private volatile Client trackingClient;
47+
private volatile Client subscriptionClient;
4748
private volatile Status status;
4849
private volatile long lastRequestedStoredOffset = 0;
4950

@@ -177,10 +178,14 @@ boolean isOpen() {
177178
return !this.closed.get();
178179
}
179180

180-
synchronized void setClient(Client client) {
181+
synchronized void setTrackingClient(Client client) {
181182
this.trackingClient = client;
182183
}
183184

185+
void setSubscriptionClient(Client client) {
186+
this.subscriptionClient = client;
187+
}
188+
184189
synchronized void unavailable() {
185190
this.status = Status.NOT_AVAILABLE;
186191
this.trackingClient = null;
@@ -253,6 +258,22 @@ public int hashCode() {
253258

254259
@Override
255260
public String toString() {
256-
return "StreamConsumer{" + "id=" + id + ", stream='" + stream + '\'' + '}';
261+
Client subscriptionClient = this.subscriptionClient;
262+
Client trackingClient = this.trackingClient;
263+
return "{ "
264+
+ "\"id\" : "
265+
+ id
266+
+ ","
267+
+ "\"stream\" : \""
268+
+ stream
269+
+ "\","
270+
+ "\"subscription_client\" : "
271+
+ (subscriptionClient == null
272+
? "null"
273+
: ("\"" + subscriptionClient.connectionName() + "\""))
274+
+ ", "
275+
+ "\"tracking_client\" : "
276+
+ (trackingClient == null ? "null" : ("\"" + trackingClient.connectionName() + "\""))
277+
+ "}";
257278
}
258279
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -604,13 +604,15 @@ TrackingConsumerRegistration registerTrackingConsumer(
604604
@Override
605605
public String toString() {
606606
Client locator = this.locator;
607-
return "{ locator : "
608-
+ (locator == null ? "null" : ("'" + locator.getHost() + ":" + locator.getPort() + "'"))
607+
return "{ \"locator\" : "
608+
+ (locator == null ? "null" : ("\"" + locator.connectionName() + "\""))
609609
+ ", "
610-
+ "'producers' : "
610+
+ "\"producers\" : "
611611
+ this.producersCoordinator
612-
+ ", 'consumers' : "
612+
+ ", \"consumers\" : "
613613
+ this.consumersCoordinator
614+
+ ", \"offset_tracking\" : "
615+
+ this.offsetTrackingCoordinator
614616
+ "}";
615617
}
616618

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Map.Entry;
39+
import java.util.Objects;
3940
import java.util.SortedMap;
4041
import java.util.TreeMap;
4142
import java.util.concurrent.ConcurrentHashMap;
@@ -52,8 +53,11 @@
5253

5354
class StreamProducer implements Producer {
5455

56+
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
57+
5558
private static final Logger LOGGER = LoggerFactory.getLogger(StreamProducer.class);
5659
private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {};
60+
private final long id;
5761
private final MessageAccumulator accumulator;
5862
// FIXME investigate a more optimized data structure to handle pending messages
5963
private final ConcurrentMap<Long, AccumulatedEntity> unconfirmedMessages;
@@ -87,6 +91,7 @@ class StreamProducer implements Producer {
8791
Duration confirmTimeout,
8892
Duration enqueueTimeout,
8993
StreamEnvironment environment) {
94+
this.id = ID_SEQUENCE.getAndIncrement();
9095
this.environment = environment;
9196
this.name = name;
9297
this.stream = stream;
@@ -479,4 +484,36 @@ interface ConfirmationCallback {
479484

480485
int handle(boolean confirmed, short code);
481486
}
487+
488+
@Override
489+
public boolean equals(Object o) {
490+
if (this == o) {
491+
return true;
492+
}
493+
if (o == null || getClass() != o.getClass()) {
494+
return false;
495+
}
496+
StreamProducer that = (StreamProducer) o;
497+
return id == that.id && stream.equals(that.stream);
498+
}
499+
500+
@Override
501+
public int hashCode() {
502+
return Objects.hash(id, stream);
503+
}
504+
505+
@Override
506+
public String toString() {
507+
Client client = this.client;
508+
return "{ "
509+
+ "\"id\" : "
510+
+ id
511+
+ ","
512+
+ "\"stream\" : \""
513+
+ stream
514+
+ "\","
515+
+ "\"publishing_client\" : "
516+
+ (client == null ? "null" : ("\"" + client.connectionName() + "\""))
517+
+ "}";
518+
}
482519
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -760,7 +760,6 @@ public Integer call() throws Exception {
760760
});
761761

762762
Consumer consumer = consumerBuilder.build();
763-
764763
return consumer;
765764
})
766765
.collect(Collectors.toList()));

0 commit comments

Comments
 (0)