Skip to content

Commit 33c62cf

Browse files
committed
Add environment-scoped sequence to name connections
Internal counter to help differentiate connections. References #50
1 parent 8a5e875 commit 33c62cf

14 files changed

+200
-31
lines changed

pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@
473473
<style>GOOGLE</style>
474474
</googleJavaFormat>
475475
</java>
476+
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
476477
<licenseHeader> <!-- specify either content or file, but not both -->
477478
<content>// Copyright (c) $YEAR VMware, Inc. or its affiliates. All rights reserved.
478479
//

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.stream.impl.Client.MetadataListener;
3232
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3333
import com.rabbitmq.stream.impl.Client.ShutdownListener;
34+
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
3435
import com.rabbitmq.stream.impl.Utils.ClientFactory;
3536
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
3637
import java.util.ArrayList;
@@ -46,6 +47,7 @@
4647
import java.util.concurrent.ConcurrentHashMap;
4748
import java.util.concurrent.CopyOnWriteArrayList;
4849
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.function.Function;
4951
import java.util.stream.Collectors;
5052
import java.util.stream.IntStream;
5153
import org.slf4j.Logger;
@@ -64,12 +66,17 @@ class ConsumersCoordinator {
6466
private final Map<String, ManagerPool> pools = new ConcurrentHashMap<>();
6567
private final ClientFactory clientFactory;
6668
private final int maxConsumersByConnection;
69+
private final Function<ClientConnectionType, String> connectionNamingStrategy;
6770

6871
ConsumersCoordinator(
69-
StreamEnvironment environment, int maxConsumersByConnection, ClientFactory clientFactory) {
72+
StreamEnvironment environment,
73+
int maxConsumersByConnection,
74+
Function<ClientConnectionType, String> connectionNamingStrategy,
75+
ClientFactory clientFactory) {
7076
this.environment = environment;
7177
this.clientFactory = clientFactory;
7278
this.maxConsumersByConnection = maxConsumersByConnection;
79+
this.connectionNamingStrategy = connectionNamingStrategy;
7380
}
7481

7582
private static String keyForClientSubscription(Client.Broker broker) {
@@ -522,7 +529,9 @@ private ClientSubscriptionsManager(
522529
ClientFactoryContext clientFactoryContext =
523530
ClientFactoryContext.fromParameters(
524531
clientParameters
525-
.clientProperty("connection_name", "rabbitmq-stream-consumer")
532+
.clientProperty(
533+
"connection_name",
534+
connectionNamingStrategy.apply(ClientConnectionType.CONSUMER))
526535
.chunkListener(chunkListener)
527536
.creditNotification(creditNotification)
528537
.messageListener(messageListener)

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.stream.impl.Client.PublishErrorListener;
2525
import com.rabbitmq.stream.impl.Client.Response;
2626
import com.rabbitmq.stream.impl.Client.ShutdownListener;
27+
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
2728
import com.rabbitmq.stream.impl.Utils.ClientFactory;
2829
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
2930
import java.util.Collection;
@@ -35,6 +36,7 @@
3536
import java.util.concurrent.CopyOnWriteArrayList;
3637
import java.util.concurrent.atomic.AtomicBoolean;
3738
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.function.Function;
3840
import java.util.stream.Collectors;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
@@ -48,16 +50,19 @@ class ProducersCoordinator {
4850
private final ClientFactory clientFactory;
4951
private final Map<String, ManagerPool> pools = new ConcurrentHashMap<>();
5052
private final int maxProducersByClient, maxTrackingConsumersByClient;
53+
private final Function<ClientConnectionType, String> connectionNamingStrategy;
5154

5255
ProducersCoordinator(
5356
StreamEnvironment environment,
5457
int maxProducersByClient,
5558
int maxTrackingConsumersByClient,
59+
Function<ClientConnectionType, String> connectionNamingStrategy,
5660
ClientFactory clientFactory) {
5761
this.environment = environment;
5862
this.clientFactory = clientFactory;
5963
this.maxProducersByClient = maxProducersByClient;
6064
this.maxTrackingConsumersByClient = maxTrackingConsumersByClient;
65+
this.connectionNamingStrategy = connectionNamingStrategy;
6166
}
6267

6368
private static String keyForManagerPool(Client.Broker broker) {
@@ -493,7 +498,9 @@ private ClientProducersManager(
493498
.publishErrorListener(publishErrorListener)
494499
.shutdownListener(shutdownListener)
495500
.metadataListener(metadataListener)
496-
.clientProperty("connection_name", "rabbitmq-stream-producer"))
501+
.clientProperty(
502+
"connection_name",
503+
connectionNamingStrategy.apply(ClientConnectionType.PRODUCER)))
497504
.key(owner.name);
498505
this.client = cf.client(connectionFactoryContext);
499506
clientInitializedInManager.set(true);

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -34,6 +34,7 @@
3434
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
3535
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
3636
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
37+
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
3738
import io.netty.buffer.ByteBufAllocator;
3839
import io.netty.channel.EventLoopGroup;
3940
import io.netty.channel.nio.NioEventLoopGroup;
@@ -102,7 +103,8 @@ class StreamEnvironment implements Environment {
102103
int maxConsumersByConnection,
103104
DefaultTlsConfiguration tlsConfiguration,
104105
ByteBufAllocator byteBufAllocator,
105-
boolean lazyInit) {
106+
boolean lazyInit,
107+
Function<ClientConnectionType, String> connectionNamingStrategy) {
106108
this(
107109
scheduledExecutorService,
108110
clientParametersPrototype,
@@ -116,6 +118,7 @@ class StreamEnvironment implements Environment {
116118
tlsConfiguration,
117119
byteBufAllocator,
118120
lazyInit,
121+
connectionNamingStrategy,
119122
cp -> new Client(cp));
120123
}
121124

@@ -132,6 +135,7 @@ class StreamEnvironment implements Environment {
132135
DefaultTlsConfiguration tlsConfiguration,
133136
ByteBufAllocator byteBufAllocator,
134137
boolean lazyInit,
138+
Function<ClientConnectionType, String> connectionNamingStrategy,
135139
Function<Client.ClientParameters, Client> clientFactory) {
136140
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
137141
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
@@ -202,10 +206,14 @@ class StreamEnvironment implements Environment {
202206
this,
203207
maxProducersByConnection,
204208
maxTrackingConsumersByConnection,
209+
connectionNamingStrategy,
205210
Utils.coordinatorClientFactory(this));
206211
this.consumersCoordinator =
207212
new ConsumersCoordinator(
208-
this, maxConsumersByConnection, Utils.coordinatorClientFactory(this));
213+
this,
214+
maxConsumersByConnection,
215+
connectionNamingStrategy,
216+
Utils.coordinatorClientFactory(this));
209217
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
210218

211219
AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
@@ -231,7 +239,10 @@ class StreamEnvironment implements Environment {
231239
newLocatorParameters
232240
.host(address.host())
233241
.port(address.port())
234-
.clientProperty("connection_name", "rabbitmq-stream-locator"));
242+
.clientProperty(
243+
"connection_name",
244+
connectionNamingStrategy.apply(
245+
ClientConnectionType.LOCATOR)));
235246
LOGGER.debug("Locator connected on {}", address);
236247
return newLocator;
237248
})
@@ -254,7 +265,9 @@ class StreamEnvironment implements Environment {
254265
.duplicate()
255266
.host(address.host())
256267
.port(address.port())
257-
.clientProperty("connection_name", "rabbitmq-stream-locator")
268+
.clientProperty(
269+
"connection_name",
270+
connectionNamingStrategy.apply(ClientConnectionType.LOCATOR))
258271
.shutdownListener(shutdownListenerReference.get());
259272
try {
260273
this.locator = clientFactory.apply(locatorParameters);

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -22,6 +22,7 @@
2222
import com.rabbitmq.stream.EnvironmentBuilder;
2323
import com.rabbitmq.stream.StreamException;
2424
import com.rabbitmq.stream.compression.CompressionCodecFactory;
25+
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
2526
import com.rabbitmq.stream.metrics.MetricsCollector;
2627
import com.rabbitmq.stream.sasl.CredentialsProvider;
2728
import com.rabbitmq.stream.sasl.SaslConfiguration;
@@ -36,6 +37,7 @@
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.concurrent.ScheduledExecutorService;
40+
import java.util.function.Function;
3941
import java.util.stream.Collectors;
4042
import javax.net.ssl.SSLException;
4143
import org.slf4j.Logger;
@@ -61,6 +63,8 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6163
private CompressionCodecFactory compressionCodecFactory;
6264
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
6365
private boolean lazyInit = false;
66+
private Function<ClientConnectionType, String> connectionNamingStrategy =
67+
Utils.defaultConnectionNamingStrategy();
6468

6569
public StreamEnvironmentBuilder() {}
6670

@@ -298,7 +302,8 @@ public Environment build() {
298302
maxConsumersByConnection,
299303
tls,
300304
byteBufAllocator,
301-
lazyInit);
305+
lazyInit,
306+
connectionNamingStrategy);
302307
}
303308

304309
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/main/java/com/rabbitmq/stream/impl/Utils.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -21,9 +21,13 @@
2121
import java.util.Arrays;
2222
import java.util.Collections;
2323
import java.util.HashMap;
24+
import java.util.Locale;
2425
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
2527
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicLong;
2629
import java.util.function.Consumer;
30+
import java.util.function.Function;
2731
import java.util.function.LongConsumer;
2832
import java.util.function.Predicate;
2933
import javax.net.ssl.X509TrustManager;
@@ -213,4 +217,23 @@ public X509Certificate[] getAcceptedIssuers() {
213217
return new X509Certificate[0];
214218
}
215219
}
220+
221+
enum ClientConnectionType {
222+
CONSUMER,
223+
PRODUCER,
224+
LOCATOR
225+
}
226+
227+
static Function<ClientConnectionType, String> defaultConnectionNamingStrategy() {
228+
Map<ClientConnectionType, AtomicLong> sequences =
229+
new ConcurrentHashMap<>(ClientConnectionType.values().length);
230+
Map<ClientConnectionType, String> prefixes =
231+
new ConcurrentHashMap<>(ClientConnectionType.values().length);
232+
for (ClientConnectionType type : ClientConnectionType.values()) {
233+
sequences.put(type, new AtomicLong(0));
234+
prefixes.put(type, "rabbitmq-stream-" + type.name().toLowerCase(Locale.ENGLISH) + "-");
235+
}
236+
return clientConnectionType ->
237+
prefixes.get(clientConnectionType) + sequences.get(clientConnectionType);
238+
}
216239
}

src/test/java/com/rabbitmq/stream/Host.java

+57-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -13,15 +13,21 @@
1313
1414
package com.rabbitmq.stream;
1515

16+
import com.google.common.reflect.TypeToken;
17+
import com.google.gson.Gson;
1618
import java.io.BufferedReader;
1719
import java.io.IOException;
1820
import java.io.InputStream;
1921
import java.io.InputStreamReader;
22+
import java.util.List;
2023
import java.util.concurrent.Callable;
24+
import java.util.stream.Collectors;
2125

2226
public class Host {
2327

24-
public static final String DOCKER_PREFIX = "DOCKER:";
28+
private static final String DOCKER_PREFIX = "DOCKER:";
29+
30+
private static final Gson GSON = new Gson();
2531

2632
private static String capture(InputStream is) throws IOException {
2733
BufferedReader br = new BufferedReader(new InputStreamReader(is));
@@ -93,9 +99,29 @@ public static Process rabbitmqctl(String command) throws IOException {
9399
}
94100

95101
public static Process killConnection(String connectionName) throws IOException {
102+
List<ConnectionInfo> cs = listConnections();
103+
if (cs.stream().filter(c -> connectionName.equals(c.clientProvidedName())).count() != 1) {
104+
throw new IllegalArgumentException(
105+
String.format(
106+
"Could not find 1 connection '%s' in stream connections: %s",
107+
connectionName,
108+
cs.stream()
109+
.map(ConnectionInfo::clientProvidedName)
110+
.collect(Collectors.joining(", "))));
111+
}
96112
return rabbitmqctl("eval 'rabbit_stream:kill_connection(\"" + connectionName + "\").'");
97113
}
98114

115+
private static List<ConnectionInfo> listConnections() throws IOException {
116+
Process process =
117+
rabbitmqctl("list_stream_connections --formatter json conn_name,client_properties");
118+
return toConnectionInfoList(capture(process.getInputStream()));
119+
}
120+
121+
static List<ConnectionInfo> toConnectionInfoList(String json) {
122+
return GSON.fromJson(json, new TypeToken<List<ConnectionInfo>>() {}.getType());
123+
}
124+
99125
public static Process killStreamLeaderProcess(String stream) throws IOException {
100126
return rabbitmqctl(
101127
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
@@ -186,4 +212,33 @@ public void close() throws Exception {
186212
this.end.call();
187213
}
188214
}
215+
216+
public static class ConnectionInfo {
217+
218+
private String conn_name;
219+
private List<List<String>> client_properties;
220+
221+
public String name() {
222+
return this.conn_name;
223+
}
224+
225+
public String clientProvidedName() {
226+
return client_properties.stream()
227+
.filter(p -> "connection_name".equals(p.get(0)))
228+
.findFirst()
229+
.get()
230+
.get(2);
231+
}
232+
233+
@Override
234+
public String toString() {
235+
return "ConnectionInfo{"
236+
+ "conn_name='"
237+
+ conn_name
238+
+ '\''
239+
+ ", client_properties="
240+
+ client_properties
241+
+ '}';
242+
}
243+
}
189244
}

0 commit comments

Comments
 (0)